Fix: ust-consumerd: set `hangup_flush_done` in a locked context
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.cpp
index a2bf099b236f94772c9fd1d9fbb0fa3831639848..0340434448e7be0350a1db630f5eea9b63b44a9f 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
+ * Copyright (C) 2011 EfficiOS Inc.
  * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
  * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
 #include <stdbool.h>
 #include <stdint.h>
 
-#include <bin/lttng-consumerd/health-consumerd.h>
-#include <common/common.h>
-#include <common/sessiond-comm/sessiond-comm.h>
-#include <common/relayd/relayd.h>
-#include <common/compat/fcntl.h>
-#include <common/compat/endian.h>
-#include <common/consumer/consumer-metadata-cache.h>
-#include <common/consumer/consumer-stream.h>
-#include <common/consumer/consumer-timer.h>
-#include <common/utils.h>
-#include <common/index/index.h>
-#include <common/consumer/consumer.h>
-#include <common/shm.h>
-#include <common/optional.h>
-
-#include "ust-consumer.h"
+#include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <common/common.hpp>
+#include <common/sessiond-comm/sessiond-comm.hpp>
+#include <common/relayd/relayd.hpp>
+#include <common/compat/fcntl.hpp>
+#include <common/compat/endian.hpp>
+#include <common/consumer/consumer-metadata-cache.hpp>
+#include <common/consumer/consumer-stream.hpp>
+#include <common/consumer/consumer-timer.hpp>
+#include <common/utils.hpp>
+#include <common/index/index.hpp>
+#include <common/consumer/consumer.hpp>
+#include <common/shm.hpp>
+#include <common/optional.hpp>
+
+#include "ust-consumer.hpp"
 
 #define INT_MAX_STR_LEN 12     /* includes \0 */
 
@@ -49,46 +49,6 @@ extern int consumer_poll_timeout;
 
 LTTNG_EXPORT DEFINE_LTTNG_UST_SIGBUS_STATE();
 
-/*
- * Free channel object and all streams associated with it. This MUST be used
- * only and only if the channel has _NEVER_ been added to the global channel
- * hash table.
- */
-static void destroy_channel(struct lttng_consumer_channel *channel)
-{
-       struct lttng_consumer_stream *stream, *stmp;
-
-       LTTNG_ASSERT(channel);
-
-       DBG("UST consumer cleaning stream list");
-
-       cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
-                       send_node) {
-
-               health_code_update();
-
-               cds_list_del(&stream->send_node);
-               lttng_ust_ctl_destroy_stream(stream->ustream);
-               lttng_trace_chunk_put(stream->trace_chunk);
-               free(stream);
-       }
-
-       /*
-        * If a channel is available meaning that was created before the streams
-        * were, delete it.
-        */
-       if (channel->uchan) {
-               lttng_ustconsumer_del_channel(channel);
-               lttng_ustconsumer_free_channel(channel);
-       }
-
-       if (channel->trace_chunk) {
-               lttng_trace_chunk_put(channel->trace_chunk);
-       }
-
-       free(channel);
-}
-
 /*
  * Add channel to internal consumer state.
  *
@@ -203,7 +163,7 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream,
         * global.
         */
        stream->globally_visible = 1;
