return ret;
}
-/*
- * Find a relayd and send the stream
- *
- * Returns 0 on success, < 0 on error
- */
-static
-int send_relayd_stream(struct lttng_consumer_stream *stream, char *path)
-{
- struct consumer_relayd_sock_pair *relayd;
- int ret = 0;
- char *stream_path;
-
- if (path != NULL) {
- stream_path = path;
- } else {
- stream_path = stream->chan->pathname;
- }
- /* The stream is not metadata. Get relayd reference if exists. */
- rcu_read_lock();
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd != NULL) {
- /* Add stream on the relayd */
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_add_stream(&relayd->control_sock,
- stream->name, stream_path,
- &stream->relayd_stream_id,
- stream->chan->tracefile_size,
- stream->chan->tracefile_count);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- if (ret < 0) {
- goto end;
- }
- uatomic_inc(&relayd->refcount);
- } else if (stream->net_seq_idx != (uint64_t) -1ULL) {
- ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.",
- stream->net_seq_idx);
- ret = -1;
- goto end;
- }
-
-end:
- rcu_read_unlock();
- return ret;
-}
-
-/*
- * Find a relayd and close the stream
- */
-static
-void close_relayd_stream(struct lttng_consumer_stream *stream)
-{
- struct consumer_relayd_sock_pair *relayd;
-
- /* The stream is not metadata. Get relayd reference if exists. */
- rcu_read_lock();
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd != NULL) {
- consumer_stream_relayd_close(stream, relayd);
- }
- rcu_read_unlock();
-}
-
/*
* Take a snapshot of all the stream of a channel
*
goto end;
}
- cds_list_for_each_entry(stream, &channel->stream_no_monitor_list.head,
- no_monitor_node) {
+ cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
/*
* Lock stream because we are about to change its state.
*/
pthread_mutex_lock(&stream->lock);
+ /*
+ * Assign the received relayd ID so we can use it for streaming. The streams
+ * are not visible to anyone so this is OK to change it.
+ */
stream->net_seq_idx = relayd_id;
channel->relayd_id = relayd_id;
if (relayd_id != (uint64_t) -1ULL) {
- ret = send_relayd_stream(stream, path);
+ ret = consumer_send_relayd_stream(stream, path);
if (ret < 0) {
ERR("sending stream to relayd");
goto end_unlock;
}
- DBG("Stream %s sent to the relayd", stream->name);
} else {
ret = utils_create_stream_file(path, stream->name,
- stream->chan->tracefile_size, stream->tracefile_count_current,
+ stream->chan->tracefile_size,
+ stream->tracefile_count_current,
stream->uid, stream->gid);
if (ret < 0) {
ERR("utils_create_stream_file");
ret = kernctl_buffer_flush(stream->wait_fd);
if (ret < 0) {
- ERR("Failed to flush kernel metadata stream");
+ ERR("Failed to flush kernel stream");
goto end_unlock;
}
ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
if (ret < 0) {
ERR("Snapshot kernctl_get_subbuf_size");
- goto end_unlock;
+ goto error_put_subbuf;
}
ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
if (ret < 0) {
ERR("Snapshot kernctl_get_padded_subbuf_size");
- goto end_unlock;
+ goto error_put_subbuf;
}
read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len,
padded_len - len);
/*
- * We write the padded len in local tracefiles but the
- * data len when using a relay.
- * Display the error but continue processing to try to
- * release the subbuffer.
+ * We write the padded len in local tracefiles but the data len
+ * when using a relay. Display the error but continue processing
+ * to try to release the subbuffer.
*/
if (relayd_id != (uint64_t) -1ULL) {
if (read_len != len) {
ret = 0;
goto end;
+error_put_subbuf:
+ ret = kernctl_put_subbuf(stream->wait_fd);
+ if (ret < 0) {
+ ERR("Snapshot kernctl_put_subbuf error path");
+ }
end_unlock:
pthread_mutex_unlock(&stream->lock);
end:
* Returns 0 on success, < 0 on error
*/
int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
- struct lttng_consumer_local_data *ctx)
+ uint64_t relayd_id, struct lttng_consumer_local_data *ctx)
{
+ int ret, use_relayd = 0;
+ ssize_t ret_read;
struct lttng_consumer_channel *metadata_channel;
struct lttng_consumer_stream *metadata_stream;
- int ret;
+
+ assert(ctx);
DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s",
key, path);
metadata_channel = consumer_find_channel(key);
if (!metadata_channel) {
- ERR("Snapshot kernel metadata channel not found for key %lu", key);
+ ERR("Kernel snapshot metadata not found for key %" PRIu64, key);
ret = -1;
- goto end;
+ goto error;
}
metadata_stream = metadata_channel->metadata_stream;
assert(metadata_stream);
- ret = utils_create_stream_file(path, metadata_stream->name,
- metadata_stream->chan->tracefile_size,
- metadata_stream->tracefile_count_current,
- metadata_stream->uid, metadata_stream->gid);
- if (ret < 0) {
- goto end;
+ /* Flag once that we have a valid relayd for the stream. */
+ if (relayd_id != (uint64_t) -1ULL) {
+ use_relayd = 1;
}
- metadata_stream->out_fd = ret;
- ret = 0;
- while (ret >= 0) {
- ret = lttng_kconsumer_read_subbuffer(metadata_stream, ctx);
+ if (use_relayd) {
+ ret = consumer_send_relayd_stream(metadata_stream, path);
+ if (ret < 0) {
+ goto error;
+ }
+ } else {
+ ret = utils_create_stream_file(path, metadata_stream->name,
+ metadata_stream->chan->tracefile_size,
+ metadata_stream->tracefile_count_current,
+ metadata_stream->uid, metadata_stream->gid);
if (ret < 0) {
- if (ret != -EPERM) {
- ERR("Kernel snapshot reading subbuffer");
- goto end;
+ goto error;
+ }
+ metadata_stream->out_fd = ret;
+ }
+
+ do {
+ ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx);
+ if (ret_read < 0) {
+ if (ret_read != -EPERM) {
+ ERR("Kernel snapshot reading metadata subbuffer (ret: %ld)",
+ ret_read);
+ goto error;
}
- /* "ret" is negative at this point so we will exit the loop. */
+ /* ret_read is negative at this point so we will exit the loop. */
continue;
}
+ } while (ret_read >= 0);
+
+ if (use_relayd) {
+ close_relayd_stream(metadata_stream);
+ metadata_stream->net_seq_idx = (uint64_t) -1ULL;
+ } else {
+ ret = close(metadata_stream->out_fd);
+ if (ret < 0) {
+ PERROR("Kernel consumer snapshot metadata close out_fd");
+ /*
+ * Don't go on error here since the snapshot was successful at this
+ * point but somehow the close failed.
+ */
+ }
+ metadata_stream->out_fd = -1;
}
ret = 0;
-end:
+
+ cds_list_del(&metadata_stream->send_node);
+ consumer_stream_destroy(metadata_stream, NULL);
+ metadata_channel->metadata_stream = NULL;
+error:
rcu_read_unlock();
return ret;
}
msg.u.channel.name, msg.u.channel.uid, msg.u.channel.gid,
msg.u.channel.relayd_id, msg.u.channel.output,
msg.u.channel.tracefile_size,
- msg.u.channel.tracefile_count,
+ msg.u.channel.tracefile_count, 0,
msg.u.channel.monitor);
if (new_channel == NULL) {
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
goto end_nosignal;
}
new_channel->nb_init_stream_left = msg.u.channel.nb_init_streams;
+ switch (msg.u.channel.output) {
+ case LTTNG_EVENT_SPLICE:
+ new_channel->output = CONSUMER_CHANNEL_SPLICE;
+ break;
+ case LTTNG_EVENT_MMAP:
+ new_channel->output = CONSUMER_CHANNEL_MMAP;
+ break;
+ default:
+ ERR("Channel output unknown %d", msg.u.channel.output);
+ goto end_nosignal;
+ }
/* Translate and save channel type. */
switch (msg.u.channel.type) {
/* First send a status message before receiving the fds. */
ret = consumer_send_status_msg(sock, ret_code);
if (ret < 0) {
- /*
- * Somehow, the session daemon is not responding
- * anymore.
- */
+ /* Somehow, the session daemon is not responding anymore. */
goto error_fatal;
}
if (ret_code != LTTNG_OK) {
- /*
- * Channel was not found.
- */
+ /* Channel was not found. */
goto end_nosignal;
}
- /* block */
+ /* Blocking call */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
rcu_read_unlock();
return -EINTR;
}
goto end_nosignal;
}
+
new_stream->chan = channel;
new_stream->wait_fd = fd;
switch (channel->output) {
*/
new_stream->hangup_flush_done = 0;
- ret = send_relayd_stream(new_stream, NULL);
- if (ret < 0) {
- consumer_del_stream(new_stream, NULL);
- goto end_nosignal;
- }
-
if (ctx->on_recv_stream) {
ret = ctx->on_recv_stream(new_stream);
if (ret < 0) {
- consumer_del_stream(new_stream, NULL);
+ consumer_stream_free(new_stream);
goto end_nosignal;
}
}
/* Do not monitor this stream. */
if (!channel->monitor) {
- DBG("Kernel consumer add stream %s in no monitor mode with"
+ DBG("Kernel consumer add stream %s in no monitor mode with "
"relayd id %" PRIu64, new_stream->name,
- new_stream->relayd_stream_id);
- cds_list_add(&new_stream->no_monitor_node,
- &channel->stream_no_monitor_list.head);
+ new_stream->net_seq_idx);
+ cds_list_add(&new_stream->send_node, &channel->streams.head);
break;
}
+ /* Send stream to relayd if the stream has an ID. */
+ if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
+ ret = consumer_send_relayd_stream(new_stream, NULL);
+ if (ret < 0) {
+ consumer_stream_free(new_stream);
+ goto end_nosignal;
+ }
+ }
+
/* Get the right pipe where the stream will be sent. */
if (new_stream->metadata_flag) {
stream_pipe = ctx->consumer_metadata_pipe;
ERR("Consumer write %s stream to pipe %d",
new_stream->metadata_flag ? "metadata" : "data",
lttng_pipe_get_writefd(stream_pipe));
- consumer_del_stream(new_stream, NULL);
+ consumer_stream_free(new_stream);
goto end_nosignal;
}
{
if (msg.u.snapshot_channel.metadata == 1) {
ret = lttng_kconsumer_snapshot_metadata(msg.u.snapshot_channel.key,
- msg.u.snapshot_channel.pathname, ctx);
+ msg.u.snapshot_channel.pathname,
+ msg.u.snapshot_channel.relayd_id, ctx);
if (ret < 0) {
ERR("Snapshot metadata failed");
ret_code = LTTNG_ERR_KERN_META_FAIL;
return 0;
error_close_fd:
- {
+ if (stream->out_fd >= 0) {
int err;
err = close(stream->out_fd);
assert(!err);
+ stream->out_fd = -1;
}
error:
return ret;