From: Julien Desfossez Date: Thu, 1 Feb 2018 19:24:10 +0000 (-0500) Subject: Fix: cleanup inactive FDs in the consumer polling thread X-Git-Tag: v2.11.0-rc1~407 X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=commitdiff_plain;h=9a2fcf78ea85b9ad74405db4605338896d70b46f Fix: cleanup inactive FDs in the consumer polling thread Users have reported assert() hitting on consumerd shutdown on a non-empty data stream hash table. Relevant stack trace: [...] in lttng_ht_destroy (ht=0x6) at hashtable.c:162 [...] in lttng_consumer_cleanup () at consumer.c:1207 [...] in main ([...]) at lttng-consumerd.c:625 This is reproducible when a consumerd is shutting down at the same time as one of its relay daemon peers. On failure to reach a relay daemon, all of that relay daemons' associated streams are marked as having an inactive endpoint (see cleanup_relayd(), consumer.c:467). The data polling thread is notified of the change through an empty message on the "data" pipe. Before blocking on the next poll(), the data polling thread checks if it needs to update its poll set using the "need_update" flag. This flag is set anytime a stream is added or deleted. While building a new poll set, streams that are now marked as inactive or as having an inactive endpoint are not included in the new poll set. Those inactive streams are in a transitional state, awaiting a clean-up. After updating the poll set, the data polling thread checks if it should quit (via the consumer_quit flag). Assuming this flag is set, the thread cannot simply exit; it must clean-up any remaining data stream. The thread currently performs this check at consumer.c:2532. This check is erroneous as it assumes that the number of FDs in the poll set is indicative of the number of FDs the thread has ownership of. If all streams are inactive, the poll set will contain no FDs to monitor and the thread will assume that it can exit. This will leave streams in "data_ht", causing an assertion to hit in the main thread during the clean-up. This patch adds an inactive FD count which must also reach zero before the data polling thread can exit. The clean-up of the inactive streams occurs as the data polling thread wakes-up on its "data" pipe. Upon being woken-up on the "data" pipe, the data polling thread will validate the endpoint status of every data stream and close those that have been marked as inactive (see consumer_del_stream(), consumer.c:525). This occurs as often as necessary to allow the thread to clean-up all of its inactive streams and exit cleanly. Signed-off-by: Julien Desfossez Signed-off-by: Jonathan Rajotte Signed-off-by: Jérémie Galarneau --- diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 96ad8512e..3755932ab 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -1074,7 +1074,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel, */ static int update_poll_array(struct lttng_consumer_local_data *ctx, struct pollfd **pollfd, struct lttng_consumer_stream **local_stream, - struct lttng_ht *ht) + struct lttng_ht *ht, int *nb_inactive_fd) { int i = 0; struct lttng_ht_iter iter; @@ -1086,6 +1086,7 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, assert(local_stream); DBG("Updating poll fd array"); + *nb_inactive_fd = 0; rcu_read_lock(); cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { /* @@ -1096,9 +1097,14 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, * just after the check. However, this is OK since the stream(s) will * be deleted once the thread is notified that the end point state has * changed where this function will be called back again. + * + * We track the number of inactive FDs because they still need to be + * closed by the polling thread after a wakeup on the data_pipe or + * metadata_pipe. */ if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM || stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) { + (*nb_inactive_fd)++; continue; } /* @@ -2452,6 +2458,8 @@ void *consumer_thread_data_poll(void *data) struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL; /* local view of consumer_data.fds_count */ int nb_fd = 0; + /* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */ + int nb_inactive_fd = 0; struct lttng_consumer_local_data *ctx = data; ssize_t len; @@ -2508,7 +2516,7 @@ void *consumer_thread_data_poll(void *data) goto end; } ret = update_poll_array(ctx, &pollfd, local_stream, - data_ht); + data_ht, &nb_inactive_fd); if (ret < 0) { ERR("Error in allocating pollfd or local_outfds"); lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR); @@ -2521,7 +2529,8 @@ void *consumer_thread_data_poll(void *data) pthread_mutex_unlock(&consumer_data.lock); /* No FDs and consumer_quit, consumer_cleanup the thread */ - if (nb_fd == 0 && CMM_LOAD_SHARED(consumer_quit) == 1) { + if (nb_fd == 0 && nb_inactive_fd == 0 && + CMM_LOAD_SHARED(consumer_quit) == 1) { err = 0; /* All is OK */ goto end; }