Fix: consumer: snapshot: assertion on subsequent snapshot
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.cpp
index aeafae642b554e1aafc825705b8be25bf1437ebf..b08cafdff9da6c217fb2702a7ad0e55379514de1 100644 (file)
 #include <stdbool.h>
 #include <stdint.h>
 
-#include <bin/lttng-consumerd/health-consumerd.h>
-#include <common/common.h>
-#include <common/sessiond-comm/sessiond-comm.h>
-#include <common/relayd/relayd.h>
-#include <common/compat/fcntl.h>
-#include <common/compat/endian.h>
-#include <common/consumer/consumer-metadata-cache.h>
-#include <common/consumer/consumer-stream.h>
-#include <common/consumer/consumer-timer.h>
-#include <common/utils.h>
-#include <common/index/index.h>
-#include <common/consumer/consumer.h>
-#include <common/shm.h>
-#include <common/optional.h>
-
-#include "ust-consumer.h"
+#include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <common/common.hpp>
+#include <common/sessiond-comm/sessiond-comm.hpp>
+#include <common/relayd/relayd.hpp>
+#include <common/compat/fcntl.hpp>
+#include <common/compat/endian.hpp>
+#include <common/consumer/consumer-metadata-cache.hpp>
+#include <common/consumer/consumer-stream.hpp>
+#include <common/consumer/consumer-timer.hpp>
+#include <common/utils.hpp>
+#include <common/index/index.hpp>
+#include <common/consumer/consumer.hpp>
+#include <common/shm.hpp>
+#include <common/optional.hpp>
+
+#include "ust-consumer.hpp"
 
 #define INT_MAX_STR_LEN 12     /* includes \0 */
 
@@ -49,46 +49,6 @@ extern int consumer_poll_timeout;
 
 LTTNG_EXPORT DEFINE_LTTNG_UST_SIGBUS_STATE();
 
-/*
- * Free channel object and all streams associated with it. This MUST be used
- * only and only if the channel has _NEVER_ been added to the global channel
- * hash table.
- */
-static void destroy_channel(struct lttng_consumer_channel *channel)
-{
-       struct lttng_consumer_stream *stream, *stmp;
-
-       LTTNG_ASSERT(channel);
-
-       DBG("UST consumer cleaning stream list");
-
-       cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
-                       send_node) {
-
-               health_code_update();
-
-               cds_list_del_init(&stream->send_node);
-               lttng_ust_ctl_destroy_stream(stream->ustream);
-               lttng_trace_chunk_put(stream->trace_chunk);
-               free(stream);
-       }
-
-       /*
-        * If a channel is available meaning that was created before the streams
-        * were, delete it.
-        */
-       if (channel->uchan) {
-               lttng_ustconsumer_del_channel(channel);
-               lttng_ustconsumer_free_channel(channel);
-       }
-
-       if (channel->trace_chunk) {
-               lttng_trace_chunk_put(channel->trace_chunk);
-       }
-
-       free(channel);
-}
-
 /*
  * Add channel to internal consumer state.
  *
@@ -407,7 +367,7 @@ static int create_ust_channel(struct lttng_consumer_channel *channel,
                nr_stream_fds = 1;
        else
                nr_stream_fds = lttng_ust_ctl_get_nr_stream_per_channel();
-       stream_fds = (int *) zmalloc(nr_stream_fds * sizeof(*stream_fds));
+       stream_fds = calloc<int>(nr_stream_fds);
        if (!stream_fds) {
                ret = -1;
                goto error_alloc;
@@ -742,6 +702,14 @@ static int flush_channel(uint64_t chan_key)
 next:
                pthread_mutex_unlock(&stream->lock);
        }
+
+       /*
+        * Send one last buffer statistics update to the session daemon. This
+        * ensures that the session daemon gets at least one statistics update
+        * per channel even in the case of short-lived channels, such as when a
+        * short-lived app is traced in per-pid mode.
+        */
+       sample_and_send_channel_buffer_stats(channel);
 error:
        rcu_read_unlock();
        return ret;
@@ -1121,13 +1089,13 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
                if (use_relayd) {
                        ret = consumer_send_relayd_stream(stream, path);
                        if (ret < 0) {
-                               goto error_unlock;
+                               goto error_close_stream;
                        }
                } else {
                        ret = consumer_stream_create_output_files(stream,
                                        false);
                        if (ret < 0) {
-                               goto error_unlock;
+                               goto error_close_stream;
                        }
                        DBG("UST consumer snapshot stream (%" PRIu64 ")",
                                        stream->key);
@@ -1149,19 +1117,19 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
                ret = lttng_ustconsumer_take_snapshot(stream);
                if (ret < 0) {
                        ERR("Taking UST snapshot");
-                       goto error_unlock;
+                       goto error_close_stream;
                }
 
                ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos);
                if (ret < 0) {
                        ERR("Produced UST snapshot position");
-                       goto error_unlock;
+                       goto error_close_stream;
                }
 
                ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos);
                if (ret < 0) {
                        ERR("Consumerd UST snapshot position");
-                       goto error_unlock;
+                       goto error_close_stream;
                }
 
                /*
@@ -1286,7 +1254,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
 
        DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len);
 
-       metadata_str = (char *) zmalloc(len * sizeof(char));
+       metadata_str = calloc<char>(len);
        if (!metadata_str) {
                PERROR("zmalloc metadata string");
                ret_code = LTTCOMM_CONSUMERD_ENOMEM;
@@ -2133,9 +2101,11 @@ end_rotate_channel_nosignal:
        case LTTNG_CONSUMER_INIT:
        {
                int ret_send_status;
+               lttng_uuid sessiond_uuid;
 
-               ret_code = lttng_consumer_init_command(ctx,
-                               msg.u.init.sessiond_uuid);
+               std::copy(std::begin(msg.u.init.sessiond_uuid), std::end(msg.u.init.sessiond_uuid),
+                               sessiond_uuid.begin());
+               ret_code = lttng_consumer_init_command(ctx, sessiond_uuid);
                health_code_update();
                ret_send_status = consumer_send_status_msg(sock, ret_code);
                if (ret_send_status < 0) {
@@ -2315,11 +2285,7 @@ end_msg_sessiond:
 
 end_channel_error:
        if (channel) {
-               /*
-                * Free channel here since no one has a reference to it. We don't
-                * free after that because a stream can store this pointer.
-                */
-               destroy_channel(channel);
+               consumer_del_channel(channel);
        }
        /* We have to send a status channel message indicating an error. */
        {
@@ -2465,8 +2431,9 @@ void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
                        stream->quiescent = true;
                }
        }
-       pthread_mutex_unlock(&stream->lock);
+
        stream->hangup_flush_done = 1;
+       pthread_mutex_unlock(&stream->lock);
 }
 
 void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
This page took 0.026681 seconds and 4 git commands to generate.