X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=4618ccedcd750d18d651c4ace97477dd620f9843;hb=d3e2ba59faddb31870e2ce29b6a881f7ad5ad883;hp=bfec4d2f0db343c067c288f62f42d426ed88afb6;hpb=56591bac20c0f3b728c95d92702d243de838bdc4;p=lttng-tools.git diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index bfec4d2f0..4618ccedc 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -38,6 +38,8 @@ #include #include #include +#include +#include #include "kernel-consumer.h" @@ -158,7 +160,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, ret = utils_create_stream_file(path, stream->name, stream->chan->tracefile_size, stream->tracefile_count_current, - stream->uid, stream->gid); + stream->uid, stream->gid, NULL); if (ret < 0) { ERR("utils_create_stream_file"); goto end_unlock; @@ -248,7 +250,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, } read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len, - padded_len - len); + padded_len - len, NULL); /* * We write the padded len in local tracefiles but the data len * when using a relay. Display the error but continue processing @@ -352,7 +354,7 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path, ret = utils_create_stream_file(path, metadata_stream->name, metadata_stream->chan->tracefile_size, metadata_stream->tracefile_count_current, - metadata_stream->uid, metadata_stream->gid); + metadata_stream->uid, metadata_stream->gid, NULL); if (ret < 0) { goto error; } @@ -440,7 +442,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Session daemon status message are handled in the following call. */ ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index, msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll, - &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id); + &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id, + msg.u.relayd_sock.relayd_session_id); goto end_nosignal; } case LTTNG_CONSUMER_ADD_CHANNEL: @@ -461,7 +464,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.channel.relayd_id, msg.u.channel.output, msg.u.channel.tracefile_size, msg.u.channel.tracefile_count, 0, - msg.u.channel.monitor); + msg.u.channel.monitor, + msg.u.channel.live_timer_interval); if (new_channel == NULL) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; @@ -500,6 +504,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } else { ret = consumer_add_channel(new_channel, ctx); } + consumer_timer_live_start(new_channel, msg.u.channel.live_timer_interval); /* If we received an error in add_channel, we need to report it. */ if (ret < 0) { @@ -843,6 +848,61 @@ error_fatal: return -1; } +/* + * Populate index values of a kernel stream. Values are set in big endian order. + * + * Return 0 on success or else a negative value. + */ +static int get_index_values(struct lttng_packet_index *index, int infd) +{ + int ret; + + ret = kernctl_get_timestamp_begin(infd, &index->timestamp_begin); + if (ret < 0) { + PERROR("kernctl_get_timestamp_begin"); + goto error; + } + index->timestamp_begin = htobe64(index->timestamp_begin); + + ret = kernctl_get_timestamp_end(infd, &index->timestamp_end); + if (ret < 0) { + PERROR("kernctl_get_timestamp_end"); + goto error; + } + index->timestamp_end = htobe64(index->timestamp_end); + + ret = kernctl_get_events_discarded(infd, &index->events_discarded); + if (ret < 0) { + PERROR("kernctl_get_events_discarded"); + goto error; + } + index->events_discarded = htobe64(index->events_discarded); + + ret = kernctl_get_content_size(infd, &index->content_size); + if (ret < 0) { + PERROR("kernctl_get_content_size"); + goto error; + } + index->content_size = htobe64(index->content_size); + + ret = kernctl_get_packet_size(infd, &index->packet_size); + if (ret < 0) { + PERROR("kernctl_get_packet_size"); + goto error; + } + index->packet_size = htobe64(index->packet_size); + + ret = kernctl_get_stream_id(infd, &index->stream_id); + if (ret < 0) { + PERROR("kernctl_get_stream_id"); + goto error; + } + index->stream_id = htobe64(index->stream_id); + +error: + return ret; +} + /* * Consume data on a file descriptor and write it on a trace file. */ @@ -850,11 +910,13 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { unsigned long len, subbuf_size, padding; - int err; + int err, write_index = 1; ssize_t ret = 0; int infd = stream->wait_fd; + struct lttng_packet_index index; DBG("In read_subbuffer (infd : %d)", infd); + /* Get the next subbuffer */ err = kernctl_get_next_subbuf(infd); if (err != 0) { @@ -878,6 +940,15 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, goto end; } + if (!stream->metadata_flag) { + ret = get_index_values(&index, infd); + if (ret < 0) { + goto end; + } + } else { + write_index = 0; + } + switch (stream->chan->output) { case CONSUMER_CHANNEL_SPLICE: /* @@ -890,7 +961,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, /* splice the subbuffer to the tracefile */ ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, subbuf_size, - padding); + padding, &index); /* * XXX: Splice does not support network streaming so the return value * is simply checked against subbuf_size and not like the mmap() op. @@ -902,6 +973,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, */ ERR("Error splicing to tracefile (ret: %zd != len: %lu)", ret, subbuf_size); + write_index = 0; } break; case CONSUMER_CHANNEL_MMAP: @@ -920,7 +992,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, /* write the subbuffer to the tracefile */ ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, - padding); + padding, &index); /* * The mmap operation should write subbuf_size amount of data when * network streaming or the full padding (len) size when we are _not_ @@ -935,6 +1007,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, ERR("Error writing to tracefile " "(ret: %zd != len: %lu != subbuf_size: %lu)", ret, len, subbuf_size); + write_index = 0; } break; default: @@ -954,6 +1027,16 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, goto end; } + /* Write index if needed. */ + if (!write_index) { + goto end; + } + + err = consumer_stream_write_index(stream, &index); + if (err < 0) { + goto end; + } + end: return ret; } @@ -971,12 +1054,23 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) { ret = utils_create_stream_file(stream->chan->pathname, stream->name, stream->chan->tracefile_size, stream->tracefile_count_current, - stream->uid, stream->gid); + stream->uid, stream->gid, NULL); if (ret < 0) { goto error; } stream->out_fd = ret; stream->tracefile_size_current = 0; + + if (!stream->metadata_flag) { + ret = index_create_file(stream->chan->pathname, + stream->name, stream->uid, stream->gid, + stream->chan->tracefile_size, + stream->tracefile_count_current); + if (ret < 0) { + goto error; + } + stream->index_fd = ret; + } } if (stream->output == LTTNG_EVENT_MMAP) {