X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=cb05a1eb411c9f522b37efaf8600ecdca930352f;hb=862d3a3b75a25b29eb589fb9b3efc7e19d1c1bec;hp=2897fb855dde0844be2984df3278818be910eb56;hpb=c8fea79c745d42ea8143b7020ae11b4fc2da0d8a;p=lttng-tools.git diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 2897fb855..cb05a1eb4 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -571,6 +571,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->monitor = monitor; stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; stream->index_fd = -1; + stream->last_sequence_number = -1ULL; pthread_mutex_init(&stream->lock, NULL); pthread_mutex_init(&stream->metadata_timer_lock, NULL); @@ -1555,6 +1556,16 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( if (stream->metadata_flag) { /* Metadata requires the control socket. */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); + if (stream->reset_metadata_flag) { + ret = relayd_reset_metadata(&relayd->control_sock, + stream->relayd_stream_id, + stream->metadata_version); + if (ret < 0) { + relayd_hang_up = 1; + goto write_error; + } + stream->reset_metadata_flag = 0; + } netlen += sizeof(struct lttcomm_relayd_metadata_payload); } @@ -1578,6 +1589,15 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( /* No streaming, we have to set the len with the full padding */ len += padding; + if (stream->metadata_flag && stream->reset_metadata_flag) { + ret = utils_truncate_stream_file(stream->out_fd, 0); + if (ret < 0) { + ERR("Reset metadata file"); + goto end; + } + stream->reset_metadata_flag = 0; + } + /* * Check if we need to change the tracefile before writing the packet. */ @@ -1596,6 +1616,12 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( outfd = stream->out_fd; if (stream->index_fd >= 0) { + ret = close(stream->index_fd); + if (ret < 0) { + PERROR("Closing index"); + goto end; + } + stream->index_fd = -1; ret = index_create_file(stream->chan->pathname, stream->name, stream->uid, stream->gid, stream->chan->tracefile_size, @@ -1737,6 +1763,16 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); + if (stream->reset_metadata_flag) { + ret = relayd_reset_metadata(&relayd->control_sock, + stream->relayd_stream_id, + stream->metadata_version); + if (ret < 0) { + relayd_hang_up = 1; + goto write_error; + } + stream->reset_metadata_flag = 0; + } ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd, padding); if (ret < 0) { @@ -1760,6 +1796,14 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( /* No streaming, we have to set the len with the full padding */ len += padding; + if (stream->metadata_flag && stream->reset_metadata_flag) { + ret = utils_truncate_stream_file(stream->out_fd, 0); + if (ret < 0) { + ERR("Reset metadata file"); + goto end; + } + stream->reset_metadata_flag = 0; + } /* * Check if we need to change the tracefile before writing the packet. */ @@ -1779,6 +1823,12 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( outfd = stream->out_fd; if (stream->index_fd >= 0) { + ret = close(stream->index_fd); + if (ret < 0) { + PERROR("Closing index"); + goto end; + } + stream->index_fd = -1; ret = index_create_file(stream->chan->pathname, stream->name, stream->uid, stream->gid, stream->chan->tracefile_size, @@ -2211,10 +2261,10 @@ restart: DBG("Metadata poll return from wait with %d fd(s)", LTTNG_POLL_GETNB(&events)); health_poll_exit(); - DBG("Metadata event catched in thread"); + DBG("Metadata event caught in thread"); if (ret < 0) { if (errno == EINTR) { - ERR("Poll EINTR catched"); + ERR("Poll EINTR caught"); goto restart; } if (LTTNG_POLL_GETNB(&events) == 0) { @@ -2794,10 +2844,10 @@ restart: DBG("Channel poll return from wait with %d fd(s)", LTTNG_POLL_GETNB(&events)); health_poll_exit(); - DBG("Channel event catched in thread"); + DBG("Channel event caught in thread"); if (ret < 0) { if (errno == EINTR) { - ERR("Poll EINTR catched"); + ERR("Poll EINTR caught"); goto restart; } if (LTTNG_POLL_GETNB(&events) == 0) {