X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=d02e8502d99914c0acf5652d9835bd71f20b540e;hb=94d4914075c61cd1ee2ec00d8b61eacff105fc47;hp=09ccda329526b69cf1e5aa147ed32f4dc8a67e81;hpb=309167d2a6f59d0c8cbf64eb23ba912cdea76a34;p=lttng-tools.git diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 09ccda329..d02e8502d 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -39,6 +39,7 @@ #include #include #include +#include #include "kernel-consumer.h" @@ -441,7 +442,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Session daemon status message are handled in the following call. */ ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index, msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll, - &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id); + &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id, + msg.u.relayd_sock.relayd_session_id); goto end_nosignal; } case LTTNG_CONSUMER_ADD_CHANNEL: @@ -462,7 +464,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.channel.relayd_id, msg.u.channel.output, msg.u.channel.tracefile_size, msg.u.channel.tracefile_count, 0, - msg.u.channel.monitor); + msg.u.channel.monitor, + msg.u.channel.live_timer_interval); if (new_channel == NULL) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; @@ -501,6 +504,10 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } else { ret = consumer_add_channel(new_channel, ctx); } + if (CONSUMER_CHANNEL_TYPE_DATA) { + consumer_timer_live_start(new_channel, + msg.u.channel.live_timer_interval); + } /* If we received an error in add_channel, we need to report it. */ if (ret < 0) { @@ -898,6 +905,42 @@ static int get_index_values(struct lttng_packet_index *index, int infd) error: return ret; } +/* + * Sync metadata meaning request them to the session daemon and snapshot to the + * metadata thread can consumer them. + * + * Metadata stream lock MUST be acquired. + * + * Return 0 if new metadatda is available, EAGAIN if the metadata stream + * is empty or a negative value on error. + */ +int lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata) +{ + int ret; + + assert(metadata); + + ret = kernctl_buffer_flush(metadata->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); + goto end; + } + + ret = kernctl_snapshot(metadata->wait_fd); + if (ret < 0) { + if (errno != EAGAIN) { + ERR("Sync metadata, taking kernel snapshot failed."); + goto end; + } + DBG("Sync metadata, no new kernel metadata"); + /* No new metadata, exit. */ + ret = ENODATA; + goto end; + } + +end: + return ret; +} /* * Consume data on a file descriptor and write it on a trace file. @@ -906,18 +949,13 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { unsigned long len, subbuf_size, padding; - int err, write_index = 0; + int err, write_index = 1; ssize_t ret = 0; int infd = stream->wait_fd; struct lttng_packet_index index; DBG("In read_subbuffer (infd : %d)", infd); - /* Indicate that for this stream we have to write the index. */ - if (stream->index_fd >= 0) { - write_index = 1; - } - /* Get the next subbuffer */ err = kernctl_get_next_subbuf(infd); if (err != 0) { @@ -941,11 +979,13 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, goto end; } - if (!stream->metadata_flag && write_index) { + if (!stream->metadata_flag) { ret = get_index_values(&index, infd); if (ret < 0) { goto end; } + } else { + write_index = 0; } switch (stream->chan->output) { @@ -1027,14 +1067,25 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, } /* Write index if needed. */ - if (write_index) { - err = index_write(stream->index_fd, &index, sizeof(index)); + if (!write_index) { + goto end; + } + + if (stream->chan->live_timer_interval && !stream->metadata_flag) { + /* + * In live, block until all the metadata is sent. + */ + err = consumer_stream_sync_metadata(ctx, stream->session_id); if (err < 0) { - ret = -1; goto end; } } + err = consumer_stream_write_index(stream, &index); + if (err < 0) { + goto end; + } + end: return ret; }