X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer-stream.cpp;h=bb0ec0a2436db1a9fb042b29b1e53b622c2eb60e;hb=319dcddc7409961e156af76666fe70d31baec55a;hp=fbaf4aef502e69ea3d8937863df6aa68e0ccbdee;hpb=97535efaa975ca52bf02c2d5e76351bfd2e3defa;p=lttng-tools.git diff --git a/src/common/consumer/consumer-stream.cpp b/src/common/consumer/consumer-stream.cpp index fbaf4aef5..bb0ec0a24 100644 --- a/src/common/consumer/consumer-stream.cpp +++ b/src/common/consumer/consumer-stream.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2011 Julien Desfossez + * Copyright (C) 2011 EfficiOS Inc. * Copyright (C) 2011 Mathieu Desnoyers * Copyright (C) 2013 David Goulet * @@ -12,18 +12,22 @@ #include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "consumer-stream.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "consumer-stream.hpp" /* * RCU call to free stream. MUST only be used with call_rcu(). @@ -31,9 +35,9 @@ static void free_stream_rcu(struct rcu_head *head) { struct lttng_ht_node_u64 *node = - caa_container_of(head, struct lttng_ht_node_u64, head); + lttng::utils::container_of(head, <tng_ht_node_u64::head); struct lttng_consumer_stream *stream = - caa_container_of(node, struct lttng_consumer_stream, node); + lttng::utils::container_of(node, <tng_consumer_stream::node); pthread_mutex_destroy(&stream->lock); free(stream); @@ -51,6 +55,12 @@ static void consumer_stream_data_unlock_all(struct lttng_consumer_stream *stream pthread_mutex_unlock(&stream->chan->lock); } +static void consumer_stream_data_assert_locked_all(struct lttng_consumer_stream *stream) +{ + ASSERT_LOCKED(stream->lock); + ASSERT_LOCKED(stream->chan->lock); +} + static void consumer_stream_metadata_lock_all(struct lttng_consumer_stream *stream) { consumer_stream_data_lock_all(stream); @@ -63,6 +73,12 @@ static void consumer_stream_metadata_unlock_all(struct lttng_consumer_stream *st consumer_stream_data_unlock_all(stream); } +static void consumer_stream_metadata_assert_locked_all(struct lttng_consumer_stream *stream) +{ + ASSERT_LOCKED(stream->metadata_rdv_lock); + consumer_stream_data_assert_locked_all(stream); +} + /* Only used for data streams. */ static int consumer_stream_update_stats(struct lttng_consumer_stream *stream, const struct stream_subbuffer *subbuf) @@ -144,7 +160,7 @@ void ctf_packet_index_populate(struct ctf_packet_index *index, } static ssize_t consumer_stream_consume_mmap( - struct lttng_consumer_local_data *ctx, + struct lttng_consumer_local_data *ctx __attribute__((unused)), struct lttng_consumer_stream *stream, const struct stream_subbuffer *subbuffer) { @@ -215,7 +231,7 @@ static ssize_t consumer_stream_consume_splice( static int consumer_stream_send_index( struct lttng_consumer_stream *stream, const struct stream_subbuffer *subbuffer, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx __attribute__((unused))) { off_t packet_offset = 0; struct ctf_packet_index index = {}; @@ -403,6 +419,7 @@ static int consumer_stream_sync_metadata_index( const struct stream_subbuffer *subbuffer, struct lttng_consumer_local_data *ctx) { + bool missed_metadata_flush; int ret; /* Block until all the metadata is sent. */ @@ -415,18 +432,34 @@ static int consumer_stream_sync_metadata_index( pthread_mutex_lock(&stream->metadata_timer_lock); stream->waiting_on_metadata = false; - if (stream->missed_metadata_flush) { + missed_metadata_flush = stream->missed_metadata_flush; + if (missed_metadata_flush) { stream->missed_metadata_flush = false; - pthread_mutex_unlock(&stream->metadata_timer_lock); - (void) stream->read_subbuffer_ops.send_live_beacon(stream); - } else { - pthread_mutex_unlock(&stream->metadata_timer_lock); } + pthread_mutex_unlock(&stream->metadata_timer_lock); if (ret < 0) { goto end; } ret = consumer_stream_send_index(stream, subbuffer, ctx); + /* + * Send the live inactivity beacon to handle the situation where + * the live timer is prevented from sampling this stream + * because the stream lock was being held while this stream is + * waiting on metadata. This ensures live viewer progress in the + * unlikely scenario where a live timer would be prevented from + * locking a stream lock repeatedly due to a steady flow of + * incoming metadata, for a stream which is mostly inactive. + * + * It is important to send the inactivity beacon packet to + * relayd _after_ sending the index associated with the data + * that was just sent, otherwise this can cause live viewers to + * observe timestamps going backwards between an inactivity + * beacon and a following trace packet. + */ + if (missed_metadata_flush) { + (void) stream->read_subbuffer_ops.send_live_beacon(stream); + } end: return ret; } @@ -558,8 +591,8 @@ end: */ static int post_consume_open_new_packet(struct lttng_consumer_stream *stream, - const struct stream_subbuffer *subbuffer, - struct lttng_consumer_local_data *ctx) + const struct stream_subbuffer *subbuffer __attribute__((unused)), + struct lttng_consumer_local_data *ctx __attribute__((unused))) { int ret = 0; @@ -621,7 +654,7 @@ struct lttng_consumer_stream *consumer_stream_create( int ret; struct lttng_consumer_stream *stream; - stream = (lttng_consumer_stream *) zmalloc(sizeof(*stream)); + stream = zmalloc(); if (stream == NULL) { PERROR("malloc struct lttng_consumer_stream"); ret = -ENOMEM; @@ -636,6 +669,7 @@ struct lttng_consumer_stream *consumer_stream_create( goto error; } + stream->send_node = CDS_LIST_HEAD_INIT(stream->send_node); stream->chan = channel; stream->key = stream_key; stream->trace_chunk = trace_chunk; @@ -711,6 +745,8 @@ struct lttng_consumer_stream *consumer_stream_create( consumer_stream_metadata_lock_all; stream->read_subbuffer_ops.unlock = consumer_stream_metadata_unlock_all; + stream->read_subbuffer_ops.assert_locked = + consumer_stream_metadata_assert_locked_all; stream->read_subbuffer_ops.pre_consume_subbuffer = metadata_stream_check_version; } else { @@ -739,6 +775,8 @@ struct lttng_consumer_stream *consumer_stream_create( stream->read_subbuffer_ops.lock = consumer_stream_data_lock_all; stream->read_subbuffer_ops.unlock = consumer_stream_data_unlock_all; + stream->read_subbuffer_ops.assert_locked = + consumer_stream_data_assert_locked_all; stream->read_subbuffer_ops.pre_consume_subbuffer = consumer_stream_update_stats; } @@ -1023,6 +1061,8 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, { LTTNG_ASSERT(stream); + cds_list_del_init(&stream->send_node); + /* Stream is in monitor mode. */ if (stream->monitor) { struct lttng_consumer_channel *free_chan = NULL; @@ -1035,10 +1075,12 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, if (stream->globally_visible) { pthread_mutex_lock(&the_consumer_data.lock); pthread_mutex_lock(&stream->chan->lock); + pthread_mutex_lock(&stream->lock); /* Remove every reference of the stream in the consumer. */ consumer_stream_delete(stream, ht); + destroy_close_stream(stream); /* Update channel's refcount of the stream. */ @@ -1055,6 +1097,7 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, * If the stream is not visible globally, this needs to be done * outside of the consumer data lock section. */ + destroy_close_stream(stream); free_chan = unref_channel(stream); } @@ -1233,7 +1276,7 @@ end: } static ssize_t metadata_bucket_consume( - struct lttng_consumer_local_data *unused, + struct lttng_consumer_local_data *unused __attribute__((unused)), struct lttng_consumer_stream *stream, const struct stream_subbuffer *subbuffer) {