X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=191367cd0cd55c55be523cdf139182d4dbe8feca;hb=7ce3675685dbbc7be9536eb9c2b5ff8d677dc0b5;hp=4b657f33249f7a4f777b7b3ff784b379b15f5878;hpb=6a00837f8cb0431a2ad90974d67fae138ea97dd5;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index 4b657f332..191367cd0 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -498,6 +499,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->key = stream_key; stream->out_fd = -1; stream->out_fd_offset = 0; + stream->output_written = 0; stream->state = state; stream->uid = uid; stream->gid = gid; @@ -505,6 +507,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->session_id = session_id; stream->monitor = monitor; stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; + stream->index_fd = -1; pthread_mutex_init(&stream->lock, NULL); /* If channel is the metadata, flag this stream as metadata. */ @@ -726,6 +729,7 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, if (ret < 0) { goto end; } + uatomic_inc(&relayd->refcount); stream->sent_to_relayd = 1; } else { @@ -836,7 +840,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, uint64_t tracefile_size, uint64_t tracefile_count, uint64_t session_id_per_pid, - unsigned int monitor) + unsigned int monitor, + unsigned int live_timer_interval) { struct lttng_consumer_channel *channel; @@ -857,6 +862,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->tracefile_size = tracefile_size; channel->tracefile_count = tracefile_count; channel->monitor = monitor; + channel->live_timer_interval = live_timer_interval; pthread_mutex_init(&channel->lock, NULL); pthread_mutex_init(&channel->timer_lock, NULL); @@ -1316,7 +1322,8 @@ end: ssize_t lttng_consumer_on_read_subbuffer_mmap( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len, - unsigned long padding) + unsigned long padding, + struct lttng_packet_index *index) { unsigned long mmap_offset; void *mmap_base; @@ -1334,6 +1341,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( if (stream->net_seq_idx != (uint64_t) -1ULL) { relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd == NULL) { + ret = -EPIPE; goto end; } } @@ -1343,28 +1351,31 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( case LTTNG_CONSUMER_KERNEL: mmap_base = stream->mmap_base; ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset); + if (ret != 0) { + PERROR("tracer ctl get_mmap_read_offset"); + written = -errno; + goto end; + } break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: mmap_base = lttng_ustctl_get_mmap_base(stream); if (!mmap_base) { ERR("read mmap get mmap base for stream %s", stream->name); - written = -1; + written = -EPERM; goto end; } ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset); - + if (ret != 0) { + PERROR("tracer ctl get_mmap_read_offset"); + written = ret; + goto end; + } break; default: ERR("Unknown consumer_data type"); assert(0); } - if (ret != 0) { - errno = -ret; - PERROR("tracer ctl get_mmap_read_offset"); - written = ret; - goto end; - } /* Handle stream on the relayd if the output is on the network */ if (relayd) { @@ -1419,16 +1430,34 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( ret = utils_rotate_stream_file(stream->chan->pathname, stream->name, stream->chan->tracefile_size, stream->chan->tracefile_count, stream->uid, stream->gid, - stream->out_fd, &(stream->tracefile_count_current)); + stream->out_fd, &(stream->tracefile_count_current), + &stream->out_fd); if (ret < 0) { ERR("Rotating output file"); goto end; } - outfd = stream->out_fd = ret; + outfd = stream->out_fd; + + if (stream->index_fd >= 0) { + ret = index_create_file(stream->chan->pathname, + stream->name, stream->uid, stream->gid, + stream->chan->tracefile_size, + stream->tracefile_count_current); + if (ret < 0) { + goto end; + } + stream->index_fd = ret; + } + /* Reset current size because we just perform a rotation. */ stream->tracefile_size_current = 0; + stream->out_fd_offset = 0; + orig_offset = 0; } stream->tracefile_size_current += len; + if (index) { + index->offset = htobe64(stream->out_fd_offset); + } } while (len > 0) { @@ -1445,7 +1474,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( */ DBG("Error in file write mmap"); if (written == 0) { - written = ret; + written = -errno; } /* Socket operation failed. We consider the relayd dead */ if (errno == EPIPE || errno == EINVAL) { @@ -1469,6 +1498,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( SYNC_FILE_RANGE_WRITE); stream->out_fd_offset += ret; } + stream->output_written += ret; written += ret; } lttng_consumer_sync_trace_file(stream, orig_offset); @@ -1502,7 +1532,8 @@ end: ssize_t lttng_consumer_on_read_subbuffer_splice( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len, - unsigned long padding) + unsigned long padding, + struct lttng_packet_index *index) { ssize_t ret = 0, written = 0, ret_splice = 0; loff_t offset = 0; @@ -1533,6 +1564,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( if (stream->net_seq_idx != (uint64_t) -1ULL) { relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd == NULL) { + ret = -EPIPE; goto end; } } @@ -1601,16 +1633,32 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( ret = utils_rotate_stream_file(stream->chan->pathname, stream->name, stream->chan->tracefile_size, stream->chan->tracefile_count, stream->uid, stream->gid, - stream->out_fd, &(stream->tracefile_count_current)); + stream->out_fd, &(stream->tracefile_count_current), + &stream->out_fd); if (ret < 0) { ERR("Rotating output file"); goto end; } - outfd = stream->out_fd = ret; + outfd = stream->out_fd; + + if (stream->index_fd >= 0) { + ret = index_create_file(stream->chan->pathname, + stream->name, stream->uid, stream->gid, + stream->chan->tracefile_size, + stream->tracefile_count_current); + if (ret < 0) { + goto end; + } + stream->index_fd = ret; + } + /* Reset current size because we just perform a rotation. */ stream->tracefile_size_current = 0; + stream->out_fd_offset = 0; + orig_offset = 0; } stream->tracefile_size_current += len; + index->offset = htobe64(stream->out_fd_offset); } while (len > 0) { @@ -1679,6 +1727,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( SYNC_FILE_RANGE_WRITE); stream->out_fd_offset += ret_splice; } + stream->output_written += ret_splice; written += ret_splice; } lttng_consumer_sync_trace_file(stream, orig_offset); @@ -2185,11 +2234,6 @@ restart: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - /* Just don't waste time if no returned events for the fd */ - if (!revents) { - continue; - } - if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) { if (revents & (LPOLLERR | LPOLLHUP )) { DBG("Metadata thread pipe hung up"); @@ -3391,6 +3435,15 @@ int consumer_data_pending(uint64_t id) */ ret = cds_lfht_is_node_deleted(&stream->node.node); if (!ret) { + /* + * An empty output file is not valid. We need at least one packet + * generated per stream, even if it contains no event, so it + * contains at least one packet header. + */ + if (stream->output_written == 0) { + pthread_mutex_unlock(&stream->lock); + goto data_pending; + } /* Check the stream if there is data in the buffers. */ ret = data_pending(stream); if (ret == 1) {