summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
d9ab8c6)
Document the sequence of events after a stream hangs up and rename
the `data_read` stream attribute to give it a more specific name.
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Change-Id: Id7d26fa04fc0d2f0875dced18b6a3f6a7fe7d139
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = NULL;
} else if (len > 0) {
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = NULL;
} else if (len > 0) {
- local_stream[i]->data_read = 1;
+ local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = NULL;
} else if (len > 0) {
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = NULL;
} else if (len > 0) {
- local_stream[i]->data_read = 1;
+ local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
pollfd[i].fd);
lttng_ustconsumer_on_stream_hangup(local_stream[i]);
/* Attempt read again, for the data we just flushed. */
pollfd[i].fd);
lttng_ustconsumer_on_stream_hangup(local_stream[i]);
/* Attempt read again, for the data we just flushed. */
- local_stream[i]->data_read = 1;
+ local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
+ * When a stream's pipe dies (hup/err/nval), an "inactive producer" flush is
+ * performed. This type of flush ensures that a new packet is produced no
+ * matter the consumed/produced positions are.
+ *
+ * This, in turn, causes the next pass to see that data available for the
+ * stream. When we come back here, we can be assured that all available
+ * data has been consumed and we can finally destroy the stream.
+ *
* If the poll flag is HUP/ERR/NVAL and we have
* read no data in this pass, we can remove the
* stream from its hash table.
*/
if ((pollfd[i].revents & POLLHUP)) {
DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
* If the poll flag is HUP/ERR/NVAL and we have
* read no data in this pass, we can remove the
* stream from its hash table.
*/
if ((pollfd[i].revents & POLLHUP)) {
DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
- if (!local_stream[i]->data_read) {
+ if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = NULL;
num_hup++;
}
} else if (pollfd[i].revents & POLLERR) {
ERR("Error returned in polling fd %d.", pollfd[i].fd);
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = NULL;
num_hup++;
}
} else if (pollfd[i].revents & POLLERR) {
ERR("Error returned in polling fd %d.", pollfd[i].fd);
- if (!local_stream[i]->data_read) {
+ if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = NULL;
num_hup++;
}
} else if (pollfd[i].revents & POLLNVAL) {
ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = NULL;
num_hup++;
}
} else if (pollfd[i].revents & POLLNVAL) {
ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
- if (!local_stream[i]->data_read) {
+ if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = NULL;
num_hup++;
}
}
if (local_stream[i] != NULL) {
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = NULL;
num_hup++;
}
}
if (local_stream[i] != NULL) {
- local_stream[i]->data_read = 0;
+ local_stream[i]->has_data_left_to_be_read_before_teardown = 0;
/* Amount of bytes written to the output */
uint64_t output_written;
int shm_fd_is_copy;
/* Amount of bytes written to the output */
uint64_t output_written;
int shm_fd_is_copy;
+ /*
+ * When a stream's pipe is hung up, a final flush is performed (see hangup_flush_done). This
+ * indicates whether or not the data resulting from this flush is still to be consumed.
+ */
+ int has_data_left_to_be_read_before_teardown;
int hangup_flush_done;
/*
int hangup_flush_done;
/*