-       cds_list_del(&stream->send_node);
+       cds_list_del_init(&stream->send_node);
 
        ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
        if (ret < 0) {
@@ -407,7 +367,7 @@ static int create_ust_channel(struct lttng_consumer_channel *channel,
                nr_stream_fds = 1;
        else
                nr_stream_fds = lttng_ust_ctl_get_nr_stream_per_channel();
-       stream_fds = (int *) zmalloc(nr_stream_fds * sizeof(*stream_fds));
+       stream_fds = calloc<int>(nr_stream_fds);
        if (!stream_fds) {
                ret = -1;
                goto error_alloc;
@@ -876,6 +836,8 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
        int ret;
        struct lttng_consumer_channel *metadata;
 
+       ASSERT_RCU_READ_LOCKED();
+
        DBG("UST consumer setup metadata key %" PRIu64, key);
 
        metadata = consumer_find_channel(key);
@@ -948,7 +910,6 @@ error:
         * will make sure to clean that list.
         */
        consumer_stream_destroy(metadata->metadata_stream, NULL);
-       cds_list_del(&metadata->metadata_stream->send_node);
        metadata->metadata_stream = NULL;
 send_streams_error:
 error_no_stream:
@@ -971,6 +932,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
 
        LTTNG_ASSERT(path);
        LTTNG_ASSERT(ctx);
+       ASSERT_RCU_READ_LOCKED();
 
        DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s",
                        key, path);
@@ -1004,7 +966,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
        metadata_stream = metadata_channel->metadata_stream;
        LTTNG_ASSERT(metadata_stream);
 
-       pthread_mutex_lock(&metadata_stream->lock);
+       metadata_stream->read_subbuffer_ops.lock(metadata_stream);
        if (relayd_id != (uint64_t) -1ULL) {
                metadata_stream->net_seq_idx = relayd_id;
                ret = consumer_send_relayd_stream(metadata_stream, path);
@@ -1012,14 +974,12 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
                ret = consumer_stream_create_output_files(metadata_stream,
                                false);
        }
-       pthread_mutex_unlock(&metadata_stream->lock);
        if (ret < 0) {
                goto error_stream;
        }
 
        do {
                health_code_update();
-
                ret = lttng_consumer_read_subbuffer(metadata_stream, ctx, true);
                if (ret < 0) {
                        goto error_stream;
@@ -1027,12 +987,12 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
        } while (ret > 0);
 
 error_stream:
+       metadata_stream->read_subbuffer_ops.unlock(metadata_stream);
        /*
-        * Clean up the stream completly because the next snapshot will use a new
-        * metadata stream.
+        * Clean up the stream completely because the next snapshot will use a
+        * new metadata stream.
         */
        consumer_stream_destroy(metadata_stream, NULL);
-       cds_list_del(&metadata_stream->send_node);
        metadata_channel->metadata_stream = NULL;
 
 error:
@@ -1087,6 +1047,7 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
 
        LTTNG_ASSERT(path);
        LTTNG_ASSERT(ctx);
+       ASSERT_RCU_READ_LOCKED();
 
        rcu_read_lock();
 
@@ -1285,7 +1246,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
 
        DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len);
 
-       metadata_str = (char *) zmalloc(len * sizeof(char));
+       metadata_str = calloc<char>(len);
        if (!metadata_str) {
                PERROR("zmalloc metadata string");
                ret_code = LTTCOMM_CONSUMERD_ENOMEM;
@@ -1323,10 +1284,21 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
                 * metadata position to ensure the metadata poll thread consumes
                 * the whole cache.
                 */
-               pthread_mutex_lock(&channel->metadata_stream->lock);
-               metadata_stream_reset_cache_consumed_position(
-                               channel->metadata_stream);
-               pthread_mutex_unlock(&channel->metadata_stream->lock);
+
+               /*
+                * channel::metadata_stream can be null when the metadata
+                * channel is under a snapshot session type. No need to update
+                * the stream position in that scenario.
+                */
+               if (channel->metadata_stream != NULL) {
+                       pthread_mutex_lock(&channel->metadata_stream->lock);
+                       metadata_stream_reset_cache_consumed_position(
+                                       channel->metadata_stream);
+                       pthread_mutex_unlock(&channel->metadata_stream->lock);
+               } else {
+                       /* Validate we are in snapshot mode. */
+                       LTTNG_ASSERT(!channel->monitor);
+               }
                /* Fall-through. */
        case CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT:
                /*
@@ -1417,11 +1389,18 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        switch (msg.cmd_type) {
        case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
        {
+               uint32_t major = msg.u.relayd_sock.major;
+               uint32_t minor = msg.u.relayd_sock.minor;
+               enum lttcomm_sock_proto protocol =
+                               (enum lttcomm_sock_proto) msg.u.relayd_sock
+                                               .relayd_socket_protocol;
+
                /* Session daemon status message are handled in the following call. */
                consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
-                               msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
-                               &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
-                               msg.u.relayd_sock.relayd_session_id);
+                               msg.u.relayd_sock.type, ctx, sock,
+                               consumer_sockpoll, msg.u.relayd_sock.session_id,
+                               msg.u.relayd_sock.relayd_session_id, major,
+                               minor, protocol);
                goto end_nosignal;
        }
        case LTTNG_CONSUMER_DESTROY_RELAYD:
