Consumer rotate stream
authorJulien Desfossez <jdesfossez@efficios.com>
Thu, 14 Dec 2017 18:38:20 +0000 (13:38 -0500)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Wed, 14 Mar 2018 21:53:11 +0000 (17:53 -0400)
Perform the action of rotating a stream locally or send the command to
do it on the relayd. Rotating a stream file consists of:
 - closing the current tracefile and index,
 - opening a new tracefile and index in the new chunk folder,
 - resetting the stream rotation flags,
 - updating the counter of streams waiting for a rotation in a channel,

If the stream is a metadata stream, we also need to trigger the action
to re-dump the content of the metadata cache after the rotation has been
performed.

The caller of lttng_consumer_rotate_stream() always calls
consumer_post_rotation() after having released the stream lock to update
the counter of streams waiting for a rotation in a channel and notifying
the session daemon if this counter reaches 0.

Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/common/consumer/consumer.c
src/common/consumer/consumer.h
src/common/relayd/relayd.c
src/common/relayd/relayd.h
src/common/sessiond-comm/relayd.h

index 07de6757849f5a01e4c19d4bde771cca11ffe462..8e6b63b011436a06a80a455aad802d23276a4470 100644 (file)
@@ -2276,6 +2276,73 @@ static void validate_endpoint_status_metadata_stream(
        rcu_read_unlock();
 }
 
