X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=e0d6ea496ade52af826cf02b0fc9fe0054b64895;hp=3638273d0797d55f549d1047175f2087a3919c5f;hb=3a84e2f3c3df5b461d9621b64f14abc3b8c3c29c;hpb=d88744a44aa5f2ca90ab87946692b9eed3120641 diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 3638273d0..e0d6ea496 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -323,6 +323,7 @@ static void free_relayd_rcu(struct rcu_head *head) (void) relayd_close(&relayd->control_sock); (void) relayd_close(&relayd->data_sock); + pthread_mutex_destroy(&relayd->ctrl_sock_mutex); free(relayd); } @@ -562,7 +563,8 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, int cpu, int *alloc_ret, enum consumer_channel_type type, - unsigned int monitor) + unsigned int monitor, + uint64_t trace_archive_id) { int ret; struct lttng_consumer_stream *stream; @@ -589,6 +591,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; stream->index_file = NULL; stream->last_sequence_number = -1ULL; + stream->trace_archive_id = trace_archive_id; pthread_mutex_init(&stream->lock, NULL); pthread_mutex_init(&stream->metadata_timer_lock, NULL); @@ -806,7 +809,8 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_add_stream(&relayd->control_sock, stream->name, path, &stream->relayd_stream_id, - stream->chan->tracefile_size, stream->chan->tracefile_count); + stream->chan->tracefile_size, stream->chan->tracefile_count, + stream->trace_archive_id); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { goto end; @@ -1475,7 +1479,7 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) */ static int write_relayd_metadata_id(int fd, struct lttng_consumer_stream *stream, - struct consumer_relayd_sock_pair *relayd, unsigned long padding) + unsigned long padding) { ssize_t ret; struct lttcomm_relayd_metadata_payload hdr; @@ -1610,7 +1614,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( /* Write metadata stream id before payload */ if (stream->metadata_flag) { - ret = write_relayd_metadata_id(outfd, stream, relayd, padding); + ret = write_relayd_metadata_id(outfd, stream, padding); if (ret < 0) { relayd_hang_up = 1; goto write_error; @@ -1799,7 +1803,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( } stream->reset_metadata_flag = 0; } - ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd, + ret = write_relayd_metadata_id(splice_pipe[1], stream, padding); if (ret < 0) { written = ret; @@ -2337,7 +2341,6 @@ int consumer_post_rotation(struct lttng_consumer_stream *stream, stream->chan->name); ret = rotate_notify_sessiond(ctx, stream->chan->key); } - assert(stream->chan->nr_stream_rotate_pending >= 0); pthread_mutex_unlock(&stream->chan->lock); return ret; @@ -2574,7 +2577,9 @@ void *consumer_thread_data_poll(void *data) /* local view of the streams */ struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL; /* local view of consumer_data.fds_count */ - int nb_fd = 0, nb_pipes_fd; + int nb_fd = 0; + /* 2 for the consumer_data_pipe and wake up pipe */ + const int nb_pipes_fd = 2; /* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */ int nb_inactive_fd = 0; struct lttng_consumer_local_data *ctx = data; @@ -2614,12 +2619,7 @@ void *consumer_thread_data_poll(void *data) free(local_stream); local_stream = NULL; - /* - * Allocate for all fds + 2: - * +1 for the consumer_data_pipe - * +1 for wake up pipe - */ - nb_pipes_fd = 2; + /* Allocate for all fds */ pollfd = zmalloc((consumer_data.stream_count + nb_pipes_fd) * sizeof(struct pollfd)); if (pollfd == NULL) { PERROR("pollfd malloc");