Build fix: missing stdio.h include in signal-helper.hpp
[lttng-tools.git] / src / common / consumer / consumer.cpp
index a5691b1c27df6deec93703bdc38d19121d3e237b..f97c119705451c1f2080e99327de6488e9283062 100644 (file)
@@ -209,7 +209,7 @@ static struct lttng_consumer_stream *find_stream(uint64_t key,
        lttng_ht_lookup(ht, &key, &iter);
        node = lttng_ht_iter_get_node_u64(&iter);
        if (node != NULL) {
-               stream = caa_container_of(node, struct lttng_consumer_stream, node);
+               stream = lttng::utils::container_of(node, &lttng_consumer_stream::node);
        }
 
        rcu_read_unlock();
@@ -257,7 +257,7 @@ struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
        lttng_ht_lookup(the_consumer_data.channel_ht, &key, &iter);
        node = lttng_ht_iter_get_node_u64(&iter);
        if (node != NULL) {
-               channel = caa_container_of(node, struct lttng_consumer_channel, node);
+               channel = lttng::utils::container_of(node, &lttng_consumer_channel::node);
        }
 
        return channel;
@@ -292,9 +292,9 @@ static void steal_channel_key(uint64_t key)
 static void free_channel_rcu(struct rcu_head *head)
 {
        struct lttng_ht_node_u64 *node =
-               caa_container_of(head, struct lttng_ht_node_u64, head);
+               lttng::utils::container_of(head, &lttng_ht_node_u64::head);
        struct lttng_consumer_channel *channel =
-               caa_container_of(node, struct lttng_consumer_channel, node);
+               lttng::utils::container_of(node, &lttng_consumer_channel::node);
 
        switch (the_consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -316,9 +316,9 @@ static void free_channel_rcu(struct rcu_head *head)
 static void free_relayd_rcu(struct rcu_head *head)
 {
        struct lttng_ht_node_u64 *node =
-               caa_container_of(head, struct lttng_ht_node_u64, head);
+               lttng::utils::container_of(head, &lttng_ht_node_u64::head);
        struct consumer_relayd_sock_pair *relayd =
-               caa_container_of(node, struct consumer_relayd_sock_pair, node);
+               lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);
 
        /*
         * Close all sockets. This is done in the call RCU since we don't want the
@@ -383,6 +383,12 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
                consumer_timer_monitor_stop(channel);
        }
 
+       /*
+        * Send a last buffer statistics sample to the session daemon
+        * to ensure it tracks the amount of data consumed by this channel.
+        */
+       sample_and_send_channel_buffer_stats(channel);
+
        switch (the_consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                break;
@@ -705,7 +711,7 @@ struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
        lttng_ht_lookup(the_consumer_data.relayd_ht, &key, &iter);
        node = lttng_ht_iter_get_node_u64(&iter);
        if (node != NULL) {
-               relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
+               relayd = lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);
        }
 
 error:
@@ -2161,7 +2167,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
        consumer_stream_delete(stream, ht);
 
        /* Close down everything including the relayd if one. */
-       consumer_stream_close(stream);
+       consumer_stream_close_output(stream);
        /* Destroy tracer buffers of the stream. */
        consumer_stream_destroy_buffers(stream);
 
@@ -2422,9 +2428,8 @@ restart:
                                                        stream->wait_fd);
 
                                        /* Add metadata stream to the global poll events list */
-                                       lttng_poll_add(&events, stream->wait_fd,
-                                                       LPOLLIN | LPOLLPRI | LPOLLHUP);
-                               } else if (revents & (LPOLLERR | LPOLLHUP)) {
+                                       lttng_poll_add(&events, stream->wait_fd, LPOLLIN | LPOLLPRI);
+                               }else if (revents & (LPOLLERR | LPOLLHUP)) {
                                        DBG("Metadata thread pipe hung up");
                                        /*
                                         * Remove the pipe from the poll set and continue the loop
@@ -2713,7 +2718,7 @@ void *consumer_thread_data_poll(void *data)
                                        consumer_del_stream(local_stream[i], data_ht);
                                        local_stream[i] = NULL;
                                } else if (len > 0) {
-                                       local_stream[i]->data_read = 1;
+                                       local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
                                }
                        }
                }
@@ -2744,7 +2749,7 @@ void *consumer_thread_data_poll(void *data)
                                        consumer_del_stream(local_stream[i], data_ht);
                                        local_stream[i] = NULL;
                                } else if (len > 0) {
-                                       local_stream[i]->data_read = 1;
+                                       local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
                                }
                        }
                }
@@ -2764,37 +2769,45 @@ void *consumer_thread_data_poll(void *data)
                                                pollfd[i].fd);
                                lttng_ustconsumer_on_stream_hangup(local_stream[i]);
                                /* Attempt read again, for the data we just flushed. */
-                               local_stream[i]->data_read = 1;
+                               local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
                        }
                        /*
+                        * When a stream's pipe dies (hup/err/nval), an "inactive producer" flush is
+                        * performed. This type of flush ensures that a new packet is produced no
+                        * matter the consumed/produced positions are.
+                        *
+                        * This, in turn, causes the next pass to see that data available for the
+                        * stream. When we come back here, we can be assured that all available
+                        * data has been consumed and we can finally destroy the stream.
+                        *
                         * If the poll flag is HUP/ERR/NVAL and we have
                         * read no data in this pass, we can remove the
                         * stream from its hash table.
                         */
                        if ((pollfd[i].revents & POLLHUP)) {
                                DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
-                               if (!local_stream[i]->data_read) {
+                               if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
                                        consumer_del_stream(local_stream[i], data_ht);
                                        local_stream[i] = NULL;
                                        num_hup++;
                                }
                        } else if (pollfd[i].revents & POLLERR) {
                                ERR("Error returned in polling fd %d.", pollfd[i].fd);
-                               if (!local_stream[i]->data_read) {
+                               if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
                                        consumer_del_stream(local_stream[i], data_ht);
                                        local_stream[i] = NULL;
                                        num_hup++;
                                }
                        } else if (pollfd[i].revents & POLLNVAL) {
                                ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
-                               if (!local_stream[i]->data_read) {
+                               if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
                                        consumer_del_stream(local_stream[i], data_ht);
                                        local_stream[i] = NULL;
                                        num_hup++;
                                }
                        }
                        if (local_stream[i] != NULL) {
-                               local_stream[i]->data_read = 0;
+                               local_stream[i]->has_data_left_to_be_read_before_teardown = 0;
                        }
                }
        }
@@ -3006,8 +3019,8 @@ restart:
                                                                &chan->wait_fd_node);
                                                rcu_read_unlock();
                                                /* Add channel to the global poll events list */
-                                               lttng_poll_add(&events, chan->wait_fd,
-                                                               LPOLLERR | LPOLLHUP);
+                                               // FIXME: Empty flag on a pipe pollset, this might hang on FreeBSD.
+                                               lttng_poll_add(&events, chan->wait_fd, 0);
                                                break;
                                        case CONSUMER_CHANNEL_DEL:
                                        {
This page took 0.026363 seconds and 4 git commands to generate.