+static
+int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx,
+               uint64_t key)
+{
+       ssize_t ret;
+
+       do {
+               ret = write(ctx->channel_rotate_pipe, &key, sizeof(key));
+       } while (ret == -1 && errno == EINTR);
+       if (ret == -1) {
+               PERROR("Failed to write to the channel rotation pipe");
+       } else {
+               DBG("Sent channel rotation notification for channel key %"
+                               PRIu64, key);
+               ret = 0;
+       }
+
+       return (int) ret;
+}
+
+/*
+ * Perform operations that need to be done after a stream has
+ * rotated and released the stream lock.
+ *
+ * Multiple rotations cannot occur simultaneously, so we know the state of the
+ * "rotated" stream flag cannot change.
+ *
+ * This MUST be called WITHOUT the stream lock held.
+ */
+static
+int consumer_post_rotation(struct lttng_consumer_stream *stream,
+               struct lttng_consumer_local_data *ctx)
+{
+       int ret = 0;
+
+       pthread_mutex_lock(&stream->chan->lock);
+
+       switch (consumer_data.type) {
+               case LTTNG_CONSUMER_KERNEL:
+                       break;
+               case LTTNG_CONSUMER32_UST:
+               case LTTNG_CONSUMER64_UST:
+                       /*
+                        * The ust_metadata_pushed counter has been reset to 0, so now
+                        * we can wakeup the metadata thread so it dumps the metadata
+                        * cache to the new file.
+                        */
+                       if (stream->metadata_flag) {
+                               consumer_metadata_wakeup_pipe(stream->chan);
+                       }
+                       break;
+               default:
+                       ERR("Unknown consumer_data type");
+                       abort();
+       }
+
+       if (--stream->chan->nr_stream_rotate_pending == 0) {
+               DBG("Rotation of channel \"%s\" completed, notifying the session daemon",
+                               stream->chan->name);
+               ret = rotate_notify_sessiond(ctx, stream->chan->key);
+       }
+       assert(stream->chan->nr_stream_rotate_pending >= 0);
+       pthread_mutex_unlock(&stream->chan->lock);
+
+       return ret;
+}
+
 /*
  * Thread polls on metadata file descriptor and write them on disk or on the
  * network.
@@ -3836,6 +3903,167 @@ unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
        return start_pos;
 }
 
+/*
+ * Reset the state for a stream after a rotation occurred.
+ */
+void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream)
+{
+       stream->rotate_position = 0;
+       stream->rotate_ready = false;
+}
+
+/*
+ * Perform the rotation a local stream file.
+ */
+int rotate_local_stream(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream)
+{
+       int ret;
+
+       DBG("Rotate local stream: stream key %" PRIu64 ", channel key %" PRIu64 " at path %s",
+                       stream->key,
+                       stream->chan->key,
+                       stream->channel_read_only_attributes.path);
+
+       ret = close(stream->out_fd);
+       if (ret < 0) {
+               PERROR("Closing trace file (fd %d), stream %" PRIu64,
+                               stream->out_fd, stream->key);
+               assert(0);
+               goto error;
+       }
+
+       ret = utils_create_stream_file(
+                       stream->channel_read_only_attributes.path,
+                       stream->name,
+                       stream->channel_read_only_attributes.tracefile_size,
+                       stream->tracefile_count_current,
+                       stream->uid, stream->gid, NULL);
+       if (ret < 0) {
+               ERR("Rotate create stream file");
+               goto error;
+       }
+       stream->out_fd = ret;
+       stream->tracefile_size_current = 0;
+
+       if (!stream->metadata_flag) {
+               struct lttng_index_file *index_file;
+
+               lttng_index_file_put(stream->index_file);
+
+               index_file = lttng_index_file_create(
+                               stream->channel_read_only_attributes.path,
+                               stream->name, stream->uid, stream->gid,
+                               stream->channel_read_only_attributes.tracefile_size,
+                               stream->tracefile_count_current,
+                               CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+               if (!index_file) {
+                       ERR("Create index file during rotation");
+                       goto error;
+               }
+               stream->index_file = index_file;
+               stream->out_fd_offset = 0;
+       }
+
+       ret = 0;
+       goto end;
+
+error:
+       ret = -1;
+end:
+       return ret;
+
+}
+
+/*
+ * Perform the rotation a stream file on the relay.
+ */
+int rotate_relay_stream(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream)
+{
+       int ret;
+       struct consumer_relayd_sock_pair *relayd;
+
+       DBG("Rotate relay stream");
+       relayd = consumer_find_relayd(stream->net_seq_idx);
+       if (!relayd) {
+               ERR("Failed to find relayd");
+               ret = -1;
+               goto end;
+       }
+
+       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+       ret = relayd_rotate_stream(&relayd->control_sock,
+                       stream->relayd_stream_id,
+                       stream->channel_read_only_attributes.path,
+                       stream->chan->current_chunk_id,
+                       stream->last_sequence_number);
+       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+       if (ret) {
+               ERR("Rotate relay stream");
+       }
+
+end:
+       return ret;
+}
+
+/*
+ * Performs the stream rotation for the rotate session feature if needed.
+ * It must be called with the stream lock held.
+ *
+ * Return 0 on success, a negative number of error.
+ */
+int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream)
+{
+       int ret;
+
+       DBG("Consumer rotate stream %" PRIu64, stream->key);
+
+       if (stream->net_seq_idx != (uint64_t) -1ULL) {
+               ret = rotate_relay_stream(ctx, stream);
+       } else {
+               ret = rotate_local_stream(ctx, stream);
+       }
+       if (ret < 0) {
+               ERR("Rotate stream");
+               goto error;
+       }
+
+       if (stream->metadata_flag) {
+               switch (consumer_data.type) {
+               case LTTNG_CONSUMER_KERNEL:
+                       /*
+                        * Reset the position of what has been read from the metadata
+                        * cache to 0 so we can dump it again.
+                        */
+                       ret = kernctl_metadata_cache_dump(stream->wait_fd);
+                       if (ret < 0) {
+                               ERR("Failed to dump the kernel metadata cache after rotation");
+                               goto error;
+                       }
+                       break;
+               case LTTNG_CONSUMER32_UST:
+               case LTTNG_CONSUMER64_UST:
+                       /*
+                        * Reset the position pushed from the metadata cache so it
+                        * will write from the beginning on the next push.
+                        */
+                       stream->ust_metadata_pushed = 0;
+                       break;
+               default:
+                       ERR("Unknown consumer_data type");
+                       abort();
+               }
+       }
+       lttng_consumer_reset_stream_rotate_state(stream);
+
+       ret = 0;
+
+error:
+       return ret;
+}
+
 static
 int rotate_rename_local(const char *old_path, const char *new_path,
                uid_t uid, gid_t gid)
