Fix: consumer: snapshot: assertion on subsequent snapshot
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index 88520dbe260fbfd14a27d1431b3bc0df616e86e7..d274904e871eed0b68b22b1bcc57214e682f0bd9 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
+ * Copyright (C) 2011 EfficiOS Inc.
  * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
  * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
@@ -64,7 +64,7 @@ static void destroy_channel(struct lttng_consumer_channel *channel)
 
                health_code_update();
 
-               cds_list_del(&stream->send_node);
+               cds_list_del_init(&stream->send_node);
                ustctl_destroy_stream(stream->ustream);
                lttng_trace_chunk_put(stream->trace_chunk);
                free(stream);
@@ -200,7 +200,7 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream,
         * global.
         */
        stream->globally_visible = 1;
-       cds_list_del(&stream->send_node);
+       cds_list_del_init(&stream->send_node);
 
        ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
        if (ret < 0) {
@@ -975,7 +975,6 @@ error:
         * will make sure to clean that list.
         */
        consumer_stream_destroy(metadata->metadata_stream, NULL);
-       cds_list_del(&metadata->metadata_stream->send_node);
        metadata->metadata_stream = NULL;
 send_streams_error:
 error_no_stream:
@@ -1031,7 +1030,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
        metadata_stream = metadata_channel->metadata_stream;
        assert(metadata_stream);
 
-       pthread_mutex_lock(&metadata_stream->lock);
+       metadata_stream->read_subbuffer_ops.lock(metadata_stream);
        if (relayd_id != (uint64_t) -1ULL) {
                metadata_stream->net_seq_idx = relayd_id;
                ret = consumer_send_relayd_stream(metadata_stream, path);
@@ -1039,14 +1038,12 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
                ret = consumer_stream_create_output_files(metadata_stream,
                                false);
        }
-       pthread_mutex_unlock(&metadata_stream->lock);
        if (ret < 0) {
                goto error_stream;
        }
 
        do {
                health_code_update();
-
                ret = lttng_consumer_read_subbuffer(metadata_stream, ctx, true);
                if (ret < 0) {
                        goto error_stream;
@@ -1054,12 +1051,12 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
        } while (ret > 0);
 
 error_stream:
+       metadata_stream->read_subbuffer_ops.unlock(metadata_stream);
        /*
-        * Clean up the stream completly because the next snapshot will use a new
-        * metadata stream.
+        * Clean up the stream completely because the next snapshot will use a
+        * new metadata stream.
         */
        consumer_stream_destroy(metadata_stream, NULL);
-       cds_list_del(&metadata_stream->send_node);
        metadata_channel->metadata_stream = NULL;
 
 error:
@@ -1147,13 +1144,13 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
                if (use_relayd) {
                        ret = consumer_send_relayd_stream(stream, path);
                        if (ret < 0) {
-                               goto error_unlock;
+                               goto error_close_stream;
                        }
                } else {
                        ret = consumer_stream_create_output_files(stream,
                                        false);
                        if (ret < 0) {
-                               goto error_unlock;
+                               goto error_close_stream;
                        }
                        DBG("UST consumer snapshot stream (%" PRIu64 ")",
                                        stream->key);
@@ -1170,19 +1167,19 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
                ret = lttng_ustconsumer_take_snapshot(stream);
                if (ret < 0) {
                        ERR("Taking UST snapshot");
-                       goto error_unlock;
+                       goto error_close_stream;
                }
 
                ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos);
                if (ret < 0) {
                        ERR("Produced UST snapshot position");
-                       goto error_unlock;
+                       goto error_close_stream;
                }
 
                ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos);
                if (ret < 0) {
                        ERR("Consumerd UST snapshot position");
-                       goto error_unlock;
+                       goto error_close_stream;
                }
 
                /*
@@ -1278,6 +1275,17 @@ error_unlock:
        return ret;
 }
 
+static
+void metadata_stream_reset_cache_consumed_position(
+               struct lttng_consumer_stream *stream)
+{
+       ASSERT_LOCKED(stream->lock);
+
+       DBG("Reset metadata cache of session %" PRIu64,
+                       stream->chan->session_id);
+       stream->ust_metadata_pushed = 0;
+}
+
 /*
  * Receive the metadata updates from the sessiond. Supports receiving
  * overlapping metadata, but is needs to always belong to a contiguous
@@ -1292,6 +1300,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
 {
        int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        char *metadata_str;
+       enum consumer_metadata_cache_write_status cache_write_status;
 
        DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len);
 
@@ -1315,10 +1324,48 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
        health_code_update();
 
        pthread_mutex_lock(&channel->metadata_cache->lock);
-       ret = consumer_metadata_cache_write(channel, offset, len, version,
-                       metadata_str);
+       cache_write_status = consumer_metadata_cache_write(
+                       channel, offset, len, version, metadata_str);
        pthread_mutex_unlock(&channel->metadata_cache->lock);
-       if (ret < 0) {
+       switch (cache_write_status) {
+       case CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE:
+               /*
+                * The write entirely overlapped with existing contents of the
+                * same metadata version (same content); there is nothing to do.
+                */
+               break;
+       case CONSUMER_METADATA_CACHE_WRITE_STATUS_INVALIDATED:
+               /*
+                * The metadata cache was invalidated (previously pushed
+                * content has been overwritten). Reset the stream's consumed
+                * metadata position to ensure the metadata poll thread consumes
+                * the whole cache.
+                */
+
+               /*
+                * channel::metadata_stream can be null when the metadata
+                * channel is under a snapshot session type. No need to update
+                * the stream position in that scenario.
+                */
+               if (channel->metadata_stream != NULL) {
+                       pthread_mutex_lock(&channel->metadata_stream->lock);
+                       metadata_stream_reset_cache_consumed_position(
+                                       channel->metadata_stream);
+                       pthread_mutex_unlock(&channel->metadata_stream->lock);
+               }
+               /* Fall-through. */
+       case CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT:
+               /*
+                * In both cases, the metadata poll thread has new data to
+                * consume.
+                */
+               ret = consumer_metadata_wakeup_pipe(channel);
+               if (ret) {
+                       ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
+                       goto end_free;
+               }
+               break;
+       case CONSUMER_METADATA_CACHE_WRITE_STATUS_ERROR:
                /* Unable to handle metadata. Notify session daemon. */
                ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
                /*
@@ -1327,6 +1374,8 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
                 * waiting for the metadata cache to be flushed.
                 */
                goto end_free;
+       default:
+               abort();
        }
 
        if (!wait) {
@@ -1389,11 +1438,18 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        switch (msg.cmd_type) {
        case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
        {
+               uint32_t major = msg.u.relayd_sock.major;
+               uint32_t minor = msg.u.relayd_sock.minor;
+               enum lttcomm_sock_proto protocol =
+                               (enum lttcomm_sock_proto) msg.u.relayd_sock
+                                               .relayd_socket_protocol;
+
                /* Session daemon status message are handled in the following call. */
                consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
-                               msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
-                               &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
-                               msg.u.relayd_sock.relayd_session_id);
+                               msg.u.relayd_sock.type, ctx, sock,
+                               consumer_sockpoll, msg.u.relayd_sock.session_id,
+                               msg.u.relayd_sock.relayd_session_id, major,
+                               minor, protocol);
                goto end_nosignal;
        }
        case LTTNG_CONSUMER_DESTROY_RELAYD:
