Add UST snapshot support
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index 18eb507dec003a08381f6c6a17f6da9997769850..f318af153ca3aca1dcdd1573590d7447de61490c 100644 (file)
@@ -37,6 +37,7 @@
 #include <common/relayd/relayd.h>
 #include <common/compat/fcntl.h>
 #include <common/consumer-metadata-cache.h>
+#include <common/consumer-stream.h>
 #include <common/consumer-timer.h>
 #include <common/utils.h>
 
@@ -284,7 +285,9 @@ static int create_ust_streams(struct lttng_consumer_channel *channel,
                 * Increment channel refcount since the channel reference has now been
                 * assigned in the allocation process above.
                 */
-               uatomic_inc(&stream->chan->refcount);
+               if (stream->chan->monitor) {
+                       uatomic_inc(&stream->chan->refcount);
+               }
 
                /*
                 * Order is important this is why a list is used. On error, the caller
@@ -503,18 +506,27 @@ static int ask_channel(struct lttng_consumer_local_data *ctx, int sock,
        /* The reply msg status is handled in the following call. */
        ret = create_ust_channel(attr, &channel->uchan);
        if (ret < 0) {
-               goto error;
+               goto end;
        }
 
        channel->wait_fd = ustctl_channel_get_wait_fd(channel->uchan);
 
+       /*
+        * For the snapshots (no monitor), we create the metadata streams
+        * on demand, not during the channel creation.
+        */
+       if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA && !channel->monitor) {
+               ret = 0;
+               goto end;
+       }
+
        /* Open all streams for this channel. */
        ret = create_ust_streams(channel, ctx);
        if (ret < 0) {
-               goto error;
+               goto end;
        }
 
-error:
+end:
        return ret;
 }
 
@@ -694,7 +706,17 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
        if (!metadata) {
                ERR("UST consumer push metadata %" PRIu64 " not found", key);
                ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
-               goto error_find;
+               goto end;
+       }
+
+       /*
+        * In no monitor mode, the metadata channel has no stream(s) so skip the
+        * ownership transfer to the metadata thread.
+        */
+       if (!metadata->monitor) {
+               DBG("Metadata channel in no monitor");
+               ret = 0;
+               goto end;
        }
 
        /*
@@ -726,7 +748,8 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
        /* List MUST be empty after or else it could be reused. */
        assert(cds_list_empty(&metadata->streams.head));
 
-       return 0;
+       ret = 0;
+       goto end;
 
 error:
        /*
@@ -736,7 +759,268 @@ error:
         * will make sure to clean that list.
         */
        consumer_del_channel(metadata);