index 1187942577366f8f061ea1795b67b5e5e6e3ce0c..9e77ff999601f1ed3efddd67e5dcf378eb5c2760 100644 (file)
@@ -229,6 +229,20 @@ struct lttng_consumer_channel {
        uint64_t lost_packets;
 
        bool streams_sent_to_relayd;
+
+       /*
+        * Account how many streams are waiting for their rotation to be
+        * complete. When this number reaches 0, we inform the session
+        * daemon that this channel has finished its rotation.
+        */
+       uint64_t nr_stream_rotate_pending;
+
+       /*
+        * The chunk id where we currently write the data. This value is sent
+        * to the relay when we add a stream and when a stream rotates. This
+        * allows to keep track of where each stream on the relay is writing.
+        */
+       uint64_t current_chunk_id;
 };
 
 /*
@@ -789,8 +803,11 @@ void consumer_del_stream_for_data(struct lttng_consumer_stream *stream);
 void consumer_add_metadata_stream(struct lttng_consumer_stream *stream);
 void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream);
 int consumer_create_index_file(struct lttng_consumer_stream *stream);
+int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream);
 int lttng_consumer_rotate_rename(const char *current_path, const char *new_path,
                uid_t uid, gid_t gid, uint64_t relayd_id);
+void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream);
 int lttng_consumer_mkdir(const char *path, uid_t uid, gid_t gid,
                uint64_t relayd_id);
 
index 458553bfe997069706aa9b43987a99d1cfcf0ece..242abbedff15c5c2f079460cb8805dbc567268cd 100644 (file)
@@ -942,6 +942,84 @@ error:
        return ret;
 }
 
+int relayd_rotate_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
+               const char *new_pathname, uint64_t new_chunk_id,
+               uint64_t seq_num)
+{
+       int ret;
+       struct lttcomm_relayd_rotate_stream *msg = NULL;
+       struct lttcomm_relayd_generic_reply reply;
+       size_t len;
+       int msg_len;
+
+       /* Code flow error. Safety net. */
+       assert(rsock);
+
+       DBG("Sending rotate stream id %" PRIu64 " command to relayd", stream_id);
+
+       /* Account for the trailing NULL. */
+       len = strnlen(new_pathname, LTTNG_PATH_MAX) + 1;
+       if (len > LTTNG_PATH_MAX) {
+               ERR("Path used in relayd rotate stream command exceeds the maximal allowed length");
+               ret = -1;
+               goto error;
+       }
+
+       msg_len = offsetof(struct lttcomm_relayd_rotate_stream, new_pathname) + len;
+       msg = zmalloc(msg_len);
+       if (!msg) {
+               PERROR("Failed to allocate relayd rotate stream command of %d bytes",
+                               msg_len);
+               ret = -1;
+               goto error;
+       }
+
+       if (lttng_strncpy(msg->new_pathname, new_pathname, len)) {
+               ret = -1;
+               ERR("Failed to copy relayd rotate stream command's new path name");
+               goto error;
+       }
+
+       msg->pathname_length = htobe32(len);
+       msg->stream_id = htobe64(stream_id);
+       msg->new_chunk_id = htobe64(new_chunk_id);
+       /*
+        * The seq_num is invalid for metadata streams, but it is ignored on
+        * the relay.
+        */
+       msg->rotate_at_seq_num = htobe64(seq_num);
+
+       /* Send command. */
+       ret = send_command(rsock, RELAYD_ROTATE_STREAM, (void *) msg, msg_len, 0);
+       if (ret < 0) {
+               ERR("Send rotate command");
+               goto error;
+       }
+
+       /* Receive response. */
+       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
+       if (ret < 0) {
+               ERR("Receive rotate reply");
+               goto error;
+       }
+
+       reply.ret_code = be32toh(reply.ret_code);
+
+       /* Return session id or negative ret code. */
+       if (reply.ret_code != LTTNG_OK) {
+               ret = -1;
+               ERR("Relayd rotate stream replied error %d", reply.ret_code);
+       } else {
+               /* Success. */
+               ret = 0;
+               DBG("Relayd rotated stream id %" PRIu64 " successfully", stream_id);
+       }
+
+error:
+       free(msg);
+       return ret;
+}
+
 int relayd_rotate_rename(struct lttcomm_relayd_sock *rsock,
                const char *old_path, const char *new_path)
 {
index 5360ae7c8f0697ae48c81346815288cee4608da8..f6329eb5b8f2b6912c87b224a12080a594ba2ef8 100644 (file)
@@ -51,6 +51,8 @@ int relayd_send_index(struct lttcomm_relayd_sock *rsock,
                uint64_t net_seq_num);
 int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
                uint64_t stream_id, uint64_t version);
+int relayd_rotate_stream(struct lttcomm_relayd_sock *sock, uint64_t stream_id,
+               const char *new_pathname, uint64_t new_chunk_id, uint64_t seq_num);
 int relayd_rotate_rename(struct lttcomm_relayd_sock *sock,
                const char *current_path, const char *new_path);
 int relayd_mkdir(struct lttcomm_relayd_sock *rsock, const char *path);
index 9a733df1e7aaa68c5c6e252988012fca0eda6eda..e9a7e9ff2534a6783f80e8564a28063c8bea08b0 100644 (file)
@@ -197,6 +197,7 @@ struct lttcomm_relayd_reset_metadata {
 
 struct lttcomm_relayd_rotate_stream {
        uint64_t stream_id;
+       /* Ignored for metadata streams. */
        uint64_t rotate_at_seq_num;
        uint64_t new_chunk_id;
        /* Includes trailing NULL. */
This page took 0.030964 seconds and 4 git commands to generate.