Fix: cleanup inactive FDs in the consumer polling thread
authorJulien Desfossez <jdesfossez@efficios.com>
Thu, 1 Feb 2018 19:24:10 +0000 (14:24 -0500)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Mon, 5 Feb 2018 17:20:59 +0000 (12:20 -0500)
Users have reported assert() hitting on consumerd shutdown on a
non-empty data stream hash table.

Relevant stack trace:
[...] in lttng_ht_destroy (ht=0x6) at hashtable.c:162
[...] in lttng_consumer_cleanup () at consumer.c:1207
[...] in main ([...]) at lttng-consumerd.c:625

This is reproducible when a consumerd is shutting down at the same
time as one of its relay daemon peers.

On failure to reach a relay daemon, all of that relay daemons'
associated streams are marked as having an inactive endpoint (see
cleanup_relayd(), consumer.c:467). The data polling thread is notified
of the change through an empty message on the "data" pipe.

Before blocking on the next poll(), the data polling thread checks if
it needs to update its poll set using the "need_update" flag. This
flag is set anytime a stream is added or deleted.

While building a new poll set, streams that are now marked as inactive
or as having an inactive endpoint are not included in the new poll
set. Those inactive streams are in a transitional state, awaiting
a clean-up.

After updating the poll set, the data polling thread checks if it
should quit (via the consumer_quit flag). Assuming this flag is set,
the thread cannot simply exit; it must clean-up any remaining data
stream.

The thread currently performs this check at consumer.c:2532. This
check is erroneous as it assumes that the number of FDs in the poll set is
indicative of the number of FDs the thread has ownership of.

If all streams are inactive, the poll set will contain no FDs to
monitor and the thread will assume that it can exit. This will leave
streams in "data_ht", causing an assertion to hit in the main thread
during the clean-up.

This patch adds an inactive FD count which must also reach zero before
the data polling thread can exit.

The clean-up of the inactive streams occurs as the data polling thread
wakes-up on its "data" pipe. Upon being woken-up on the "data" pipe,
the data polling thread will validate the endpoint status of every
data stream and close those that have been marked as inactive
(see consumer_del_stream(), consumer.c:525).

This occurs as often as necessary to allow the thread to clean-up all
of its inactive streams and exit cleanly.

Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
Signed-off-by: Jonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/common/consumer/consumer.c

index 96ad8512e394c9ca86efb47a005d85caecd1a622..3755932ab59cf8ad177607b5ad64c95a9c76bcae 100644 (file)
@@ -1074,7 +1074,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
  */
 static int update_poll_array(struct lttng_consumer_local_data *ctx,
                struct pollfd **pollfd, struct lttng_consumer_stream **local_stream,
-               struct lttng_ht *ht)
+               struct lttng_ht *ht, int *nb_inactive_fd)
 {
        int i = 0;
        struct lttng_ht_iter iter;
@@ -1086,6 +1086,7 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
        assert(local_stream);
 
        DBG("Updating poll fd array");
+       *nb_inactive_fd = 0;
        rcu_read_lock();
        cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
                /*
@@ -1096,9 +1097,14 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
                 * just after the check. However, this is OK since the stream(s) will
                 * be deleted once the thread is notified that the end point state has
                 * changed where this function will be called back again.
+                *
+                * We track the number of inactive FDs because they still need to be
+                * closed by the polling thread after a wakeup on the data_pipe or
+                * metadata_pipe.
                 */
                if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
                                stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
+                       (*nb_inactive_fd)++;
                        continue;
                }
                /*
@@ -2452,6 +2458,8 @@ void *consumer_thread_data_poll(void *data)
        struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
        /* local view of consumer_data.fds_count */
        int nb_fd = 0;
+       /* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */
+       int nb_inactive_fd = 0;
        struct lttng_consumer_local_data *ctx = data;
        ssize_t len;
 
@@ -2508,7 +2516,7 @@ void *consumer_thread_data_poll(void *data)
                                goto end;
                        }
                        ret = update_poll_array(ctx, &pollfd, local_stream,
-                                       data_ht);
+                                       data_ht, &nb_inactive_fd);
                        if (ret < 0) {
                                ERR("Error in allocating pollfd or local_outfds");
                                lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
@@ -2521,7 +2529,8 @@ void *consumer_thread_data_poll(void *data)
                pthread_mutex_unlock(&consumer_data.lock);
 
                /* No FDs and consumer_quit, consumer_cleanup the thread */
-               if (nb_fd == 0 && CMM_LOAD_SHARED(consumer_quit) == 1) {
+               if (nb_fd == 0 && nb_inactive_fd == 0 &&
+                               CMM_LOAD_SHARED(consumer_quit) == 1) {
                        err = 0;        /* All is OK */
                        goto end;
                }
This page took 0.027795 seconds and 4 git commands to generate.