X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=0661f1264fa0e7fa136a9b4772f302558287ab56;hp=59207da2f2f99f03a27b8a014f682025b03e39f7;hb=ecc48a904cc7c419fd1400afaa9ccb93be490cdd;hpb=a1ae300ff3a139f4d4f593eddf444b0fca52280f diff --git a/src/common/consumer.c b/src/common/consumer.c index 59207da2f..0661f1264 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -498,6 +499,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->key = stream_key; stream->out_fd = -1; stream->out_fd_offset = 0; + stream->output_written = 0; stream->state = state; stream->uid = uid; stream->gid = gid; @@ -505,6 +507,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->session_id = session_id; stream->monitor = monitor; stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; + stream->index_fd = -1; pthread_mutex_init(&stream->lock, NULL); /* If channel is the metadata, flag this stream as metadata. */ @@ -836,7 +839,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, uint64_t tracefile_size, uint64_t tracefile_count, uint64_t session_id_per_pid, - unsigned int monitor) + unsigned int monitor, + unsigned int live_timer_interval) { struct lttng_consumer_channel *channel; @@ -857,6 +861,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->tracefile_size = tracefile_size; channel->tracefile_count = tracefile_count; channel->monitor = monitor; + channel->live_timer_interval = live_timer_interval; pthread_mutex_init(&channel->lock, NULL); pthread_mutex_init(&channel->timer_lock, NULL); @@ -1316,7 +1321,8 @@ end: ssize_t lttng_consumer_on_read_subbuffer_mmap( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len, - unsigned long padding) + unsigned long padding, + struct lttng_packet_index *index) { unsigned long mmap_offset; void *mmap_base; @@ -1423,18 +1429,34 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( ret = utils_rotate_stream_file(stream->chan->pathname, stream->name, stream->chan->tracefile_size, stream->chan->tracefile_count, stream->uid, stream->gid, - stream->out_fd, &(stream->tracefile_count_current)); + stream->out_fd, &(stream->tracefile_count_current), + &stream->out_fd); if (ret < 0) { ERR("Rotating output file"); goto end; } - outfd = stream->out_fd = ret; + outfd = stream->out_fd; + + if (stream->index_fd >= 0) { + ret = index_create_file(stream->chan->pathname, + stream->name, stream->uid, stream->gid, + stream->chan->tracefile_size, + stream->tracefile_count_current); + if (ret < 0) { + goto end; + } + stream->index_fd = ret; + } + /* Reset current size because we just perform a rotation. */ stream->tracefile_size_current = 0; stream->out_fd_offset = 0; orig_offset = 0; } stream->tracefile_size_current += len; + if (index) { + index->offset = htobe64(stream->out_fd_offset); + } } while (len > 0) { @@ -1475,6 +1497,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( SYNC_FILE_RANGE_WRITE); stream->out_fd_offset += ret; } + stream->output_written += ret; written += ret; } lttng_consumer_sync_trace_file(stream, orig_offset); @@ -1508,7 +1531,8 @@ end: ssize_t lttng_consumer_on_read_subbuffer_splice( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len, - unsigned long padding) + unsigned long padding, + struct lttng_packet_index *index) { ssize_t ret = 0, written = 0, ret_splice = 0; loff_t offset = 0; @@ -1608,18 +1632,32 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( ret = utils_rotate_stream_file(stream->chan->pathname, stream->name, stream->chan->tracefile_size, stream->chan->tracefile_count, stream->uid, stream->gid, - stream->out_fd, &(stream->tracefile_count_current)); + stream->out_fd, &(stream->tracefile_count_current), + &stream->out_fd); if (ret < 0) { ERR("Rotating output file"); goto end; } - outfd = stream->out_fd = ret; + outfd = stream->out_fd; + + if (stream->index_fd >= 0) { + ret = index_create_file(stream->chan->pathname, + stream->name, stream->uid, stream->gid, + stream->chan->tracefile_size, + stream->tracefile_count_current); + if (ret < 0) { + goto end; + } + stream->index_fd = ret; + } + /* Reset current size because we just perform a rotation. */ stream->tracefile_size_current = 0; stream->out_fd_offset = 0; orig_offset = 0; } stream->tracefile_size_current += len; + index->offset = htobe64(stream->out_fd_offset); } while (len > 0) { @@ -1688,6 +1726,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( SYNC_FILE_RANGE_WRITE); stream->out_fd_offset += ret_splice; } + stream->output_written += ret_splice; written += ret_splice; } lttng_consumer_sync_trace_file(stream, orig_offset); @@ -2194,11 +2233,6 @@ restart: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - /* Just don't waste time if no returned events for the fd */ - if (!revents) { - continue; - } - if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) { if (revents & (LPOLLERR | LPOLLHUP )) { DBG("Metadata thread pipe hung up"); @@ -3400,6 +3434,15 @@ int consumer_data_pending(uint64_t id) */ ret = cds_lfht_is_node_deleted(&stream->node.node); if (!ret) { + /* + * An empty output file is not valid. We need at least one packet + * generated per stream, even if it contains no event, so it + * contains at least one packet header. + */ + if (stream->output_written == 0) { + pthread_mutex_unlock(&stream->lock); + goto data_pending; + } /* Check the stream if there is data in the buffers. */ ret = data_pending(stream); if (ret == 1) {