Clean-up: consumerd: reduce duplication of stream output close code
[lttng-tools.git] / src / common / consumer / consumer-stream.cpp
index e466c01551be7d73755f05483c89987dfd6d87b2..b30e9aac0680eb6795a763f836a5da8ff18349fa 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
+ * Copyright (C) 2011 EfficiOS Inc.
  * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
  * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
  *
 #include <sys/mman.h>
 #include <unistd.h>
 
-#include <common/common.h>
-#include <common/consumer/consumer-timer.h>
-#include <common/consumer/consumer-timer.h>
-#include <common/consumer/consumer.h>
-#include <common/consumer/consumer.h>
-#include <common/consumer/metadata-bucket.h>
-#include <common/consumer/metadata-bucket.h>
-#include <common/index/index.h>
-#include <common/kernel-consumer/kernel-consumer.h>
-#include <common/kernel-ctl/kernel-ctl.h>
-#include <common/macros.h>
-#include <common/relayd/relayd.h>
-#include <common/ust-consumer/ust-consumer.h>
-#include <common/utils.h>
-
-#include "consumer-stream.h"
+#include <common/common.hpp>
+#include <common/consumer/consumer-timer.hpp>
+#include <common/consumer/consumer-timer.hpp>
+#include <common/consumer/consumer.hpp>
+#include <common/consumer/consumer.hpp>
+#include <common/consumer/metadata-bucket.hpp>
+#include <common/consumer/metadata-bucket.hpp>
+#include <common/index/index.hpp>
+#include <common/kernel-consumer/kernel-consumer.hpp>
+#include <common/kernel-ctl/kernel-ctl.hpp>
+#include <common/macros.hpp>
+#include <common/relayd/relayd.hpp>
+#include <common/ust-consumer/ust-consumer.hpp>
+#include <common/utils.hpp>
+
+#include "consumer-stream.hpp"
 
 /*
  * RCU call to free stream. MUST only be used with call_rcu().
@@ -35,9 +35,9 @@
 static void free_stream_rcu(struct rcu_head *head)
 {
        struct lttng_ht_node_u64 *node =
-               caa_container_of(head, struct lttng_ht_node_u64, head);
+               lttng::utils::container_of(head, &lttng_ht_node_u64::head);
        struct lttng_consumer_stream *stream =
-               caa_container_of(node, struct lttng_consumer_stream, node);
+               lttng::utils::container_of(node, &lttng_consumer_stream::node);
 
        pthread_mutex_destroy(&stream->lock);
        free(stream);
@@ -160,7 +160,7 @@ void ctf_packet_index_populate(struct ctf_packet_index *index,
 }
 
 static ssize_t consumer_stream_consume_mmap(
-               struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_local_data *ctx __attribute__((unused)),
                struct lttng_consumer_stream *stream,
                const struct stream_subbuffer *subbuffer)
 {
@@ -231,7 +231,7 @@ static ssize_t consumer_stream_consume_splice(
 static int consumer_stream_send_index(
                struct lttng_consumer_stream *stream,
                const struct stream_subbuffer *subbuffer,
-               struct lttng_consumer_local_data *ctx)
+               struct lttng_consumer_local_data *ctx __attribute__((unused)))
 {
        off_t packet_offset = 0;
        struct ctf_packet_index index = {};
@@ -419,6 +419,7 @@ static int consumer_stream_sync_metadata_index(
                const struct stream_subbuffer *subbuffer,
                struct lttng_consumer_local_data *ctx)
 {
+       bool missed_metadata_flush;
        int ret;
 
        /* Block until all the metadata is sent. */
