consumerd: rename `data_read` to `has_data_left_to_be_read_before_teardown`
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 21 Jun 2022 20:46:29 +0000 (16:46 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Mon, 4 Jul 2022 15:43:43 +0000 (11:43 -0400)
Document the sequence of events after a stream hangs up and rename
the `data_read` stream attribute to give it a more specific name.

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Change-Id: Id7d26fa04fc0d2f0875dced18b6a3f6a7fe7d139

src/common/consumer/consumer.cpp
src/common/consumer/consumer.hpp

index ae8469ea233038b8478626e3da6fc70f0c014b59..8326a8e391d64d7506fcab19a981558955fcfd0b 100644 (file)
@@ -2713,7 +2713,7 @@ void *consumer_thread_data_poll(void *data)
                                        consumer_del_stream(local_stream[i], data_ht);
                                        local_stream[i] = NULL;
                                } else if (len > 0) {
-                                       local_stream[i]->data_read = 1;
+                                       local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
                                }
                        }
                }
@@ -2744,7 +2744,7 @@ void *consumer_thread_data_poll(void *data)
                                        consumer_del_stream(local_stream[i], data_ht);
                                        local_stream[i] = NULL;
                                } else if (len > 0) {
-                                       local_stream[i]->data_read = 1;
+                                       local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
                                }
                        }
                }
@@ -2764,37 +2764,45 @@ void *consumer_thread_data_poll(void *data)
                                                pollfd[i].fd);
                                lttng_ustconsumer_on_stream_hangup(local_stream[i]);
                                /* Attempt read again, for the data we just flushed. */
-                               local_stream[i]->data_read = 1;
+                               local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
                        }
                        /*
+                        * When a stream's pipe dies (hup/err/nval), an "inactive producer" flush is
+                        * performed. This type of flush ensures that a new packet is produced no
+                        * matter the consumed/produced positions are.
+                        *
+                        * This, in turn, causes the next pass to see that data available for the
+                        * stream. When we come back here, we can be assured that all available
+                        * data has been consumed and we can finally destroy the stream.
+                        *
                         * If the poll flag is HUP/ERR/NVAL and we have
                         * read no data in this pass, we can remove the
                         * stream from its hash table.
                         */
                        if ((pollfd[i].revents & POLLHUP)) {
                                DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
-                               if (!local_stream[i]->data_read) {
+                               if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
                                        consumer_del_stream(local_stream[i], data_ht);
                                        local_stream[i] = NULL;
                                        num_hup++;
                                }
                        } else if (pollfd[i].revents & POLLERR) {
                                ERR("Error returned in polling fd %d.", pollfd[i].fd);
-                               if (!local_stream[i]->data_read) {
+                               if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
                                        consumer_del_stream(local_stream[i], data_ht);
                                        local_stream[i] = NULL;
                                        num_hup++;
                                }
                        } else if (pollfd[i].revents & POLLNVAL) {
                                ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
-                               if (!local_stream[i]->data_read) {
+                               if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
                                        consumer_del_stream(local_stream[i], data_ht);
                                        local_stream[i] = NULL;
                                        num_hup++;
                                }
                        }
                        if (local_stream[i] != NULL) {
-                               local_stream[i]->data_read = 0;
+                               local_stream[i]->has_data_left_to_be_read_before_teardown = 0;
                        }
                }
        }
index 2c16ae1fac051edf432dde8cbca73aa8ffa3bba9..dd8eb40d44798c8466c7f3d3eaa794ee6d60c169 100644 (file)
@@ -449,7 +449,11 @@ struct lttng_consumer_stream {
        /* Amount of bytes written to the output */
        uint64_t output_written;
        int shm_fd_is_copy;
-       int data_read;
+       /*
+        * When a stream's pipe is hung up, a final flush is performed (see hangup_flush_done). This
+        * indicates whether or not the data resulting from this flush is still to be consumed.
+        */
+       int has_data_left_to_be_read_before_teardown;
        int hangup_flush_done;
 
        /*
This page took 0.029583 seconds and 4 git commands to generate.