X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=6d6690a32161a4cbc24e05157f31407fe9c1df74;hp=1af9840cd5184b0cfebad0bf680257fbbadac9a5;hb=f5ba75b4f0c0b44092c76bc931b25b24a2e62718;hpb=6f9449c22eef59294cf1e1dc3610a5cbf14baec0 diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 1af9840cd..6d6690a32 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -37,6 +37,7 @@ #include #include #include +#include #include "ust-consumer.h" @@ -1230,7 +1231,7 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, subbuf_view = lttng_buffer_view_init( subbuf_addr, 0, padded_len); - read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, + read_len = lttng_consumer_on_read_subbuffer_mmap( stream, &subbuf_view, padded_len - len); if (use_relayd) { if (read_len != len) { @@ -2464,7 +2465,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) assert(write_len != 0); if (write_len < 0) { ERR("Writing one metadata packet"); - ret = -1; + ret = write_len; goto end; } stream->ust_metadata_pushed += write_len; @@ -2810,28 +2811,88 @@ static int get_next_subbuffer_metadata(struct lttng_consumer_stream *stream, struct stream_subbuffer *subbuffer) { int ret; + bool cache_empty; + bool got_subbuffer; + bool coherent; + bool buffer_empty; + unsigned long consumed_pos, produced_pos; - ret = ustctl_get_next_subbuf(stream->ustream); - if (ret) { - ret = commit_one_metadata_packet(stream); - if (ret < 0) { - goto end; - } else if (ret == 0) { - /* Not an error, the cache is empty. */ - ret = -ENODATA; - goto end; + do { + ret = ustctl_get_next_subbuf(stream->ustream); + if (ret == 0) { + got_subbuffer = true; + } else { + got_subbuffer = false; + if (ret != -EAGAIN) { + /* Fatal error. */ + goto end; + } } - ret = ustctl_get_next_subbuf(stream->ustream); - if (ret) { - goto end; + /* + * Determine if the cache is empty and ensure that a sub-buffer + * is made available if the cache is not empty. + */ + if (!got_subbuffer) { + ret = commit_one_metadata_packet(stream); + if (ret < 0 && ret != -ENOBUFS) { + goto end; + } else if (ret == 0) { + /* Not an error, the cache is empty. */ + cache_empty = true; + ret = -ENODATA; + goto end; + } else { + cache_empty = false; + } + } else { + pthread_mutex_lock(&stream->chan->metadata_cache->lock); + cache_empty = stream->chan->metadata_cache->max_offset == + stream->ust_metadata_pushed; + pthread_mutex_unlock(&stream->chan->metadata_cache->lock); } - } + } while (!got_subbuffer); + /* Populate sub-buffer infos and view. */ ret = get_next_subbuffer_common(stream, subbuffer); if (ret) { goto end; } + + ret = lttng_ustconsumer_sample_snapshot_positions(stream); + if (ret < 0) { + /* + * -EAGAIN is not expected since we got a sub-buffer and haven't + * pushed the consumption position yet (on put_next). + */ + PERROR("Failed to take a snapshot of metadata buffer positions"); + goto end; + } + + ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos); + if (ret) { + PERROR("Failed to get metadata consumed position"); + goto end; + } + + ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos); + if (ret) { + PERROR("Failed to get metadata produced position"); + goto end; + } + + /* Last sub-buffer of the ring buffer ? */ + buffer_empty = (consumed_pos + stream->max_sb_size) == produced_pos; + + /* + * The sessiond registry lock ensures that coherent units of metadata + * are pushed to the consumer daemon at once. Hence, if a sub-buffer is + * acquired, the cache is empty, and it is the only available sub-buffer + * available, it is safe to assume that it is "coherent". + */ + coherent = got_subbuffer && cache_empty && buffer_empty; + + LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent); end: return ret; } @@ -2851,9 +2912,11 @@ static int signal_metadata(struct lttng_consumer_stream *stream, return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0; } -static void lttng_ustconsumer_set_stream_ops( +static int lttng_ustconsumer_set_stream_ops( struct lttng_consumer_stream *stream) { + int ret = 0; + stream->read_subbuffer_ops.on_wake_up = consumer_stream_ust_on_wake_up; if (stream->metadata_flag) { stream->read_subbuffer_ops.get_next_subbuffer = @@ -2862,7 +2925,14 @@ static void lttng_ustconsumer_set_stream_ops( extract_metadata_subbuffer_info; stream->read_subbuffer_ops.reset_metadata = metadata_stream_reset_cache; - stream->read_subbuffer_ops.on_sleep = signal_metadata; + if (stream->chan->is_live) { + stream->read_subbuffer_ops.on_sleep = signal_metadata; + ret = consumer_stream_enable_metadata_bucketization( + stream); + if (ret) { + goto end; + } + } } else { stream->read_subbuffer_ops.get_next_subbuffer = get_next_subbuffer; @@ -2876,6 +2946,8 @@ static void lttng_ustconsumer_set_stream_ops( } stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer; +end: + return ret; } /*