X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer-stream.c;h=9f134e141b549f4e478faadaf15a64f58b83fc71;hb=a4eb26f0f09cd6d031166329a7d5d7b5d40408a6;hp=47f2eb2171fa2883d84ce8a7dfe8830fa10134fb;hpb=56c86e0a3f16a2f7fd3e473dd8c9d753a13e86ee;p=lttng-tools.git diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c index 47f2eb217..9f134e141 100644 --- a/src/common/consumer/consumer-stream.c +++ b/src/common/consumer/consumer-stream.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2011 Julien Desfossez + * Copyright (C) 2011 EfficiOS Inc. * Copyright (C) 2011 Mathieu Desnoyers * Copyright (C) 2013 David Goulet * @@ -14,14 +14,15 @@ #include #include +#include +#include +#include #include #include +#include #include #include #include -#include -#include -#include #include "consumer-stream.h" @@ -51,6 +52,12 @@ static void consumer_stream_data_unlock_all(struct lttng_consumer_stream *stream pthread_mutex_unlock(&stream->chan->lock); } +static void consumer_stream_data_assert_locked_all(struct lttng_consumer_stream *stream) +{ + ASSERT_LOCKED(stream->lock); + ASSERT_LOCKED(stream->chan->lock); +} + static void consumer_stream_metadata_lock_all(struct lttng_consumer_stream *stream) { consumer_stream_data_lock_all(stream); @@ -63,6 +70,12 @@ static void consumer_stream_metadata_unlock_all(struct lttng_consumer_stream *st consumer_stream_data_unlock_all(stream); } +static void consumer_stream_metadata_assert_locked_all(struct lttng_consumer_stream *stream) +{ + ASSERT_LOCKED(stream->metadata_rdv_lock); + consumer_stream_data_assert_locked_all(stream); +} + /* Only used for data streams. */ static int consumer_stream_update_stats(struct lttng_consumer_stream *stream, const struct stream_subbuffer *subbuf) @@ -403,6 +416,7 @@ static int consumer_stream_sync_metadata_index( const struct stream_subbuffer *subbuffer, struct lttng_consumer_local_data *ctx) { + bool missed_metadata_flush; int ret; /* Block until all the metadata is sent. */ @@ -415,18 +429,34 @@ static int consumer_stream_sync_metadata_index( pthread_mutex_lock(&stream->metadata_timer_lock); stream->waiting_on_metadata = false; - if (stream->missed_metadata_flush) { + missed_metadata_flush = stream->missed_metadata_flush; + if (missed_metadata_flush) { stream->missed_metadata_flush = false; - pthread_mutex_unlock(&stream->metadata_timer_lock); - (void) stream->read_subbuffer_ops.send_live_beacon(stream); - } else { - pthread_mutex_unlock(&stream->metadata_timer_lock); } + pthread_mutex_unlock(&stream->metadata_timer_lock); if (ret < 0) { goto end; } ret = consumer_stream_send_index(stream, subbuffer, ctx); + /* + * Send the live inactivity beacon to handle the situation where + * the live timer is prevented from sampling this stream + * because the stream lock was being held while this stream is + * waiting on metadata. This ensures live viewer progress in the + * unlikely scenario where a live timer would be prevented from + * locking a stream lock repeatedly due to a steady flow of + * incoming metadata, for a stream which is mostly inactive. + * + * It is important to send the inactivity beacon packet to + * relayd _after_ sending the index associated with the data + * that was just sent, otherwise this can cause live viewers to + * observe timestamps going backwards between an inactivity + * beacon and a following trace packet. + */ + if (missed_metadata_flush) { + (void) stream->read_subbuffer_ops.send_live_beacon(stream); + } end: return ret; } @@ -479,13 +509,14 @@ struct lttng_consumer_stream *consumer_stream_create( goto end; } + rcu_read_lock(); + if (trace_chunk && !lttng_trace_chunk_get(trace_chunk)) { ERR("Failed to acquire trace chunk reference during the creation of a stream"); ret = -1; goto error; } - rcu_read_lock(); stream->chan = channel; stream->key = stream_key; stream->trace_chunk = trace_chunk; @@ -558,12 +589,16 @@ struct lttng_consumer_stream *consumer_stream_create( consumer_stream_metadata_lock_all; stream->read_subbuffer_ops.unlock = consumer_stream_metadata_unlock_all; + stream->read_subbuffer_ops.assert_locked = + consumer_stream_metadata_assert_locked_all; stream->read_subbuffer_ops.pre_consume_subbuffer = metadata_stream_check_version; } else { stream->read_subbuffer_ops.lock = consumer_stream_data_lock_all; stream->read_subbuffer_ops.unlock = consumer_stream_data_unlock_all; + stream->read_subbuffer_ops.assert_locked = + consumer_stream_data_assert_locked_all; stream->read_subbuffer_ops.pre_consume_subbuffer = consumer_stream_update_stats; if (channel->is_live) {