From d01178b6f6465443d7e6e1015aa7054e9d093e91 Mon Sep 17 00:00:00 2001 From: David Goulet Date: Thu, 4 Jul 2013 16:25:27 -0400 Subject: [PATCH] Fix: relayd refcount updates for stream Increment refcount only when the stream was successfully sent to the relayd and set the new stream's flag "sent_to_relayd" which is used before the refcount update when closing the relayd. A stream that was unable to be sent, the close relayd code path does not decrement the refcount anymore. Signed-off-by: David Goulet --- src/common/consumer-stream.c | 7 +++-- src/common/consumer.c | 15 +--------- src/common/consumer.h | 8 +++++ src/common/ust-consumer/ust-consumer.c | 41 ++------------------------ 4 files changed, 17 insertions(+), 54 deletions(-) diff --git a/src/common/consumer-stream.c b/src/common/consumer-stream.c index 03bac8687..723ec829f 100644 --- a/src/common/consumer-stream.c +++ b/src/common/consumer-stream.c @@ -58,8 +58,10 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream, assert(stream); assert(relayd); - uatomic_dec(&relayd->refcount); - assert(uatomic_read(&relayd->refcount) >= 0); + if (stream->sent_to_relayd) { + uatomic_dec(&relayd->refcount); + assert(uatomic_read(&relayd->refcount) >= 0); + } /* Closing streams requires to lock the control socket. */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); @@ -82,6 +84,7 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream, consumer_destroy_relayd(relayd); } stream->net_seq_idx = (uint64_t) -1ULL; + stream->sent_to_relayd = 0; } /* diff --git a/src/common/consumer.c b/src/common/consumer.c index 3aafb5193..910f386d9 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -540,7 +540,6 @@ static int add_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht) { int ret = 0; - struct consumer_relayd_sock_pair *relayd; assert(stream); assert(ht); @@ -566,12 +565,6 @@ static int add_stream(struct lttng_consumer_stream *stream, */ lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id); - /* Check and cleanup relayd */ - relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd != NULL) { - uatomic_inc(&relayd->refcount); - } - /* * When nb_init_stream_left reaches 0, we don't need to trigger any action * in terms of destroying the associated channel, because the action that @@ -709,6 +702,7 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, goto end; } uatomic_inc(&relayd->refcount); + stream->sent_to_relayd = 1; } else { ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.", stream->key, stream->net_seq_idx); @@ -1969,7 +1963,6 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht) { int ret = 0; - struct consumer_relayd_sock_pair *relayd; struct lttng_ht_iter iter; struct lttng_ht_node_u64 *node; @@ -1996,12 +1989,6 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream, node = lttng_ht_iter_get_node_u64(&iter); assert(!node); - /* Find relayd and, if one is found, increment refcount. */ - relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd != NULL) { - uatomic_inc(&relayd->refcount); - } - /* * When nb_init_stream_left reaches 0, we don't need to trigger any action * in terms of destroying the associated channel, because the action that diff --git a/src/common/consumer.h b/src/common/consumer.h index a64bcad3b..23f2c9d84 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -216,6 +216,14 @@ struct lttng_consumer_stream { gid_t gid; /* Network sequence number. Indicating on which relayd socket it goes. */ uint64_t net_seq_idx; + /* + * Indicate if this stream was successfully sent to a relayd. This is set + * after the refcount of the relayd is incremented and is checked when the + * stream is closed before decrementing the refcount in order to avoid an + * unbalanced state. + */ + unsigned int sent_to_relayd; + /* Identify if the stream is the metadata */ unsigned int metadata_flag; /* Used when the stream is set for network streaming */ diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 4a7c6db1a..3133835be 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -209,42 +209,6 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream, return ret; } -/* - * Search for a relayd object related to the stream. If found, send the stream - * to the relayd. - * - * On success, returns 0 else a negative value. - */ -static int send_stream_to_relayd(struct lttng_consumer_stream *stream) -{ - int ret = 0; - struct consumer_relayd_sock_pair *relayd; - - assert(stream); - - relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd != NULL) { - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - /* Add stream on the relayd */ - ret = relayd_add_stream(&relayd->control_sock, stream->name, - stream->chan->pathname, &stream->relayd_stream_id, - stream->chan->tracefile_size, - stream->chan->tracefile_count); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - if (ret < 0) { - goto error; - } - } else if (stream->net_seq_idx != (uint64_t) -1ULL) { - ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.", - stream->net_seq_idx); - ret = -1; - goto error; - } - -error: - return ret; -} - /* * Create streams for the given channel using liblttng-ust-ctl. * @@ -411,7 +375,7 @@ static int send_sessiond_channel(int sock, cds_list_for_each_entry(stream, &channel->streams.head, send_node) { /* Try to send the stream to the relayd if one is available. */ - ret = send_stream_to_relayd(stream); + ret = consumer_send_relayd_stream(stream, stream->chan->pathname); if (ret < 0) { /* * Flag that the relayd was the problem here probably due to a @@ -737,7 +701,8 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key) } /* Send metadata stream to relayd if needed. */ - ret = send_stream_to_relayd(metadata->metadata_stream); + ret = consumer_send_relayd_stream(metadata->metadata_stream, + metadata->pathname); if (ret < 0) { ret = LTTCOMM_CONSUMERD_ERROR_METADATA; goto error; -- 2.34.1