@@ -2046,8 +2025,7 @@ error_push_metadata_fatal:
                         */
                        rotate_channel = lttng_consumer_rotate_channel(
                                        found_channel, key,
-                                       msg.u.rotate_channel.relayd_id,
-                                       msg.u.rotate_channel.metadata, ctx);
+                                       msg.u.rotate_channel.relayd_id);
                        if (rotate_channel < 0) {
                                ERR("Rotate channel failed");
                                ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL;
@@ -2074,8 +2052,7 @@ error_push_metadata_fatal:
 
                        ret_rotate_read_streams =
                                        lttng_consumer_rotate_ready_streams(
-                                                       found_channel, key,
-                                                       ctx);
+                                                       found_channel, key);
                        if (ret_rotate_read_streams < 0) {
                                ERR("Rotate channel failed");
                        }
@@ -2116,9 +2093,11 @@ end_rotate_channel_nosignal:
        case LTTNG_CONSUMER_INIT:
        {
                int ret_send_status;
+               lttng_uuid sessiond_uuid;
 
-               ret_code = lttng_consumer_init_command(ctx,
-                               msg.u.init.sessiond_uuid);
+               std::copy(std::begin(msg.u.init.sessiond_uuid), std::end(msg.u.init.sessiond_uuid),
+                               sessiond_uuid.begin());
+               ret_code = lttng_consumer_init_command(ctx, sessiond_uuid);
                health_code_update();
                ret_send_status = consumer_send_status_msg(sock, ret_code);
                if (ret_send_status < 0) {
@@ -2298,11 +2277,7 @@ end_msg_sessiond:
 
 end_channel_error:
        if (channel) {
-               /*
-                * Free channel here since no one has a reference to it. We don't
-                * free after that because a stream can store this pointer.
-                */
-               destroy_channel(channel);
+               consumer_del_channel(channel);
        }
        /* We have to send a status channel message indicating an error. */
        {
@@ -2448,8 +2423,9 @@ void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
                        stream->quiescent = true;
                }
        }
-       pthread_mutex_unlock(&stream->lock);
+
        stream->hangup_flush_done = 1;
+       pthread_mutex_unlock(&stream->lock);
 }
 
 void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
@@ -2639,6 +2615,7 @@ enum sync_metadata_status lttng_ustconsumer_sync_metadata(
 
        LTTNG_ASSERT(ctx);
        LTTNG_ASSERT(metadata_stream);
+       ASSERT_RCU_READ_LOCKED();
 
        metadata_channel = metadata_stream->chan;
        pthread_mutex_unlock(&metadata_stream->lock);
@@ -3049,7 +3026,7 @@ end:
 }
 
 static int put_next_subbuffer(struct lttng_consumer_stream *stream,
-               struct stream_subbuffer *subbuffer)
+               struct stream_subbuffer *subbuffer __attribute__((unused)))
 {
        const int ret = lttng_ust_ctl_put_next_subbuf(stream->ustream);
 
@@ -3058,7 +3035,7 @@ static int put_next_subbuffer(struct lttng_consumer_stream *stream,
 }
 
 static int signal_metadata(struct lttng_consumer_stream *stream,
-               struct lttng_consumer_local_data *ctx)
+               struct lttng_consumer_local_data *ctx __attribute__((unused)))
 {
        ASSERT_LOCKED(stream->metadata_rdv_lock);
        return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;
This page took 0.028055 seconds and 4 git commands to generate.