@@ -2142,7 +2198,7 @@ end_rotate_channel_nosignal:
                const uint64_t relayd_id =
                                msg.u.close_trace_chunk.relayd_id.value;
                struct lttcomm_consumer_close_trace_chunk_reply reply;
-               char closed_trace_chunk_path[LTTNG_PATH_MAX];
+               char closed_trace_chunk_path[LTTNG_PATH_MAX] = {};
                int ret;
 
                ret_code = lttng_consumer_close_trace_chunk(
@@ -2371,8 +2427,9 @@ void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
                ustctl_flush_buffer(stream->ustream, 0);
                stream->quiescent = true;
        }
-       pthread_mutex_unlock(&stream->lock);
+
        stream->hangup_flush_done = 1;
+       pthread_mutex_unlock(&stream->lock);
 }
 
 void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
@@ -2455,15 +2512,6 @@ int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream)
        return ustctl_stream_close_wakeup_fd(stream->ustream);
 }
 
-static
-void metadata_stream_reset_cache_consumed_position(
-               struct lttng_consumer_stream *stream)
-{
-       DBG("Reset metadata cache of session %" PRIu64,
-                       stream->chan->session_id);
-       stream->ust_metadata_pushed = 0;
-}
-
 /*
  * Write up to one packet from the metadata cache to the channel.
  *
@@ -3041,6 +3089,7 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
 
        assert(stream);
        assert(stream->ustream);
+       ASSERT_LOCKED(stream->lock);
 
        DBG("UST consumer checking data pending");
 
This page took 0.02721 seconds and 4 git commands to generate.