X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=d57271c3a9d27222c72d95d20128a3fdd4788df6;hb=d2b102df1d8edd7b6ff9ef3083308da5190e50af;hp=f0bacfb8ff0c897fd3259168a7733f8427243dc3;hpb=3c1150e16dc06456439ce4ae45da50b44a9c3b7c;p=lttng-tools.git diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index f0bacfb8f..d57271c3a 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -571,6 +571,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->monitor = monitor; stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; stream->index_fd = -1; + stream->last_sequence_number = -1ULL; pthread_mutex_init(&stream->lock, NULL); pthread_mutex_init(&stream->metadata_timer_lock, NULL); @@ -1555,6 +1556,16 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( if (stream->metadata_flag) { /* Metadata requires the control socket. */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); + if (stream->reset_metadata_flag) { + ret = relayd_reset_metadata(&relayd->control_sock, + stream->relayd_stream_id, + stream->metadata_version); + if (ret < 0) { + relayd_hang_up = 1; + goto write_error; + } + stream->reset_metadata_flag = 0; + } netlen += sizeof(struct lttcomm_relayd_metadata_payload); } @@ -1578,6 +1589,15 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( /* No streaming, we have to set the len with the full padding */ len += padding; + if (stream->metadata_flag && stream->reset_metadata_flag) { + ret = utils_truncate_stream_file(stream->out_fd, 0); + if (ret < 0) { + ERR("Reset metadata file"); + goto end; + } + stream->reset_metadata_flag = 0; + } + /* * Check if we need to change the tracefile before writing the packet. */ @@ -1596,6 +1616,12 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( outfd = stream->out_fd; if (stream->index_fd >= 0) { + ret = close(stream->index_fd); + if (ret < 0) { + PERROR("Closing index"); + goto end; + } + stream->index_fd = -1; ret = index_create_file(stream->chan->pathname, stream->name, stream->uid, stream->gid, stream->chan->tracefile_size, @@ -1737,6 +1763,16 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); + if (stream->reset_metadata_flag) { + ret = relayd_reset_metadata(&relayd->control_sock, + stream->relayd_stream_id, + stream->metadata_version); + if (ret < 0) { + relayd_hang_up = 1; + goto write_error; + } + stream->reset_metadata_flag = 0; + } ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd, padding); if (ret < 0) { @@ -1760,6 +1796,14 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( /* No streaming, we have to set the len with the full padding */ len += padding; + if (stream->metadata_flag && stream->reset_metadata_flag) { + ret = utils_truncate_stream_file(stream->out_fd, 0); + if (ret < 0) { + ERR("Reset metadata file"); + goto end; + } + stream->reset_metadata_flag = 0; + } /* * Check if we need to change the tracefile before writing the packet. */