-error_find:
+end:
+       return ret;
+}
+
+/*
+ * Snapshot the whole metadata.
+ *
+ * Returns 0 on success, < 0 on error
+ */
+static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id,
+               struct lttng_consumer_local_data *ctx)
+{
+       int ret = 0;
+       ssize_t write_len;
+       uint64_t total_len = 0;
+       struct lttng_consumer_channel *metadata_channel;
+       struct lttng_consumer_stream *metadata_stream;
+
+       assert(path);
+       assert(ctx);
+
+       DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s",
+                       key, path);
+
+       rcu_read_lock();
+
+       metadata_channel = consumer_find_channel(key);
+       if (!metadata_channel) {
+               ERR("UST snapshot metadata channel not found for key %lu", key);
+               ret = -1;
+               goto error;
+       }
+       assert(!metadata_channel->monitor);
+
+       /*
+        * Ask the sessiond if we have new metadata waiting and update the
+        * consumer metadata cache.
+        */
+       ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel);
+       if (ret < 0) {
+               goto error;
+       }
+
+       /*
+        * The metadata stream is NOT created in no monitor mode when the channel
+        * is created on a sessiond ask channel command.
+        */
+       ret = create_ust_streams(metadata_channel, ctx);
+       if (ret < 0) {
+               goto error;
+       }
+
+       metadata_stream = metadata_channel->metadata_stream;
+       assert(metadata_stream);
+
+       if (relayd_id != (uint64_t) -1ULL) {
+               metadata_stream->net_seq_idx = relayd_id;
+               ret = consumer_send_relayd_stream(metadata_stream, path);
+               if (ret < 0) {
+                       goto error_stream;
+               }
+       } else {
+               ret = utils_create_stream_file(path, metadata_stream->name,
+                               metadata_stream->chan->tracefile_size,
+                               metadata_stream->tracefile_count_current,
+                               metadata_stream->uid, metadata_stream->gid);
+               if (ret < 0) {
+                       goto error_stream;
+               }
+               metadata_stream->out_fd = ret;
+               metadata_stream->tracefile_size_current = 0;
+       }
+
+       pthread_mutex_lock(&metadata_channel->metadata_cache->lock);
+       while (total_len < metadata_channel->metadata_cache->total_bytes_written) {
+               /*
+                * Write at most one packet of metadata into the channel
+                * to avoid blocking here.
+                */
+               write_len = ustctl_write_one_packet_to_channel(metadata_channel->uchan,
+                               metadata_channel->metadata_cache->data,
+                               metadata_channel->metadata_cache->total_bytes_written);
+               if (write_len < 0) {
+                       ERR("UST consumer snapshot writing metadata packet");
+                       ret = -1;
+                       goto error_unlock;
+               }
+               total_len += write_len;
+
+               DBG("Written %" PRIu64 " bytes to metadata (left: %" PRIu64 ")",
+                               write_len,
+                               metadata_channel->metadata_cache->total_bytes_written - write_len);
+               ustctl_flush_buffer(metadata_stream->ustream, 1);
+               ret = lttng_consumer_read_subbuffer(metadata_stream, ctx);
+               if (ret < 0) {
+                       goto error_unlock;
+               }
+       }
+
+error_unlock:
+       pthread_mutex_unlock(&metadata_channel->metadata_cache->lock);
+
+error_stream:
+       /*
+        * Clean up the stream completly because the next snapshot will use a new
+        * metadata stream.
+        */
+       cds_list_del(&metadata_stream->send_node);
+       consumer_stream_destroy(metadata_stream, NULL);
+       metadata_channel->metadata_stream = NULL;
+
+error:
+       rcu_read_unlock();
+       return ret;
+}
+
+/*
+ * Take a snapshot of all the stream of a channel.
+ *
+ * Returns 0 on success, < 0 on error
+ */
+static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
+               struct lttng_consumer_local_data *ctx)
+{
+       int ret;
+       unsigned use_relayd = 0;
+       unsigned long consumed_pos, produced_pos;
+       struct lttng_consumer_channel *channel;
+       struct lttng_consumer_stream *stream;
+
+       assert(path);
+       assert(ctx);
+
+       rcu_read_lock();
+
+       if (relayd_id != (uint64_t) -1ULL) {
+               use_relayd = 1;
+       }
+
+       channel = consumer_find_channel(key);
+       if (!channel) {
+               ERR("UST snapshot channel not found for key %lu", key);
+               ret = -1;
+               goto error;
+       }
+       assert(!channel->monitor);
+       DBG("UST consumer snapshot channel %lu", key);
+
+       cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+               /* Lock stream because we are about to change its state. */
+               pthread_mutex_lock(&stream->lock);
+               stream->net_seq_idx = relayd_id;
+
+               if (use_relayd) {
+                       ret = consumer_send_relayd_stream(stream, path);
+                       if (ret < 0) {
+                               goto error_unlock;
+                       }
+               } else {
+                       ret = utils_create_stream_file(path, stream->name,
+                                       stream->chan->tracefile_size,
+                                       stream->tracefile_count_current,
+                                       stream->uid, stream->gid);
+                       if (ret < 0) {
+                               goto error_unlock;
+                       }
+                       stream->out_fd = ret;
+                       stream->tracefile_size_current = 0;
+
+                       DBG("UST consumer snapshot stream %s/%s (%" PRIu64 ")", path,
+                                       stream->name, stream->key);
+               }
+
+               ustctl_flush_buffer(stream->ustream, 1);
+
+               ret = lttng_ustconsumer_take_snapshot(stream);
+               if (ret < 0) {
+                       ERR("Taking UST snapshot");
+                       goto error_unlock;
+               }
+
+               ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos);
+               if (ret < 0) {
+                       ERR("Produced UST snapshot position");
+                       goto error_unlock;
+               }
+
+               ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos);
+               if (ret < 0) {
+                       ERR("Consumerd UST snapshot position");
+                       goto error_unlock;
+               }
+
+               while (consumed_pos < produced_pos) {
+                       ssize_t read_len;
+                       unsigned long len, padded_len;
+
+                       DBG("UST consumer taking snapshot at pos %lu", consumed_pos);
+
+                       ret = ustctl_get_subbuf(stream->ustream, &consumed_pos);
+                       if (ret < 0) {
+                               if (ret != -EAGAIN) {
+                                       PERROR("ustctl_get_subbuf snapshot");
+                                       goto error_close_stream;
+                               }
+                               DBG("UST consumer get subbuf failed. Skipping it.");
+                               consumed_pos += stream->max_sb_size;
+                               continue;
+                       }
+
+                       ret = ustctl_get_subbuf_size(stream->ustream, &len);
+                       if (ret < 0) {
+                               ERR("Snapshot ustctl_get_subbuf_size");
+                               goto error_put_subbuf;
+                       }
+
+                       ret = ustctl_get_padded_subbuf_size(stream->ustream, &padded_len);
+                       if (ret < 0) {
+                               ERR("Snapshot ustctl_get_padded_subbuf_size");
+                               goto error_put_subbuf;
+                       }
+
+                       read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len,
+                                       padded_len - len);
+                       if (use_relayd) {
+                               if (read_len != len) {
+                                       ret = -1;
+                                       goto error_put_subbuf;
+                               }
+                       } else {
+                               if (read_len != padded_len) {
+                                       ret = -1;
+                                       goto error_put_subbuf;
+                               }
+                       }
+
+                       ret = ustctl_put_subbuf(stream->ustream);
+                       if (ret < 0) {
+                               ERR("Snapshot ustctl_put_subbuf");
+                               goto error_close_stream;
+                       }
+                       consumed_pos += stream->max_sb_size;
+               }
+
+               /* Simply close the stream so we can use it on the next snapshot. */
+               consumer_stream_close(stream);
+               pthread_mutex_unlock(&stream->lock);
+       }
+
+       rcu_read_unlock();
+       return 0;
+
+error_put_subbuf:
+       if (ustctl_put_subbuf(stream->ustream) < 0) {
+               ERR("Snapshot ustctl_put_subbuf");
+       }
+error_close_stream:
+       consumer_stream_close(stream);
+error_unlock:
+       pthread_mutex_unlock(&stream->lock);
+error:
+       rcu_read_unlock();
        return ret;
 }
 
