relayd: create stream files relative to a session's trace chunk
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index 92f9a5600ac28b7280dcea9f1041b7725b13b748..dddc2a2b4f77b25c9ba968826460147302f25e0a 100644 (file)
@@ -74,6 +74,7 @@
 #include "connection.h"
 #include "tracefile-array.h"
 #include "tcp_keep_alive.h"
+#include "sessiond-trace-chunks.h"
 
 static const char *help_msg =
 #ifdef LTTNG_EMBED_HELP
@@ -85,7 +86,7 @@ NULL
 
 enum relay_connection_status {
        RELAY_CONNECTION_STATUS_OK,
-       /* An error occured while processing an event on the connection. */
+       /* An error occurred while processing an event on the connection. */
        RELAY_CONNECTION_STATUS_ERROR,
        /* Connection closed/shutdown cleanly. */
        RELAY_CONNECTION_STATUS_CLOSED,
@@ -167,6 +168,8 @@ struct lttng_ht *sessions_ht;
 /* Relayd health monitoring */
 struct health_app *health_relayd;
 
+struct sessiond_trace_chunk_registry *sessiond_trace_chunk_registry;
+
 static struct option long_options[] = {
        { "control-port", 1, 0, 'C', },
        { "data-port", 1, 0, 'D', },
@@ -858,14 +861,6 @@ restart:
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
-                       if (!revents) {
-                               /*
-                                * No activity for this FD (poll
-                                * implementation).
-                                */
-                               continue;
-                       }
-
                        /* Thread quit pipe has been closed. Killing thread. */
                        ret = check_thread_quit_pipe(pollfd, revents);
                        if (ret) {
@@ -1083,6 +1078,11 @@ static int set_index_control_data(struct relay_index *index,
        return relay_index_set_data(index, &index_data);
 }
 
+static bool session_streams_have_index(const struct relay_session *session)
+{
+       return session->minor >= 4 && !session->snapshot;
+}
+
 /*
  * Handle the RELAYD_CREATE_SESSION command.
  *
@@ -1094,17 +1094,17 @@ static int relay_create_session(const struct lttcomm_relayd_hdr *recv_hdr,
 {
        int ret = 0;
        ssize_t send_ret;
-       struct relay_session *session;
-       struct lttcomm_relayd_status_session reply;
-       char session_name[LTTNG_NAME_MAX];
-       char hostname[LTTNG_HOST_NAME_MAX];
+       struct relay_session *session = NULL;
+       struct lttcomm_relayd_status_session reply = {};
+       char session_name[LTTNG_NAME_MAX] = {};
+       char hostname[LTTNG_HOST_NAME_MAX] = {};
        uint32_t live_timer = 0;
        bool snapshot = false;
-
-       memset(session_name, 0, LTTNG_NAME_MAX);
-       memset(hostname, 0, LTTNG_HOST_NAME_MAX);
-
-       memset(&reply, 0, sizeof(reply));
+       /* Left nil for peers < 2.11. */
+       lttng_uuid sessiond_uuid = {};
+       LTTNG_OPTIONAL(uint64_t) id_sessiond = {};
+       LTTNG_OPTIONAL(uint64_t) current_chunk_id = {};
+       LTTNG_OPTIONAL(time_t) creation_time = {};
 
        if (conn->minor < 4) {
                /* From 2.1 to 2.3 */
@@ -1114,9 +1114,28 @@ static int relay_create_session(const struct lttcomm_relayd_hdr *recv_hdr,
                ret = cmd_create_session_2_4(payload, session_name,
                        hostname, &live_timer, &snapshot);
        } else {
+               bool has_current_chunk;
+               uint64_t current_chunk_id_value;
+               time_t creation_time_value;
+               uint64_t id_sessiond_value;
+
                /* From 2.11 to ... */
-               ret = cmd_create_session_2_11(payload, session_name,
-                       hostname, &live_timer, &snapshot);
+               ret = cmd_create_session_2_11(payload, session_name, hostname,
+                               &live_timer, &snapshot, &id_sessiond_value,
+                               sessiond_uuid, &has_current_chunk,
+                               &current_chunk_id_value, &creation_time_value);
+               if (lttng_uuid_is_nil(sessiond_uuid)) {
+                       /* The nil UUID is reserved for pre-2.11 clients. */
+                       ERR("Illegal nil UUID announced by peer in create session command");
+                       ret = -1;
+                       goto send_reply;
+               }
+               LTTNG_OPTIONAL_SET(&id_sessiond, id_sessiond_value);
+               LTTNG_OPTIONAL_SET(&creation_time, creation_time_value);
+               if (has_current_chunk) {
+                       LTTNG_OPTIONAL_SET(&current_chunk_id,
+                                       current_chunk_id_value);
+               }
        }
 
        if (ret < 0) {
@@ -1124,7 +1143,11 @@ static int relay_create_session(const struct lttcomm_relayd_hdr *recv_hdr,
        }
 
        session = session_create(session_name, hostname, live_timer,
-                       snapshot, conn->major, conn->minor);
+                       snapshot, sessiond_uuid,
+                       id_sessiond.is_set ? &id_sessiond.value : NULL,
+                       current_chunk_id.is_set ? &current_chunk_id.value : NULL,
+                       creation_time.is_set ? &creation_time.value : NULL,
+                       conn->major, conn->minor);
        if (!session) {
                ret = -1;
                goto send_reply;
@@ -1148,7 +1171,9 @@ send_reply:
                                send_ret);
                ret = -1;
        }
-
+       if (ret < 0 && session) {
+               session_put(session);
+       }
        return ret;
 }
 
@@ -1182,6 +1207,34 @@ static void publish_connection_local_streams(struct relay_connection *conn)
        pthread_mutex_unlock(&session->lock);
 }
 
+static int conform_channel_path(char *channel_path)
+{
+       int ret = 0;
+
+       if (strstr("../", channel_path)) {
+               ERR("Refusing channel path as it walks up the path hierarchy: \"%s\"",
+                               channel_path);
+               ret = -1;
+               goto end;
+       }
+
+       if (*channel_path == '/') {
+               const size_t len = strlen(channel_path);
+
+               /*
+                * Channel paths from peers prior to 2.11 are expressed as an
+                * absolute path that is, in reality, relative to the relay
+                * daemon's output directory. Remove the leading slash so it
+                * is correctly interpreted as a relative path later on.
+                *
+                * len (and not len - 1) is used to copy the trailing NULL.
+                */
+               bcopy(channel_path + 1, channel_path, len);
+       }
+end:
+       return ret;
+}
+
 /*
  * relay_add_stream: allocate a new stream for a session
  */
@@ -1198,7 +1251,7 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr,
        uint64_t stream_handle = -1ULL;
        char *path_name = NULL, *channel_name = NULL;
        uint64_t tracefile_size = 0, tracefile_count = 0;
-       struct relay_stream_chunk_id stream_chunk_id = { 0 };
+       LTTNG_OPTIONAL(uint64_t) stream_chunk_id = {};
 
        if (!session || !conn->version_check_done) {
                ERR("Trying to add a stream before version check");
@@ -1226,6 +1279,10 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr,
                goto send_reply;
        }
 
+       if (conform_channel_path(path_name)) {
+               goto send_reply;
+       }
+
        trace = ctf_trace_get_by_path_or_create(session, path_name);
        if (!trace) {
                goto send_reply;
@@ -1238,8 +1295,7 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr,
 
        /* We pass ownership of path_name and channel_name. */
        stream = stream_create(trace, stream_handle, path_name,
-               channel_name, tracefile_size, tracefile_count,
-               &stream_chunk_id);
+               channel_name, tracefile_size, tracefile_count);
        path_name = NULL;
        channel_name = NULL;
 
@@ -1534,11 +1590,14 @@ end:
  * Return 0 on success, -1 on error.
  */
 static
-int create_rotate_index_file(struct relay_stream *stream)
+int create_rotate_index_file(struct relay_stream *stream,
+               const char *channel_path)
 {
        int ret;
        uint32_t major, minor;
 
+       ASSERT_LOCKED(stream->lock);
+
        /* Put ref on previous index_file. */
        if (stream->index_file) {
                lttng_index_file_put(stream->index_file);
@@ -1546,12 +1605,26 @@ int create_rotate_index_file(struct relay_stream *stream)
        }
        major = stream->trace->session->major;
        minor = stream->trace->session->minor;
-       stream->index_file = lttng_index_file_create(stream->path_name,
-                       stream->channel_name,
-                       -1, -1, stream->tracefile_size,
-                       tracefile_array_get_file_index_head(stream->tfa),
+       if (!stream->trace->index_folder_created) {
+               char *index_subpath = NULL;
+
+               ret = asprintf(&index_subpath, "%s/%s", channel_path, DEFAULT_INDEX_DIR);
+               if (ret < 0) {
+                       goto end;
+               }
+
+               ret = lttng_trace_chunk_create_subdirectory(stream->trace_chunk, index_subpath);
+               free(index_subpath);
+               if (ret) {
+                       goto end;
+               }
+               stream->trace->index_folder_created = true;
+       }
+       stream->index_file = lttng_index_file_create_from_trace_chunk(
+                       stream->trace_chunk, channel_path, stream->channel_name,
+                       stream->tracefile_size, stream->tracefile_count,
                        lttng_to_index_major(major, minor),
-                       lttng_to_index_minor(major, minor));
+                       lttng_to_index_minor(major, minor), true);
        if (!stream->index_file) {
                ret = -1;
                goto end;
@@ -1564,10 +1637,12 @@ end:
 }
 
 static
-int do_rotate_stream(struct relay_stream *stream)
+int do_rotate_stream_data(struct relay_stream *stream)
 {
        int ret;
 
+       DBG("Rotating stream %" PRIu64 " data file",
+                       stream->stream_handle);
        /* Perform the stream rotation. */
        ret = utils_rotate_stream_file(stream->path_name,
                        stream->channel_name, stream->tracefile_size,
@@ -1579,19 +1654,17 @@ int do_rotate_stream(struct relay_stream *stream)
                goto end;
        }
        stream->tracefile_size_current = 0;
-
-       /* Rotate also the index if the stream is not a metadata stream. */
-       if (!stream->is_metadata) {
-               ret = create_rotate_index_file(stream);
-               if (ret < 0) {
-                       ERR("Failed to rotate index file");
-                       goto end;
-               }
-       }
-
-       stream->rotate_at_seq_num = -1ULL;
        stream->pos_after_last_complete_data_index = 0;
+       stream->data_rotated = true;
 
+       if (stream->data_rotated && stream->index_rotated) {
+               /* Rotation completed; reset its state. */
+               DBG("Rotation completed for stream %" PRIu64,
+                               stream->stream_handle);
+               stream->rotate_at_seq_num = -1ULL;
+               stream->data_rotated = false;
+               stream->index_rotated = false;
+       }
 end:
        return ret;
 }
@@ -1603,9 +1676,7 @@ end:
  * connections are separate, the indexes as well as the commands arrive from
  * the control connection and we have no control over the order so we could be
  * in a situation where too much data has been received on the data connection
- * before the rotation command on the control connection arrives. We don't need
- * to update the index because its order is guaranteed with the rotation
- * command message.
+ * before the rotation command on the control connection arrives.
  */
 static
 int rotate_truncate_stream(struct relay_stream *stream)
@@ -1636,7 +1707,7 @@ int rotate_truncate_stream(struct relay_stream *stream)
 
        /*
         * Rewind the current tracefile to the position at which the rotation
-        * should have occured.
+        * should have occurred.
         */
        lseek_ret = lseek(stream->stream_fd->fd,
                        stream->pos_after_last_complete_data_index, SEEK_SET);
@@ -1704,12 +1775,6 @@ int rotate_truncate_stream(struct relay_stream *stream)
                goto end;
        }
 
-       ret = create_rotate_index_file(stream);
-       if (ret < 0) {
-               ERR("Rotate stream index file");
-               goto end;
-       }
-
        /*
         * Update the offset and FD of all the eventual indexes created by the
         * data connection before the rotation command arrived.
@@ -1732,27 +1797,100 @@ end:
 }
 
 /*
- * Check if a stream should perform a rotation (for session rotation).
+ * Check if a stream's index file should be rotated (for session rotation).
  * Must be called with the stream lock held.
  *
  * Return 0 on success, a negative value on error.
  */
 static
-int try_rotate_stream(struct relay_stream *stream)
+int try_rotate_stream_index(struct relay_stream *stream)
 {
        int ret = 0;
 
-       /* No rotation expected. */
        if (stream->rotate_at_seq_num == -1ULL) {
+               /* No rotation expected. */
                goto end;
        }
 
-       if (stream->prev_seq < stream->rotate_at_seq_num ||
-                       stream->prev_seq == -1ULL) {
-               DBG("Stream %" PRIu64 " no yet ready for rotation",
+       if (stream->index_rotated) {
+               /* Rotation of the index has already occurred. */
+               goto end;
+       }
+
+       if (stream->prev_index_seq == -1ULL ||
+                       stream->prev_index_seq < stream->rotate_at_seq_num) {
+               DBG("Stream %" PRIu64 " index not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
+                               stream->stream_handle,
+                               stream->rotate_at_seq_num,
+                               stream->prev_index_seq);
+               goto end;
+       } else if (stream->prev_index_seq != stream->rotate_at_seq_num) {
+               /*
+                * Unexpected, protocol error/bug.
+                * It could mean that we received a rotation position
+                * that is in the past.
+                */
+               ERR("Stream %" PRIu64 " index is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
+                               stream->stream_handle,
+                               stream->rotate_at_seq_num,
+                               stream->prev_data_seq,
+                               stream->prev_index_seq);
+               ret = -1;
+               goto end;
+       } else {
+               DBG("Rotating stream %" PRIu64 " index file",
                                stream->stream_handle);
+               ret = create_rotate_index_file(stream, stream->path_name);
+               stream->index_rotated = true;
+
+               if (stream->data_rotated && stream->index_rotated) {
+                       /* Rotation completed; reset its state. */
+                       DBG("Rotation completed for stream %" PRIu64,
+                                       stream->stream_handle);
+                       stream->rotate_at_seq_num = -1ULL;
+                       stream->data_rotated = false;
+                       stream->index_rotated = false;
+               }
+       }
+
+end:
+       return ret;
+}
+
+/*
+ * Check if a stream's data file (as opposed to index) should be rotated
+ * (for session rotation).
+ * Must be called with the stream lock held.
+ *
+ * Return 0 on success, a negative value on error.
+ */
+static
+int try_rotate_stream_data(struct relay_stream *stream)
+{
+       int ret = 0;
+
+       if (stream->rotate_at_seq_num == -1ULL) {
+               /* No rotation expected. */
+               goto end;
+       }
+
+       if (stream->data_rotated) {
+               /* Rotation of the data file has already occurred. */
+               goto end;
+       }
+
+       if (stream->prev_data_seq == -1ULL ||
+                       stream->prev_data_seq < stream->rotate_at_seq_num) {
+               DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ")",
+                               stream->stream_handle,
+                               stream->rotate_at_seq_num,
+                               stream->prev_data_seq);
                goto end;
-       } else if (stream->prev_seq > stream->rotate_at_seq_num) {
+       } else if (stream->prev_data_seq > stream->rotate_at_seq_num) {
+               /*
+                * prev_data_seq is checked here since indexes and rotation
+                * commands are serialized with respect to each other.
+                */
                DBG("Rotation after too much data has been written in tracefile "
                                "for stream %" PRIu64 ", need to truncate before "
                                "rotating", stream->stream_handle);
@@ -1761,11 +1899,20 @@ int try_rotate_stream(struct relay_stream *stream)
                        ERR("Failed to truncate stream");
                        goto end;
                }
+       } else if (stream->prev_data_seq != stream->rotate_at_seq_num) {
+               /*
+                * Unexpected, protocol error/bug.
+                * It could mean that we received a rotation position
+                * that is in the past.
+                */
+               ERR("Stream %" PRIu64 " data is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ")",
+                               stream->stream_handle,
+                               stream->rotate_at_seq_num,
+                               stream->prev_data_seq);
+               ret = -1;
+               goto end;
        } else {
-               /* stream->prev_seq == stream->rotate_at_seq_num */
-               DBG("Stream %" PRIu64 " ready for rotation",
-                               stream->stream_handle);
-               ret = do_rotate_stream(stream);
+               ret = do_rotate_stream_data(stream);
        }
 
 end:
@@ -1836,7 +1983,7 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr,
        DBG2("Relay metadata written. Updated metadata_received %" PRIu64,
                metadata_stream->metadata_received);
 
-       ret = try_rotate_stream(metadata_stream);
+       ret = try_rotate_stream_data(metadata_stream);
        if (ret < 0) {
                goto end_put;
        }
@@ -1931,6 +2078,7 @@ static int relay_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
        struct relay_stream *stream;
        ssize_t send_ret;
        int ret;
+       uint64_t stream_seq;
 
        DBG("Data pending command received");
 
@@ -1958,12 +2106,23 @@ static int relay_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
 
        pthread_mutex_lock(&stream->lock);
 
-       DBG("Data pending for stream id %" PRIu64 " prev_seq %" PRIu64
-                       " and last_seq %" PRIu64, msg.stream_id,
-                       stream->prev_seq, msg.last_net_seq_num);
+       if (session_streams_have_index(session)) {
+               /*
+                * Ensure that both the index and stream data have been
+                * flushed up to the requested point.
+                */
+               stream_seq = min(stream->prev_data_seq, stream->prev_index_seq);
+       } else {
+               stream_seq = stream->prev_data_seq;
+       }
+       DBG("Data pending for stream id %" PRIu64 ": prev_data_seq %" PRIu64
+                       ", prev_index_seq %" PRIu64
+                       ", and last_seq %" PRIu64, msg.stream_id,
+                       stream->prev_data_seq, stream->prev_index_seq,
+                       msg.last_net_seq_num);
 
        /* Avoid wrapping issue */
-       if (((int64_t) (stream->prev_seq - msg.last_net_seq_num)) >= 0) {
+       if (((int64_t) (stream_seq - msg.last_net_seq_num)) >= 0) {
                /* Data has in fact been written and is NOT pending */
                ret = 0;
        } else {
@@ -2183,7 +2342,18 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
                }
                pthread_mutex_lock(&stream->lock);
                if (!stream->data_pending_check_done) {
-                       if (!stream->closed || !(((int64_t) (stream->prev_seq - stream->last_net_seq_num)) >= 0)) {
+                       uint64_t stream_seq;
+
+                       if (session_streams_have_index(conn->session)) {
+                               /*
+                                * Ensure that both the index and stream data have been
+                                * flushed up to the requested point.
+                                */
+                               stream_seq = min(stream->prev_data_seq, stream->prev_index_seq);
+                       } else {
+                               stream_seq = stream->prev_data_seq;
+                       }
+                       if (!stream->closed || !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) {
                                is_data_inflight = 1;
                                DBG("Data is still in flight for stream %" PRIu64,
                                                stream->stream_handle);
@@ -2315,6 +2485,11 @@ static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr,
                stream->index_received_seqcount++;
                stream->pos_after_last_complete_data_index += index->total_size;
                stream->prev_index_seq = index_info.net_seq_num;
+
+               ret = try_rotate_stream_index(stream);
+               if (ret < 0) {
+                       goto end_stream_put;
+               }
        } else if (ret > 0) {
                /* no flush. */
                ret = 0;
@@ -2414,8 +2589,6 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr
        size_t path_len;
        struct lttng_buffer_view new_path_view;
 
-       DBG("Rotate stream received");
-
        if (!session || !conn->version_check_done) {
                ERR("Trying to rotate a stream before version check");
                ret = -1;
@@ -2476,7 +2649,8 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr
         * Update the trace path (just the folder, the stream name does not
         * change).
         */
-       free(stream->path_name);
+       free(stream->prev_path_name);
+       stream->prev_path_name = stream->path_name;
        stream->path_name = create_output_path(new_path_view.data);
        if (!stream->path_name) {
                ERR("Failed to create a new output path");
@@ -2491,22 +2665,29 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr
                goto end_stream_unlock;
        }
 
-       assert(stream->current_chunk_id.is_set);
-       stream->current_chunk_id.value = stream_info.new_chunk_id;
-
        if (stream->is_metadata) {
+               /*
+                * Metadata streams have no index; consider its rotation
+                * complete.
+                */
+               stream->index_rotated = true;
                /*
                 * The metadata stream is sent only over the control connection
                 * so we know we have all the data to perform the stream
                 * rotation.
                 */
-               ret = do_rotate_stream(stream);
+               ret = do_rotate_stream_data(stream);
        } else {
                stream->rotate_at_seq_num = stream_info.rotate_at_seq_num;
-               ret = try_rotate_stream(stream);
-       }
-       if (ret < 0) {
-               goto end_stream_unlock;
+               ret = try_rotate_stream_data(stream);
+               if (ret < 0) {
+                       goto end_stream_unlock;
+               }
+
+               ret = try_rotate_stream_index(stream);
+               if (ret < 0) {
+                       goto end_stream_unlock;
+               }
        }
 
 end_stream_unlock:
@@ -2531,357 +2712,308 @@ end_no_reply:
        return ret;
 }
 
-/*
- * relay_mkdir: Create a folder on the disk.
- */
-static int relay_mkdir(const struct lttcomm_relayd_hdr *recv_hdr,
-               struct relay_connection *conn,
-               const struct lttng_buffer_view *payload)
+static int init_session_output_directory_handle(struct relay_session *session,
+               struct lttng_directory_handle *handle)
 {
        int ret;
-       struct relay_session *session = conn->session;
-       struct lttcomm_relayd_mkdir path_info_header;
-       struct lttcomm_relayd_generic_reply reply;
-       char *path = NULL;
-       size_t header_len;
-       ssize_t send_ret;
-       struct lttng_buffer_view path_view;
-
-       if (!session || !conn->version_check_done) {
-               ERR("Trying to create a directory before version check");
-               ret = -1;
-               goto end_no_session;
-       }
+       /* hostname/session_name */
+       char *session_directory = NULL;
+       /*
+        * base path + session_directory
+        * e.g. /home/user/lttng-traces/hostname/session_name
+        */
+       char *full_session_path = NULL;
+       char creation_time_str[16];
+       struct tm *timeinfo;
 
-       if (session->major == 2 && session->minor < 11) {
-               /*
-                * This client is not supposed to use this command since
-                * it predates its introduction.
-                */
-               ERR("relay_mkdir command is unsupported before LTTng 2.11");
+       assert(session->creation_time.is_set);
+       timeinfo = localtime(&session->creation_time.value);
+       if (!timeinfo) {
                ret = -1;
-               goto end_no_session;
+               goto end;
        }
+       strftime(creation_time_str, sizeof(creation_time_str), "%Y%m%d-%H%M%S",
+                       timeinfo);
 
-       header_len = sizeof(path_info_header);
-       if (payload->size < header_len) {
-               ERR("Unexpected payload size in \"relay_mkdir\": expected >= %zu bytes, got %zu bytes",
-                               header_len, payload->size);
-               ret = -1;
-               goto end_no_session;
+       pthread_mutex_lock(&session->lock);
+       ret = asprintf(&session_directory, "%s/%s-%s", session->hostname,
+                       session->session_name, creation_time_str);
+       pthread_mutex_unlock(&session->lock);
+       if (ret < 0) {
+               PERROR("Failed to format session directory name");
+               goto end;
        }
 
-       memcpy(&path_info_header, payload->data, header_len);
-
-       path_info_header.length = be32toh(path_info_header.length);
-
-       if (payload->size < header_len + path_info_header.length) {
-               ERR("Unexpected payload size in \"relay_mkdir\" including path: expected >= %zu bytes, got %zu bytes",
-                               header_len + path_info_header.length, payload->size);
+       full_session_path = create_output_path(session_directory);
+       if (!full_session_path) {
                ret = -1;
-               goto end_no_session;
-       }
-
-       /* Ensure that it fits in local path length. */
-       if (path_info_header.length >= LTTNG_PATH_MAX) {
-               ret = -ENAMETOOLONG;
-               ERR("Path name argument of mkdir command (%" PRIu32 " bytes) exceeds the maximal length allowed (%d bytes)",
-                               path_info_header.length, LTTNG_PATH_MAX);
                goto end;
        }
 
-       path_view = lttng_buffer_view_from_view(payload, header_len,
-                       path_info_header.length);
-
-       path = create_output_path(path_view.data);
-       if (!path) {
-               ERR("Failed to create output path");
-               ret = -1;
+       ret = utils_mkdir_recursive(
+                       full_session_path, S_IRWXU | S_IRWXG, -1, -1);
+       if (ret) {
+               ERR("Failed to create session output path \"%s\"",
+                               full_session_path);
                goto end;
        }
 
-       ret = utils_mkdir_recursive(path, S_IRWXU | S_IRWXG, -1, -1);
-       if (ret < 0) {
-               ERR("relay creating output directory");
+       ret = lttng_directory_handle_init(handle, full_session_path);
+       if (ret) {
                goto end;
        }
-
-       ret = 0;
-
 end:
-       memset(&reply, 0, sizeof(reply));
-       if (ret < 0) {
-               reply.ret_code = htobe32(LTTNG_ERR_UNK);
-       } else {
-               reply.ret_code = htobe32(LTTNG_OK);
-       }
-       send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
-       if (send_ret < (ssize_t) sizeof(reply)) {
-               ERR("Failed to send \"mkdir\" command reply (ret = %zd)", send_ret);
-               ret = -1;
-       }
-
-end_no_session:
-       free(path);
-       return ret;
-}
-
-static int validate_rotate_rename_path_length(const char *path_type,
-               uint32_t path_length)
-{
-       int ret = 0;
-
-       if (path_length > LTTNG_PATH_MAX) {
-               ret = -ENAMETOOLONG;
-               ERR("rotate rename \"%s\" path name length (%" PRIu32 " bytes) exceeds the allowed size of %i bytes",
-                               path_type, path_length, LTTNG_PATH_MAX);
-       } else if (path_length == 0) {
-               ret = -EINVAL;
-               ERR("rotate rename \"%s\" path name has an illegal length of 0", path_type);
-       }
+       free(session_directory);
+       free(full_session_path);
        return ret;
 }
 
 /*
- * relay_rotate_rename: rename the trace folder after a rotation is
- * completed. We are not closing any fd here, just moving the folder, so it
- * works even if data is still in-flight.
+ * relay_create_trace_chunk: create a new trace chunk
  */
-static int relay_rotate_rename(const struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr,
                struct relay_connection *conn,
                const struct lttng_buffer_view *payload)
 {
-       int ret;
+       int ret = 0;
        ssize_t send_ret;
        struct relay_session *session = conn->session;
-       struct lttcomm_relayd_generic_reply reply;
-       struct lttcomm_relayd_rotate_rename header;
-       size_t header_len;
-       size_t received_paths_size;
-       char *complete_old_path = NULL, *complete_new_path = NULL;
-       struct lttng_buffer_view old_path_view;
-       struct lttng_buffer_view new_path_view;
+       struct lttcomm_relayd_create_trace_chunk *msg;
+       struct lttcomm_relayd_generic_reply reply = {};
+       struct lttng_buffer_view header_view;
+       struct lttng_buffer_view chunk_name_view;
+       struct lttng_trace_chunk *chunk = NULL, *published_chunk = NULL;
+       enum lttng_error_code reply_code = LTTNG_OK;
+       enum lttng_trace_chunk_status chunk_status;
+       struct lttng_directory_handle session_output;
 
        if (!session || !conn->version_check_done) {
-               ERR("Trying to rename a trace folder before version check");
+               ERR("Trying to create a trace chunk before version check");
                ret = -1;
                goto end_no_reply;
        }
 
        if (session->major == 2 && session->minor < 11) {
-               ERR("relay_rotate_rename command is unsupported before LTTng 2.11");
+               ERR("Chunk creation command is unsupported before 2.11");
                ret = -1;
                goto end_no_reply;
        }
 
-       header_len = sizeof(header);
-       if (payload->size < header_len) {
-               ERR("Unexpected payload size in \"relay_rotate_rename\": expected >= %zu bytes, got %zu bytes",
-                               header_len, payload->size);
+       header_view = lttng_buffer_view_from_view(payload, 0, sizeof(*msg));
+       if (!header_view.data) {
+               ERR("Failed to receive payload of chunk creation command");
                ret = -1;
                goto end_no_reply;
        }
 
-       memcpy(&header, payload->data, header_len);
+       /* Convert to host endianness. */
+       msg = (typeof(msg)) header_view.data;
+       msg->chunk_id = be64toh(msg->chunk_id);
+       msg->creation_timestamp = be64toh(msg->creation_timestamp);
+       msg->override_name_length = be32toh(msg->override_name_length);
 
-       header.old_path_length = be32toh(header.old_path_length);
-       header.new_path_length = be32toh(header.new_path_length);
-       received_paths_size = header.old_path_length + header.new_path_length;
-
-       if (payload->size < header_len + received_paths_size) {
-               ERR("Unexpected payload size in \"relay_rotate_rename\" including paths: expected >= %zu bytes, got %zu bytes",
-                               header_len, payload->size);
+       chunk = lttng_trace_chunk_create(
+                       msg->chunk_id, msg->creation_timestamp);
+       if (!chunk) {
+               ERR("Failed to create trace chunk in trace chunk creation command");
                ret = -1;
-               goto end_no_reply;
-       }
-
-       /* Ensure the paths don't exceed their allowed size. */
-       ret = validate_rotate_rename_path_length("old", header.old_path_length);
-       if (ret) {
-               goto end;
-       }
-       ret = validate_rotate_rename_path_length("new", header.new_path_length);
-       if (ret) {
+               reply_code = LTTNG_ERR_NOMEM;
                goto end;
        }
 
-       old_path_view = lttng_buffer_view_from_view(payload, header_len,
-                       header.old_path_length);
-       new_path_view = lttng_buffer_view_from_view(payload,
-                       header_len + header.old_path_length,
-                       header.new_path_length);
+       if (msg->override_name_length) {
+               const char *name;
 
-       /* Validate that both paths received are NULL terminated. */
-       if (old_path_view.data[old_path_view.size - 1] != '\0') {
-               ERR("relay_rotate_rename command's \"old\" path is invalid (not NULL terminated)");
-               ret = -1;
-               goto end;
+               chunk_name_view = lttng_buffer_view_from_view(payload,
+                               sizeof(*msg),
+                               msg->override_name_length);
+               name = chunk_name_view.data;
+               if (!name || name[msg->override_name_length - 1]) {
+                       ERR("Failed to receive payload of chunk creation command");
+                       ret = -1;
+                       reply_code = LTTNG_ERR_INVALID;
+                       goto end;
+               }
+
+               chunk_status = lttng_trace_chunk_override_name(
+                               chunk, chunk_name_view.data);
+               switch (chunk_status) {
+               case LTTNG_TRACE_CHUNK_STATUS_OK:
+                       break;
+               case LTTNG_TRACE_CHUNK_STATUS_INVALID_ARGUMENT:
+                       ERR("Failed to set the name of new trace chunk in trace chunk creation command (invalid name)");
+                       reply_code = LTTNG_ERR_INVALID;
+                       ret = -1;
+                       goto end;
+               default:
+                       ERR("Failed to set the name of new trace chunk in trace chunk creation command (unknown error)");
+                       reply_code = LTTNG_ERR_UNK;
+                       ret = -1;
+                       goto end;
+               }
        }
-       if (new_path_view.data[new_path_view.size - 1] != '\0') {
-               ERR("relay_rotate_rename command's \"new\" path is invalid (not NULL terminated)");
-               ret = -1;
+
+       ret = init_session_output_directory_handle(
+                       conn->session, &session_output);
+       if (ret) {
+               reply_code = LTTNG_ERR_CREATE_DIR_FAIL;
                goto end;
        }
 
-       complete_old_path = create_output_path(old_path_view.data);
-       if (!complete_old_path) {
-               ERR("Failed to build old output path in rotate_rename command");
+       chunk_status = lttng_trace_chunk_set_credentials_current_user(chunk);
+       if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+               reply_code = LTTNG_ERR_UNK;
                ret = -1;
                goto end;
        }
 
-       complete_new_path = create_output_path(new_path_view.data);
-       if (!complete_new_path) {
-               ERR("Failed to build new output path in rotate_rename command");
+       chunk_status = lttng_trace_chunk_set_as_owner(chunk, &session_output);
+       if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+               reply_code = LTTNG_ERR_UNK;
                ret = -1;
                goto end;
        }
 
-       ret = utils_mkdir_recursive(complete_new_path, S_IRWXU | S_IRWXG,
-                       -1, -1);
-       if (ret < 0) {
-               ERR("Failed to mkdir() rotate_rename's \"new\" output directory at \"%s\"",
-                               complete_new_path);
-               goto end;
-       }
+       published_chunk = sessiond_trace_chunk_registry_publish_chunk(
+                       sessiond_trace_chunk_registry,
+                       conn->session->sessiond_uuid,
+                       conn->session->id,
+                       chunk);
+       if (!published_chunk) {
+               char uuid_str[UUID_STR_LEN];
 
-       /*
-        * If a domain has not yet created its channel, the domain-specific
-        * folder might not exist, but this is not an error.
-        */
-       ret = rename(complete_old_path, complete_new_path);
-       if (ret < 0 && errno != ENOENT) {
-               PERROR("Renaming chunk in rotate_rename command from \"%s\" to \"%s\"",
-                               complete_old_path, complete_new_path);
+               lttng_uuid_to_str(conn->session->sessiond_uuid, uuid_str);
+               ERR("Failed to publish chunk: sessiond_uuid = %s, session_id = %" PRIu64 ", chunk_id = %" PRIu64,
+                               uuid_str,
+                               conn->session->id,
+                               msg->chunk_id);
+               ret = -1;
+               reply_code = LTTNG_ERR_NOMEM;
                goto end;
        }
-       ret = 0;
+
+       pthread_mutex_lock(&conn->session->lock);
+       lttng_trace_chunk_put(conn->session->current_trace_chunk);
+       conn->session->current_trace_chunk = published_chunk;
+       pthread_mutex_unlock(&conn->session->lock);
+       published_chunk = NULL;
 
 end:
-       memset(&reply, 0, sizeof(reply));
-       if (ret < 0) {
-               reply.ret_code = htobe32(LTTNG_ERR_UNK);
-       } else {
-               reply.ret_code = htobe32(LTTNG_OK);
-       }
-       send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
-                       sizeof(reply), 0);
-       if (send_ret < sizeof(reply)) {
-               ERR("Failed to send \"rotate rename\" command reply (ret = %zd)",
+       reply.ret_code = htobe32((uint32_t) reply_code);
+       send_ret = conn->sock->ops->sendmsg(conn->sock,
+                       &reply,
+                       sizeof(struct lttcomm_relayd_generic_reply),
+                       0);
+       if (send_ret < (ssize_t) sizeof(reply)) {
+               ERR("Failed to send \"create trace chunk\" command reply (ret = %zd)",
                                send_ret);
                ret = -1;
        }
-
 end_no_reply:
-       free(complete_old_path);
-       free(complete_new_path);
+       lttng_trace_chunk_put(chunk);
+       lttng_trace_chunk_put(published_chunk);
+       lttng_directory_handle_fini(&session_output);
        return ret;
 }
 
 /*
- * Check if all the streams in the session have completed the last rotation.
- * The chunk_id value is used to distinguish the cases where a stream was
- * closed on the consumerd before the rotation started but it still active on
- * the relayd, and the case where a stream appeared on the consumerd/relayd
- * after the last rotation started (in that case, it is already writing in the
- * new chunk folder).
+ * relay_close_trace_chunk: close a trace chunk
  */
-static
-int relay_rotate_pending(const struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr,
                struct relay_connection *conn,
                const struct lttng_buffer_view *payload)
 {
-       struct relay_session *session = conn->session;
-       struct lttcomm_relayd_rotate_pending msg;
-       struct lttcomm_relayd_rotate_pending_reply reply;
-       struct lttng_ht_iter iter;
-       struct relay_stream *stream;
        int ret = 0;
        ssize_t send_ret;
+       struct relay_session *session = conn->session;
+       struct lttcomm_relayd_close_trace_chunk *msg;
+       struct lttcomm_relayd_generic_reply reply = {};
+       struct lttng_buffer_view header_view;
+       struct lttng_trace_chunk *chunk = NULL;
+       enum lttng_error_code reply_code = LTTNG_OK;
+       enum lttng_trace_chunk_status chunk_status;
        uint64_t chunk_id;
-        bool rotate_pending = false;
-
-       DBG("Rotate pending command received");
+       LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command;
+       time_t close_timestamp;
 
        if (!session || !conn->version_check_done) {
-               ERR("Trying to check for data before version check");
+               ERR("Trying to close a trace chunk before version check");
                ret = -1;
                goto end_no_reply;
        }
 
        if (session->major == 2 && session->minor < 11) {
-               ERR("Unsupported feature before 2.11");
+               ERR("Chunk close command is unsupported before 2.11");
                ret = -1;
                goto end_no_reply;
        }
 
-       if (payload->size < sizeof(msg)) {
-               ERR("Unexpected payload size in \"relay_rotate_pending\": expected >= %zu bytes, got %zu bytes",
-                               sizeof(msg), payload->size);
+       header_view = lttng_buffer_view_from_view(payload, 0, sizeof(*msg));
+       if (!header_view.data) {
+               ERR("Failed to receive payload of chunk close command");
                ret = -1;
                goto end_no_reply;
        }
 
-       memcpy(&msg, payload->data, sizeof(msg));
-
-       chunk_id = be64toh(msg.chunk_id);
+       /* Convert to host endianness. */
+       msg = (typeof(msg)) header_view.data;
+       chunk_id = be64toh(msg->chunk_id);
+       close_timestamp = (time_t) be64toh(msg->close_timestamp);
+       close_command = (typeof(close_command)){
+               .value = be32toh(msg->close_command.value),
+               .is_set = msg->close_command.is_set,
+       };
+
+       chunk = sessiond_trace_chunk_registry_get_chunk(
+                       sessiond_trace_chunk_registry,
+                       conn->session->sessiond_uuid,
+                       conn->session->id,
+                       chunk_id);
+       if (!chunk) {
+               char uuid_str[UUID_STR_LEN];
+
+               lttng_uuid_to_str(conn->session->sessiond_uuid, uuid_str);
+               ERR("Failed to find chunk to close: sessiond_uuid = %s, session_id = %" PRIu64 ", chunk_id = %" PRIu64,
+                               uuid_str,
+                               conn->session->id,
+                               msg->chunk_id);
+               ret = -1;
+               reply_code = LTTNG_ERR_NOMEM;
+               goto end;
+       }
 
-       DBG("Evaluating rotate pending for session \"%s\" and  chunk id %" PRIu64,
-                       session->session_name, chunk_id);
+       chunk_status = lttng_trace_chunk_set_close_timestamp(
+                       chunk, close_timestamp);
+       if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+               ERR("Failed to set trace chunk close timestamp");
+               ret = -1;
+               reply_code = LTTNG_ERR_UNK;
+               goto end;
+       }
 
-       /*
-        * Iterate over all the streams in the session and check if they are
-        * still waiting for data to perform their rotation.
-        */
-       rcu_read_lock();
-       cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
-                       node.node) {
-               if (!stream_get(stream)) {
-                       continue;
-               }
-               if (stream->trace->session != session) {
-                       stream_put(stream);
-                       continue;
-               }
-               pthread_mutex_lock(&stream->lock);
-               if (stream->rotate_at_seq_num != -1ULL) {
-                       /* We have not yet performed the rotation. */
-                       rotate_pending = true;
-                       DBG("Stream %" PRIu64 " is still rotating",
-                                       stream->stream_handle);
-               } else if (stream->current_chunk_id.value < chunk_id) {
-                       /*
-                        * Stream closed on the consumer but still active on the
-                        * relay.
-                        */
-                       rotate_pending = true;
-                       DBG("Stream %" PRIu64 " did not exist on the consumer "
-                                       "when the last rotation started, but is"
-                                       "still waiting for data before getting"
-                                       "closed",
-                                       stream->stream_handle);
-               }
-               pthread_mutex_unlock(&stream->lock);
-               stream_put(stream);
-               if (rotate_pending) {
-                       goto send_reply;
+       if (close_command.is_set) {
+               chunk_status = lttng_trace_chunk_set_close_command(
+                               chunk, close_command.value);
+               if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+                       ret = -1;
+                       reply_code = LTTNG_ERR_INVALID;
+                       goto end;
                }
        }
 
-send_reply:
-       rcu_read_unlock();
-       memset(&reply, 0, sizeof(reply));
-       reply.generic.ret_code = htobe32((uint32_t) LTTNG_OK);
-       reply.is_pending = (uint8_t) !!rotate_pending;
-       send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
-                       sizeof(reply), 0);
+end:
+       reply.ret_code = htobe32((uint32_t) reply_code);
+       send_ret = conn->sock->ops->sendmsg(conn->sock,
+                       &reply,
+                       sizeof(struct lttcomm_relayd_generic_reply),
+                       0);
        if (send_ret < (ssize_t) sizeof(reply)) {
-               ERR("Failed to send \"rotate pending\" command reply (ret = %zd)",
+               ERR("Failed to send \"create trace chunk\" command reply (ret = %zd)",
                                send_ret);
                ret = -1;
        }
-
 end_no_reply:
+       lttng_trace_chunk_put(chunk);
        return ret;
 }
 
@@ -2951,17 +3083,13 @@ static int relay_process_control_command(struct relay_connection *conn,
                DBG_CMD("RELAYD_ROTATE_STREAM", conn);
                ret = relay_rotate_session_stream(header, conn, payload);
                break;
-       case RELAYD_ROTATE_RENAME:
-               DBG_CMD("RELAYD_ROTATE_RENAME", conn);
-               ret = relay_rotate_rename(header, conn, payload);
-               break;
-       case RELAYD_ROTATE_PENDING:
-               DBG_CMD("RELAYD_ROTATE_PENDING", conn);
-               ret = relay_rotate_pending(header, conn, payload);
+       case RELAYD_CREATE_TRACE_CHUNK:
+               DBG_CMD("RELAYD_CREATE_TRACE_CHUNK", conn);
+               ret = relay_create_trace_chunk(header, conn, payload);
                break;
-       case RELAYD_MKDIR:
-               DBG_CMD("RELAYD_MKDIR", conn);
-               ret = relay_mkdir(header, conn, payload);
+       case RELAYD_CLOSE_TRACE_CHUNK:
+               DBG_CMD("RELAYD_CLOSE_TRACE_CHUNK", conn);
+               ret = relay_close_trace_chunk(header, conn, payload);
                break;
        case RELAYD_UPDATE_SYNC_INFO:
        default:
@@ -3195,7 +3323,34 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
        }
 
        if (rotate_index || !stream->index_file) {
-               ret = create_rotate_index_file(stream);
+               const char *stream_path;
+
+               /*
+                * The data connection creates the stream's first index file.
+                *
+                * This can happen _after_ a ROTATE_STREAM command. In
+                * other words, the data of the first packet of this stream
+                * can be received after a ROTATE_STREAM command.
+                *
+                * The ROTATE_STREAM command changes the stream's path_name
+                * to point to the "next" chunk. If a rotation is pending for
+                * this stream, as indicated by "rotate_at_seq_num != -1ULL",
+                * it means that we are still receiving data that belongs in the
+                * stream's former path.
+                *
+                * In this very specific case, we must ensure that the index
+                * file is created in the streams's former path,
+                * "prev_path_name".
+                *
+                * All other rotations beyond the first one are not affected
+                * by this problem since the actual rotation operation creates
+                * the new chunk's index file.
+                */
+               stream_path = stream->rotate_at_seq_num == -1ULL ?
+                               stream->path_name:
+                               stream->prev_path_name;
+
+               ret = create_rotate_index_file(stream, stream_path);
                if (ret < 0) {
                        ERR("Failed to rotate index");
                        /* Put self-ref for this index due to error. */
@@ -3462,7 +3617,7 @@ static enum relay_connection_status relay_process_data_receive_payload(
        }
 
 
-       if (session->minor >= 4 && !session->snapshot) {
+       if (session_streams_have_index(session)) {
                ret = handle_index_data(stream, state->header.net_seq_num,
                                state->rotate_index, &index_flushed, state->header.data_size + state->header.padding_size);
                if (ret < 0) {
@@ -3477,16 +3632,20 @@ static enum relay_connection_status relay_process_data_receive_payload(
        stream->tracefile_size_current += state->header.data_size +
                        state->header.padding_size;
 
-       if (stream->prev_seq == -1ULL) {
+       if (stream->prev_data_seq == -1ULL) {
                new_stream = true;
        }
        if (index_flushed) {
                stream->pos_after_last_complete_data_index =
                                stream->tracefile_size_current;
                stream->prev_index_seq = state->header.net_seq_num;
+               ret = try_rotate_stream_index(stream);
+               if (ret < 0) {
+                       goto end_stream_unlock;
+               }
        }
 
-       stream->prev_seq = state->header.net_seq_num;
+       stream->prev_data_seq = state->header.net_seq_num;
 
        /*
         * Resetting the protocol state (to RECEIVE_HEADER) will trash the
@@ -3496,7 +3655,7 @@ static enum relay_connection_status relay_process_data_receive_payload(
        connection_reset_protocol_state(conn);
        state = NULL;
 
-       ret = try_rotate_stream(stream);
+       ret = try_rotate_stream_data(stream);
        if (ret < 0) {
                status = RELAY_CONNECTION_STATUS_ERROR;
                goto end_stream_unlock;
@@ -3656,14 +3815,6 @@ restart:
 
                        health_code_update();
 
-                       if (!revents) {
-                               /*
-                                * No activity for this FD (poll
-                                * implementation).
-                                */
-                               continue;
-                       }
-
                        /* Thread quit pipe has been closed. Killing thread. */
                        ret = check_thread_quit_pipe(pollfd, revents);
                        if (ret) {
@@ -3975,6 +4126,13 @@ int main(int argc, char **argv)
                }
        }
 
+       sessiond_trace_chunk_registry = sessiond_trace_chunk_registry_create();
+       if (!sessiond_trace_chunk_registry) {
+               ERR("Failed to initialize session daemon trace chunk registry");
+               retval = -1;
+               goto exit_sessiond_trace_chunk_registry;
+       }
+
        /* Initialize thread health monitoring */
        health_relayd = health_app_create(NR_HEALTH_RELAYD_TYPES);
        if (!health_relayd) {
@@ -4124,7 +4282,9 @@ exit_health_quit_pipe:
 
 exit_init_data:
        health_app_destroy(health_relayd);
+       sessiond_trace_chunk_registry_destroy(sessiond_trace_chunk_registry);
 exit_health_app_create:
+exit_sessiond_trace_chunk_registry:
 exit_options:
        /*
         * Wait for all pending call_rcu work to complete before tearing
This page took 0.038015 seconds and 4 git commands to generate.