@@ -431,18 +432,34 @@ static int consumer_stream_sync_metadata_index(
 
        pthread_mutex_lock(&stream->metadata_timer_lock);
        stream->waiting_on_metadata = false;
-       if (stream->missed_metadata_flush) {
+       missed_metadata_flush = stream->missed_metadata_flush;
+       if (missed_metadata_flush) {
                stream->missed_metadata_flush = false;
-               pthread_mutex_unlock(&stream->metadata_timer_lock);
-               (void) stream->read_subbuffer_ops.send_live_beacon(stream);
-       } else {
-               pthread_mutex_unlock(&stream->metadata_timer_lock);
        }
+       pthread_mutex_unlock(&stream->metadata_timer_lock);
        if (ret < 0) {
                goto end;
        }
 
        ret = consumer_stream_send_index(stream, subbuffer, ctx);
+       /*
+        * Send the live inactivity beacon to handle the situation where
+        * the live timer is prevented from sampling this stream
+        * because the stream lock was being held while this stream is
+        * waiting on metadata. This ensures live viewer progress in the
+        * unlikely scenario where a live timer would be prevented from
+        * locking a stream lock repeatedly due to a steady flow of
+        * incoming metadata, for a stream which is mostly inactive.
+        *
+        * It is important to send the inactivity beacon packet to
+        * relayd _after_ sending the index associated with the data
+        * that was just sent, otherwise this can cause live viewers to
+        * observe timestamps going backwards between an inactivity
+        * beacon and a following trace packet.
+        */
+       if (missed_metadata_flush) {
+               (void) stream->read_subbuffer_ops.send_live_beacon(stream);
+       }
 end:
        return ret;
 }
@@ -574,8 +591,8 @@ end:
  */
 static
 int post_consume_open_new_packet(struct lttng_consumer_stream *stream,
-               const struct stream_subbuffer *subbuffer,
-               struct lttng_consumer_local_data *ctx)
+               const struct stream_subbuffer *subbuffer __attribute__((unused)),
+               struct lttng_consumer_local_data *ctx __attribute__((unused)))
 {
        int ret = 0;
 
@@ -637,7 +654,7 @@ struct lttng_consumer_stream *consumer_stream_create(
        int ret;
        struct lttng_consumer_stream *stream;
 
-       stream = (lttng_consumer_stream *) zmalloc(sizeof(*stream));
+       stream = zmalloc<lttng_consumer_stream>();
        if (stream == NULL) {
                PERROR("malloc struct lttng_consumer_stream");
                ret = -ENOMEM;
@@ -652,6 +669,7 @@ struct lttng_consumer_stream *consumer_stream_create(
                goto error;
        }
 
+       stream->send_node = CDS_LIST_HEAD_INIT(stream->send_node);
        stream->chan = channel;
        stream->key = stream_key;
        stream->trace_chunk = trace_chunk;
@@ -832,69 +850,19 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream,
  * The consumer data lock MUST be acquired.
  * The stream lock MUST be acquired.
  */
-void consumer_stream_close(struct lttng_consumer_stream *stream)
+void consumer_stream_close_output(struct lttng_consumer_stream *stream)
 {
-       int ret;
        struct consumer_relayd_sock_pair *relayd;
 
        LTTNG_ASSERT(stream);
 
-       switch (the_consumer_data.type) {
-       case LTTNG_CONSUMER_KERNEL:
-               if (stream->mmap_base != NULL) {
-                       ret = munmap(stream->mmap_base, stream->mmap_len);
-                       if (ret != 0) {
-                               PERROR("munmap");
-                       }
-               }
-
-               if (stream->wait_fd >= 0) {
-                       ret = close(stream->wait_fd);
-                       if (ret) {
-                               PERROR("close");
-                       }
-                       stream->wait_fd = -1;
-               }
-               if (stream->chan->output == CONSUMER_CHANNEL_SPLICE) {
-                       utils_close_pipe(stream->splice_pipe);
-               }
-               break;
-       case LTTNG_CONSUMER32_UST:
-       case LTTNG_CONSUMER64_UST:
-       {
-               /*
-                * Special case for the metadata since the wait fd is an internal pipe
-                * polled in the metadata thread.
-                */
-               if (stream->metadata_flag && stream->chan->monitor) {
-                       int rpipe = stream->ust_metadata_poll_pipe[0];
-
-                       /*
-                        * This will stop the channel timer if one and close the write side
-                        * of the metadata poll pipe.
-                        */
-                       lttng_ustconsumer_close_metadata(stream->chan);
-                       if (rpipe >= 0) {
-                               ret = close(rpipe);
-                               if (ret < 0) {
-                                       PERROR("closing metadata pipe read side");
-                               }
-                               stream->ust_metadata_poll_pipe[0] = -1;
-                       }
-               }
-               break;
-       }
-       default:
-               ERR("Unknown consumer_data type");
-               abort();
-       }
-
        /* Close output fd. Could be a socket or local file at this point. */
        if (stream->out_fd >= 0) {
-               ret = close(stream->out_fd);
+               const auto ret = close(stream->out_fd);
                if (ret) {
-                       PERROR("close");
+                       PERROR("Failed to close stream output file descriptor");
                }
+
                stream->out_fd = -1;
        }
 
@@ -911,7 +879,9 @@ void consumer_stream_close(struct lttng_consumer_stream *stream)
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd != NULL) {
                consumer_stream_relayd_close(stream, relayd);
+               stream->net_seq_idx = -1ULL;
        }
+
        rcu_read_unlock();
 }
 
@@ -983,9 +953,54 @@ void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream)
 
        switch (the_consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
+               if (stream->mmap_base != NULL) {
+                       const auto ret = munmap(stream->mmap_base, stream->mmap_len);
+
+                       if (ret != 0) {
+                               PERROR("munmap");
+                       }
+               }
+
+               if (stream->wait_fd >= 0) {
+                       const auto ret = close(stream->wait_fd);
+
+                       if (ret) {
+                               PERROR("close");
+                       }
+
+                       stream->wait_fd = -1;
+               }
+
+               if (stream->chan->output == CONSUMER_CHANNEL_SPLICE) {
+                       utils_close_pipe(stream->splice_pipe);
+               }
+
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
+               /*
+                * Special case for the metadata since the wait fd is an internal pipe
+                * polled in the metadata thread.
+                */
+               if (stream->metadata_flag && stream->chan->monitor) {
+                       const auto rpipe = stream->ust_metadata_poll_pipe[0];
+
+                       /*
+                        * This will stop the channel timer if one and close the write side
+                        * of the metadata poll pipe.
+                        */
+                       lttng_ustconsumer_close_metadata(stream->chan);
+                       if (rpipe >= 0) {
+                               const auto ret = close(rpipe);
+
+                               if (ret < 0) {
+                                       PERROR("closing metadata pipe read side");
+                               }
+
+                               stream->ust_metadata_poll_pipe[0] = -1;
+                       }
+               }
+
                lttng_ustconsumer_del_stream(stream);
                break;
        default:
@@ -1006,7 +1021,7 @@ static void destroy_close_stream(struct lttng_consumer_stream *stream)
        /* Destroy tracer buffers of the stream. */
        consumer_stream_destroy_buffers(stream);
        /* Close down everything including the relayd if one. */
-       consumer_stream_close(stream);
+       consumer_stream_close_output(stream);
 }
 
 /*
@@ -1043,6 +1058,8 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
 {
        LTTNG_ASSERT(stream);
 
+       cds_list_del_init(&stream->send_node);
+
        /* Stream is in monitor mode. */
        if (stream->monitor) {
                struct lttng_consumer_channel *free_chan = NULL;
@@ -1055,10 +1072,12 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
                if (stream->globally_visible) {
                        pthread_mutex_lock(&the_consumer_data.lock);
                        pthread_mutex_lock(&stream->chan->lock);
+
                        pthread_mutex_lock(&stream->lock);
                        /* Remove every reference of the stream in the consumer. */
                        consumer_stream_delete(stream, ht);
 
+
                        destroy_close_stream(stream);
 
                        /* Update channel's refcount of the stream. */
@@ -1075,6 +1094,7 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
                         * If the stream is not visible globally, this needs to be done
                         * outside of the consumer data lock section.
                         */
+                       destroy_close_stream(stream);
                        free_chan = unref_channel(stream);
                }
 
@@ -1253,7 +1273,7 @@ end:
 }
 
 static ssize_t metadata_bucket_consume(
-               struct lttng_consumer_local_data *unused,
+               struct lttng_consumer_local_data *unused __attribute__((unused)),
                struct lttng_consumer_stream *stream,
                const struct stream_subbuffer *subbuffer)
 {
This page took 0.028008 seconds and 4 git commands to generate.