@@ -940,6 +1224,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                attr.switch_timer_interval = msg.u.ask_channel.switch_timer_interval;
                attr.read_timer_interval = msg.u.ask_channel.read_timer_interval;
                attr.chan_id = msg.u.ask_channel.chan_id;
+               attr.output = msg.u.ask_channel.output;
                memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid));
 
                /* Translate and save channel type. */
@@ -1043,6 +1328,14 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto error_fatal;
                }
 
+               /*
+                * In no monitor mode, the streams ownership is kept inside the channel
+                * so don't send them to the data thread.
+                */
+               if (!channel->monitor) {
+                       goto end_msg_sessiond;
+               }
+
                ret = send_streams_to_thread(channel, ctx);
                if (ret < 0) {
                        /*
@@ -1053,7 +1346,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                }
                /* List MUST be empty after or else it could be reused. */
                assert(cds_list_empty(&channel->streams.head));
-
                goto end_msg_sessiond;
        }
        case LTTNG_CONSUMER_DESTROY_CHANNEL:
@@ -1142,6 +1434,26 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        }
        case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
        {
+               if (msg.u.snapshot_channel.metadata) {
+                       ret = snapshot_metadata(msg.u.snapshot_channel.key,
+                                       msg.u.snapshot_channel.pathname,
+                                       msg.u.snapshot_channel.relayd_id,
+                                       ctx);
+                       if (ret < 0) {
+                               ERR("Snapshot metadata failed");
+                               ret_code = LTTNG_ERR_UST_META_FAIL;
+                       }
+               } else {
+                       ret = snapshot_channel(msg.u.snapshot_channel.key,
+                                       msg.u.snapshot_channel.pathname,
+                                       msg.u.snapshot_channel.relayd_id,
+                                       ctx);
+                       if (ret < 0) {
+                               ERR("Snapshot channel failed");
+                               ret_code = LTTNG_ERR_UST_CHAN_FAIL;
+                       }
+               }
+
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
@@ -1249,6 +1561,21 @@ int lttng_ustconsumer_get_produced_snapshot(
        return ustctl_snapshot_get_produced(stream->ustream, pos);
 }
 
+/*
+ * Get the consumed position
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_ustconsumer_get_consumed_snapshot(
+               struct lttng_consumer_stream *stream, unsigned long *pos)
+{
+       assert(stream);
+       assert(stream->ustream);
+       assert(pos);
+
+       return ustctl_snapshot_get_consumed(stream->ustream, pos);
+}
+
 /*
  * Called when the stream signal the consumer that it has hang up.
  */
@@ -1379,8 +1706,10 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
 {
        int ret;
 
+       assert(stream);
+
        /* Don't create anything if this is set for streaming. */
-       if (stream->net_seq_idx == (uint64_t) -1ULL) {
+       if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) {
                ret = utils_create_stream_file(stream->chan->pathname, stream->name,
                                stream->chan->tracefile_size, stream->tracefile_count_current,
                                stream->uid, stream->gid);
This page took 0.027623 seconds and 4 git commands to generate.