X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=a6306291af17f153a35c56aa300a4bd51c3ed03b;hp=02198bd1b9e854f26a8614c5f76fbf76a36835f1;hb=10a5031171c7bca5b4498c871b119e5a88b6a3fb;hpb=618a6a28c0956fc6829c165a39ffec97f239096c diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 02198bd1b..a6306291a 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -104,72 +104,6 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, return ret; } -/* - * Find a relayd and send the stream - * - * Returns 0 on success, < 0 on error - */ -static int send_relayd_stream(struct lttng_consumer_stream *stream, - char *path) -{ - int ret = 0; - const char *stream_path; - struct consumer_relayd_sock_pair *relayd; - - assert(stream); - assert(stream->net_seq_idx != -1ULL); - - if (path != NULL) { - stream_path = path; - } else { - stream_path = stream->chan->pathname; - } - - /* The stream is not metadata. Get relayd reference if exists. */ - rcu_read_lock(); - relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd != NULL) { - /* Add stream on the relayd */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_add_stream(&relayd->control_sock, stream->name, - stream_path, &stream->relayd_stream_id, - stream->chan->tracefile_size, stream->chan->tracefile_count); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - if (ret < 0) { - goto end; - } - uatomic_inc(&relayd->refcount); - } else { - ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.", - stream->key, stream->net_seq_idx); - ret = -1; - goto end; - } - - DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64, - stream->name, stream->key, stream->net_seq_idx); - -end: - rcu_read_unlock(); - return ret; -} - -/* - * Find a relayd and close the stream - */ -static void close_relayd_stream(struct lttng_consumer_stream *stream) -{ - struct consumer_relayd_sock_pair *relayd; - - /* The stream is not metadata. Get relayd reference if exists. */ - rcu_read_lock(); - relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd) { - consumer_stream_relayd_close(stream, relayd); - } - rcu_read_unlock(); -} - /* * Take a snapshot of all the stream of a channel * @@ -201,8 +135,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, goto end; } - cds_list_for_each_entry(stream, &channel->stream_no_monitor_list.head, - no_monitor_node) { + cds_list_for_each_entry(stream, &channel->streams.head, send_node) { /* * Lock stream because we are about to change its state. */ @@ -215,14 +148,15 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, stream->net_seq_idx = relayd_id; channel->relayd_id = relayd_id; if (relayd_id != (uint64_t) -1ULL) { - ret = send_relayd_stream(stream, path); + ret = consumer_send_relayd_stream(stream, path); if (ret < 0) { ERR("sending stream to relayd"); goto end_unlock; } } else { ret = utils_create_stream_file(path, stream->name, - stream->chan->tracefile_size, stream->tracefile_count_current, + stream->chan->tracefile_size, + stream->tracefile_count_current, stream->uid, stream->gid); if (ret < 0) { ERR("utils_create_stream_file"); @@ -238,7 +172,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, ret = kernctl_buffer_flush(stream->wait_fd); if (ret < 0) { - ERR("Failed to flush kernel metadata stream"); + ERR("Failed to flush kernel stream"); goto end_unlock; } @@ -391,7 +325,7 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path, } if (use_relayd) { - ret = send_relayd_stream(metadata_stream, path); + ret = consumer_send_relayd_stream(metadata_stream, path); if (ret < 0) { goto error; } @@ -685,12 +619,11 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, DBG("Kernel consumer add stream %s in no monitor mode with " "relayd id %" PRIu64, new_stream->name, new_stream->net_seq_idx); - cds_list_add(&new_stream->no_monitor_node, - &channel->stream_no_monitor_list.head); + cds_list_add(&new_stream->send_node, &channel->streams.head); break; } - ret = send_relayd_stream(new_stream, NULL); + ret = consumer_send_relayd_stream(new_stream, NULL); if (ret < 0) { consumer_stream_free(new_stream); goto end_nosignal;