From c715ddc950bf653d9456d92c6ead2e3cbd3c54ae Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Tue, 21 Jun 2022 16:46:29 -0400 Subject: [PATCH] consumerd: rename `data_read` to `has_data_left_to_be_read_before_teardown` MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Document the sequence of events after a stream hangs up and rename the `data_read` stream attribute to give it a more specific name. Signed-off-by: Jérémie Galarneau Change-Id: Id7d26fa04fc0d2f0875dced18b6a3f6a7fe7d139 --- src/common/consumer/consumer.cpp | 22 +++++++++++++++------- src/common/consumer/consumer.hpp | 6 +++++- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/src/common/consumer/consumer.cpp b/src/common/consumer/consumer.cpp index ae8469ea2..8326a8e39 100644 --- a/src/common/consumer/consumer.cpp +++ b/src/common/consumer/consumer.cpp @@ -2713,7 +2713,7 @@ void *consumer_thread_data_poll(void *data) consumer_del_stream(local_stream[i], data_ht); local_stream[i] = NULL; } else if (len > 0) { - local_stream[i]->data_read = 1; + local_stream[i]->has_data_left_to_be_read_before_teardown = 1; } } } @@ -2744,7 +2744,7 @@ void *consumer_thread_data_poll(void *data) consumer_del_stream(local_stream[i], data_ht); local_stream[i] = NULL; } else if (len > 0) { - local_stream[i]->data_read = 1; + local_stream[i]->has_data_left_to_be_read_before_teardown = 1; } } } @@ -2764,37 +2764,45 @@ void *consumer_thread_data_poll(void *data) pollfd[i].fd); lttng_ustconsumer_on_stream_hangup(local_stream[i]); /* Attempt read again, for the data we just flushed. */ - local_stream[i]->data_read = 1; + local_stream[i]->has_data_left_to_be_read_before_teardown = 1; } /* + * When a stream's pipe dies (hup/err/nval), an "inactive producer" flush is + * performed. This type of flush ensures that a new packet is produced no + * matter the consumed/produced positions are. + * + * This, in turn, causes the next pass to see that data available for the + * stream. When we come back here, we can be assured that all available + * data has been consumed and we can finally destroy the stream. + * * If the poll flag is HUP/ERR/NVAL and we have * read no data in this pass, we can remove the * stream from its hash table. */ if ((pollfd[i].revents & POLLHUP)) { DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); - if (!local_stream[i]->data_read) { + if (!local_stream[i]->has_data_left_to_be_read_before_teardown) { consumer_del_stream(local_stream[i], data_ht); local_stream[i] = NULL; num_hup++; } } else if (pollfd[i].revents & POLLERR) { ERR("Error returned in polling fd %d.", pollfd[i].fd); - if (!local_stream[i]->data_read) { + if (!local_stream[i]->has_data_left_to_be_read_before_teardown) { consumer_del_stream(local_stream[i], data_ht); local_stream[i] = NULL; num_hup++; } } else if (pollfd[i].revents & POLLNVAL) { ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); - if (!local_stream[i]->data_read) { + if (!local_stream[i]->has_data_left_to_be_read_before_teardown) { consumer_del_stream(local_stream[i], data_ht); local_stream[i] = NULL; num_hup++; } } if (local_stream[i] != NULL) { - local_stream[i]->data_read = 0; + local_stream[i]->has_data_left_to_be_read_before_teardown = 0; } } } diff --git a/src/common/consumer/consumer.hpp b/src/common/consumer/consumer.hpp index 2c16ae1fa..dd8eb40d4 100644 --- a/src/common/consumer/consumer.hpp +++ b/src/common/consumer/consumer.hpp @@ -449,7 +449,11 @@ struct lttng_consumer_stream { /* Amount of bytes written to the output */ uint64_t output_written; int shm_fd_is_copy; - int data_read; + /* + * When a stream's pipe is hung up, a final flush is performed (see hangup_flush_done). This + * indicates whether or not the data resulting from this flush is still to be consumed. + */ + int has_data_left_to_be_read_before_teardown; int hangup_flush_done; /* -- 2.34.1