relayd: create stream files relative to a session's trace chunk
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index db021f43fc5b28a9ac0de322752880a3c9a171c9..dddc2a2b4f77b25c9ba968826460147302f25e0a 100644 (file)
@@ -1095,18 +1095,16 @@ static int relay_create_session(const struct lttcomm_relayd_hdr *recv_hdr,
        int ret = 0;
        ssize_t send_ret;
        struct relay_session *session = NULL;
-       struct lttcomm_relayd_status_session reply;
-       char session_name[LTTNG_NAME_MAX];
-       char hostname[LTTNG_HOST_NAME_MAX];
+       struct lttcomm_relayd_status_session reply = {};
+       char session_name[LTTNG_NAME_MAX] = {};
+       char hostname[LTTNG_HOST_NAME_MAX] = {};
        uint32_t live_timer = 0;
        bool snapshot = false;
        /* Left nil for peers < 2.11. */
        lttng_uuid sessiond_uuid = {};
-
-       memset(session_name, 0, LTTNG_NAME_MAX);
-       memset(hostname, 0, LTTNG_HOST_NAME_MAX);
-
-       memset(&reply, 0, sizeof(reply));
+       LTTNG_OPTIONAL(uint64_t) id_sessiond = {};
+       LTTNG_OPTIONAL(uint64_t) current_chunk_id = {};
+       LTTNG_OPTIONAL(time_t) creation_time = {};
 
        if (conn->minor < 4) {
                /* From 2.1 to 2.3 */
@@ -1116,16 +1114,28 @@ static int relay_create_session(const struct lttcomm_relayd_hdr *recv_hdr,
                ret = cmd_create_session_2_4(payload, session_name,
                        hostname, &live_timer, &snapshot);
        } else {
+               bool has_current_chunk;
+               uint64_t current_chunk_id_value;
+               time_t creation_time_value;
+               uint64_t id_sessiond_value;
+
                /* From 2.11 to ... */
-               ret = cmd_create_session_2_11(payload, session_name,
-                               hostname, &live_timer, &snapshot,
-                               sessiond_uuid);
+               ret = cmd_create_session_2_11(payload, session_name, hostname,
+                               &live_timer, &snapshot, &id_sessiond_value,
+                               sessiond_uuid, &has_current_chunk,
+                               &current_chunk_id_value, &creation_time_value);
                if (lttng_uuid_is_nil(sessiond_uuid)) {
                        /* The nil UUID is reserved for pre-2.11 clients. */
                        ERR("Illegal nil UUID announced by peer in create session command");
                        ret = -1;
                        goto send_reply;
                }
+               LTTNG_OPTIONAL_SET(&id_sessiond, id_sessiond_value);
+               LTTNG_OPTIONAL_SET(&creation_time, creation_time_value);
+               if (has_current_chunk) {
+                       LTTNG_OPTIONAL_SET(&current_chunk_id,
+                                       current_chunk_id_value);
+               }
        }
 
        if (ret < 0) {
@@ -1133,7 +1143,11 @@ static int relay_create_session(const struct lttcomm_relayd_hdr *recv_hdr,
        }
 
        session = session_create(session_name, hostname, live_timer,
-                       snapshot, sessiond_uuid, conn->major, conn->minor);
+                       snapshot, sessiond_uuid,
+                       id_sessiond.is_set ? &id_sessiond.value : NULL,
+                       current_chunk_id.is_set ? &current_chunk_id.value : NULL,
+                       creation_time.is_set ? &creation_time.value : NULL,
+                       conn->major, conn->minor);
        if (!session) {
                ret = -1;
                goto send_reply;
@@ -1144,15 +1158,6 @@ static int relay_create_session(const struct lttcomm_relayd_hdr *recv_hdr,
 
        reply.session_id = htobe64(session->id);
 
-       session->current_trace_chunk =
-                       sessiond_trace_chunk_registry_get_anonymous_chunk(
-                               sessiond_trace_chunk_registry, sessiond_uuid,
-                               session->id,
-                               opt_output_path);
-       if (!session->current_trace_chunk) {
-               ret = -1;
-       }
-
 send_reply:
        if (ret < 0) {
                reply.ret_code = htobe32(LTTNG_ERR_FATAL);
@@ -1202,6 +1207,34 @@ static void publish_connection_local_streams(struct relay_connection *conn)
        pthread_mutex_unlock(&session->lock);
 }
 
+static int conform_channel_path(char *channel_path)
+{
+       int ret = 0;
+
+       if (strstr("../", channel_path)) {
+               ERR("Refusing channel path as it walks up the path hierarchy: \"%s\"",
+                               channel_path);
+               ret = -1;
+               goto end;
+       }
+
+       if (*channel_path == '/') {
+               const size_t len = strlen(channel_path);
+
+               /*
+                * Channel paths from peers prior to 2.11 are expressed as an
+                * absolute path that is, in reality, relative to the relay
+                * daemon's output directory. Remove the leading slash so it
+                * is correctly interpreted as a relative path later on.
+                *
+                * len (and not len - 1) is used to copy the trailing NULL.
+                */
+               bcopy(channel_path + 1, channel_path, len);
+       }
+end:
+       return ret;
+}
+
 /*
  * relay_add_stream: allocate a new stream for a session
  */
@@ -1218,7 +1251,7 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr,
        uint64_t stream_handle = -1ULL;
        char *path_name = NULL, *channel_name = NULL;
        uint64_t tracefile_size = 0, tracefile_count = 0;
-       struct relay_stream_chunk_id stream_chunk_id = { 0 };
+       LTTNG_OPTIONAL(uint64_t) stream_chunk_id = {};
 
        if (!session || !conn->version_check_done) {
                ERR("Trying to add a stream before version check");
@@ -1246,6 +1279,10 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr,
                goto send_reply;
        }
 
+       if (conform_channel_path(path_name)) {
+               goto send_reply;
+       }
+
        trace = ctf_trace_get_by_path_or_create(session, path_name);
        if (!trace) {
                goto send_reply;
@@ -1258,8 +1295,7 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr,
 
        /* We pass ownership of path_name and channel_name. */
        stream = stream_create(trace, stream_handle, path_name,
-               channel_name, tracefile_size, tracefile_count,
-               &stream_chunk_id);
+               channel_name, tracefile_size, tracefile_count);
        path_name = NULL;
        channel_name = NULL;
 
@@ -1555,11 +1591,13 @@ end:
  */
 static
 int create_rotate_index_file(struct relay_stream *stream,
-               const char *stream_path)
+               const char *channel_path)
 {
        int ret;
        uint32_t major, minor;
 
+       ASSERT_LOCKED(stream->lock);
+
        /* Put ref on previous index_file. */
        if (stream->index_file) {
                lttng_index_file_put(stream->index_file);
@@ -1567,12 +1605,26 @@ int create_rotate_index_file(struct relay_stream *stream,
        }
        major = stream->trace->session->major;
        minor = stream->trace->session->minor;
-       stream->index_file = lttng_index_file_create(stream_path,
-                       stream->channel_name,
-                       -1, -1, stream->tracefile_size,
-                       tracefile_array_get_file_index_head(stream->tfa),
+       if (!stream->trace->index_folder_created) {
+               char *index_subpath = NULL;
+
+               ret = asprintf(&index_subpath, "%s/%s", channel_path, DEFAULT_INDEX_DIR);
+               if (ret < 0) {
+                       goto end;
+               }
+
+               ret = lttng_trace_chunk_create_subdirectory(stream->trace_chunk, index_subpath);
+               free(index_subpath);
+               if (ret) {
+                       goto end;
+               }
+               stream->trace->index_folder_created = true;
+       }
+       stream->index_file = lttng_index_file_create_from_trace_chunk(
+                       stream->trace_chunk, channel_path, stream->channel_name,
+                       stream->tracefile_size, stream->tracefile_count,
                        lttng_to_index_major(major, minor),
-                       lttng_to_index_minor(major, minor));
+                       lttng_to_index_minor(major, minor), true);
        if (!stream->index_file) {
                ret = -1;
                goto end;
@@ -2537,8 +2589,6 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr
        size_t path_len;
        struct lttng_buffer_view new_path_view;
 
-       DBG("Rotate stream received");
-
        if (!session || !conn->version_check_done) {
                ERR("Trying to rotate a stream before version check");
                ret = -1;
@@ -2615,9 +2665,6 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr
                goto end_stream_unlock;
        }
 
-       assert(stream->current_chunk_id.is_set);
-       stream->current_chunk_id.value = stream_info.new_chunk_id;
-
        if (stream->is_metadata) {
                /*
                 * Metadata streams have no index; consider its rotation
@@ -2665,6 +2712,311 @@ end_no_reply:
        return ret;
 }
 
+static int init_session_output_directory_handle(struct relay_session *session,
+               struct lttng_directory_handle *handle)
+{
+       int ret;
+       /* hostname/session_name */
+       char *session_directory = NULL;
+       /*
+        * base path + session_directory
+        * e.g. /home/user/lttng-traces/hostname/session_name
+        */
+       char *full_session_path = NULL;
+       char creation_time_str[16];
+       struct tm *timeinfo;
+
+       assert(session->creation_time.is_set);
+       timeinfo = localtime(&session->creation_time.value);
+       if (!timeinfo) {
+               ret = -1;
+               goto end;
+       }
+       strftime(creation_time_str, sizeof(creation_time_str), "%Y%m%d-%H%M%S",
+                       timeinfo);
+
+       pthread_mutex_lock(&session->lock);
+       ret = asprintf(&session_directory, "%s/%s-%s", session->hostname,
+                       session->session_name, creation_time_str);
+       pthread_mutex_unlock(&session->lock);
+       if (ret < 0) {
+               PERROR("Failed to format session directory name");
+               goto end;
+       }
+
+       full_session_path = create_output_path(session_directory);
+       if (!full_session_path) {
+               ret = -1;
+               goto end;
+       }
+
+       ret = utils_mkdir_recursive(
+                       full_session_path, S_IRWXU | S_IRWXG, -1, -1);
+       if (ret) {
+               ERR("Failed to create session output path \"%s\"",
+                               full_session_path);
+               goto end;
+       }
+
+       ret = lttng_directory_handle_init(handle, full_session_path);
+       if (ret) {
+               goto end;
+       }
+end:
+       free(session_directory);
+       free(full_session_path);
+       return ret;
+}
+
+/*
+ * relay_create_trace_chunk: create a new trace chunk
+ */
+static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr,
+               struct relay_connection *conn,
+               const struct lttng_buffer_view *payload)
+{
+       int ret = 0;
+       ssize_t send_ret;
+       struct relay_session *session = conn->session;
+       struct lttcomm_relayd_create_trace_chunk *msg;
+       struct lttcomm_relayd_generic_reply reply = {};
+       struct lttng_buffer_view header_view;
+       struct lttng_buffer_view chunk_name_view;
+       struct lttng_trace_chunk *chunk = NULL, *published_chunk = NULL;
+       enum lttng_error_code reply_code = LTTNG_OK;
+       enum lttng_trace_chunk_status chunk_status;
+       struct lttng_directory_handle session_output;
+
+       if (!session || !conn->version_check_done) {
+               ERR("Trying to create a trace chunk before version check");
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       if (session->major == 2 && session->minor < 11) {
+               ERR("Chunk creation command is unsupported before 2.11");
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       header_view = lttng_buffer_view_from_view(payload, 0, sizeof(*msg));
+       if (!header_view.data) {
+               ERR("Failed to receive payload of chunk creation command");
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       /* Convert to host endianness. */
+       msg = (typeof(msg)) header_view.data;
+       msg->chunk_id = be64toh(msg->chunk_id);
+       msg->creation_timestamp = be64toh(msg->creation_timestamp);
+       msg->override_name_length = be32toh(msg->override_name_length);
+
+       chunk = lttng_trace_chunk_create(
+                       msg->chunk_id, msg->creation_timestamp);
+       if (!chunk) {
+               ERR("Failed to create trace chunk in trace chunk creation command");
+               ret = -1;
+               reply_code = LTTNG_ERR_NOMEM;
+               goto end;
+       }
+
+       if (msg->override_name_length) {
+               const char *name;
+
+               chunk_name_view = lttng_buffer_view_from_view(payload,
+                               sizeof(*msg),
+                               msg->override_name_length);
+               name = chunk_name_view.data;
+               if (!name || name[msg->override_name_length - 1]) {
+                       ERR("Failed to receive payload of chunk creation command");
+                       ret = -1;
+                       reply_code = LTTNG_ERR_INVALID;
+                       goto end;
+               }
+
+               chunk_status = lttng_trace_chunk_override_name(
+                               chunk, chunk_name_view.data);
+               switch (chunk_status) {
+               case LTTNG_TRACE_CHUNK_STATUS_OK:
+                       break;
+               case LTTNG_TRACE_CHUNK_STATUS_INVALID_ARGUMENT:
+                       ERR("Failed to set the name of new trace chunk in trace chunk creation command (invalid name)");
+                       reply_code = LTTNG_ERR_INVALID;
+                       ret = -1;
+                       goto end;
+               default:
+                       ERR("Failed to set the name of new trace chunk in trace chunk creation command (unknown error)");
+                       reply_code = LTTNG_ERR_UNK;
+                       ret = -1;
+                       goto end;
+               }
+       }
+
+       ret = init_session_output_directory_handle(
+                       conn->session, &session_output);
+       if (ret) {
+               reply_code = LTTNG_ERR_CREATE_DIR_FAIL;
+               goto end;
+       }
+
+       chunk_status = lttng_trace_chunk_set_credentials_current_user(chunk);
+       if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+               reply_code = LTTNG_ERR_UNK;
+               ret = -1;
+               goto end;
+       }
+
+       chunk_status = lttng_trace_chunk_set_as_owner(chunk, &session_output);
+       if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+               reply_code = LTTNG_ERR_UNK;
+               ret = -1;
+               goto end;
+       }
+
+       published_chunk = sessiond_trace_chunk_registry_publish_chunk(
+                       sessiond_trace_chunk_registry,
+                       conn->session->sessiond_uuid,
+                       conn->session->id,
+                       chunk);
+       if (!published_chunk) {
+               char uuid_str[UUID_STR_LEN];
+
+               lttng_uuid_to_str(conn->session->sessiond_uuid, uuid_str);
+               ERR("Failed to publish chunk: sessiond_uuid = %s, session_id = %" PRIu64 ", chunk_id = %" PRIu64,
+                               uuid_str,
+                               conn->session->id,
+                               msg->chunk_id);
+               ret = -1;
+               reply_code = LTTNG_ERR_NOMEM;
+               goto end;
+       }
+
+       pthread_mutex_lock(&conn->session->lock);
+       lttng_trace_chunk_put(conn->session->current_trace_chunk);
+       conn->session->current_trace_chunk = published_chunk;
+       pthread_mutex_unlock(&conn->session->lock);
+       published_chunk = NULL;
+
+end:
+       reply.ret_code = htobe32((uint32_t) reply_code);
+       send_ret = conn->sock->ops->sendmsg(conn->sock,
+                       &reply,
+                       sizeof(struct lttcomm_relayd_generic_reply),
+                       0);
+       if (send_ret < (ssize_t) sizeof(reply)) {
+               ERR("Failed to send \"create trace chunk\" command reply (ret = %zd)",
+                               send_ret);
+               ret = -1;
+       }
+end_no_reply:
+       lttng_trace_chunk_put(chunk);
+       lttng_trace_chunk_put(published_chunk);
+       lttng_directory_handle_fini(&session_output);
+       return ret;
+}
+
+/*
+ * relay_close_trace_chunk: close a trace chunk
+ */
+static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr,
+               struct relay_connection *conn,
+               const struct lttng_buffer_view *payload)
+{
+       int ret = 0;
+       ssize_t send_ret;
+       struct relay_session *session = conn->session;
+       struct lttcomm_relayd_close_trace_chunk *msg;
+       struct lttcomm_relayd_generic_reply reply = {};
+       struct lttng_buffer_view header_view;
+       struct lttng_trace_chunk *chunk = NULL;
+       enum lttng_error_code reply_code = LTTNG_OK;
+       enum lttng_trace_chunk_status chunk_status;
+       uint64_t chunk_id;
+       LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command;
+       time_t close_timestamp;
+
+       if (!session || !conn->version_check_done) {
+               ERR("Trying to close a trace chunk before version check");
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       if (session->major == 2 && session->minor < 11) {
+               ERR("Chunk close command is unsupported before 2.11");
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       header_view = lttng_buffer_view_from_view(payload, 0, sizeof(*msg));
+       if (!header_view.data) {
+               ERR("Failed to receive payload of chunk close command");
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       /* Convert to host endianness. */
+       msg = (typeof(msg)) header_view.data;
+       chunk_id = be64toh(msg->chunk_id);
+       close_timestamp = (time_t) be64toh(msg->close_timestamp);
+       close_command = (typeof(close_command)){
+               .value = be32toh(msg->close_command.value),
+               .is_set = msg->close_command.is_set,
+       };
+
+       chunk = sessiond_trace_chunk_registry_get_chunk(
+                       sessiond_trace_chunk_registry,
+                       conn->session->sessiond_uuid,
+                       conn->session->id,
+                       chunk_id);
+       if (!chunk) {
+               char uuid_str[UUID_STR_LEN];
+
+               lttng_uuid_to_str(conn->session->sessiond_uuid, uuid_str);
+               ERR("Failed to find chunk to close: sessiond_uuid = %s, session_id = %" PRIu64 ", chunk_id = %" PRIu64,
+                               uuid_str,
+                               conn->session->id,
+                               msg->chunk_id);
+               ret = -1;
+               reply_code = LTTNG_ERR_NOMEM;
+               goto end;
+       }
+
+       chunk_status = lttng_trace_chunk_set_close_timestamp(
+                       chunk, close_timestamp);
+       if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+               ERR("Failed to set trace chunk close timestamp");
+               ret = -1;
+               reply_code = LTTNG_ERR_UNK;
+               goto end;
+       }
+
+       if (close_command.is_set) {
+               chunk_status = lttng_trace_chunk_set_close_command(
+                               chunk, close_command.value);
+               if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+                       ret = -1;
+                       reply_code = LTTNG_ERR_INVALID;
+                       goto end;
+               }
+       }
+
+end:
+       reply.ret_code = htobe32((uint32_t) reply_code);
+       send_ret = conn->sock->ops->sendmsg(conn->sock,
+                       &reply,
+                       sizeof(struct lttcomm_relayd_generic_reply),
+                       0);
+       if (send_ret < (ssize_t) sizeof(reply)) {
+               ERR("Failed to send \"create trace chunk\" command reply (ret = %zd)",
+                               send_ret);
+               ret = -1;
+       }
+end_no_reply:
+       lttng_trace_chunk_put(chunk);
+       return ret;
+}
+
 #define DBG_CMD(cmd_name, conn) \
                DBG3("Processing \"%s\" command for socket %i", cmd_name, conn->sock->fd);
 
@@ -2731,6 +3083,14 @@ static int relay_process_control_command(struct relay_connection *conn,
                DBG_CMD("RELAYD_ROTATE_STREAM", conn);
                ret = relay_rotate_session_stream(header, conn, payload);
                break;
+       case RELAYD_CREATE_TRACE_CHUNK:
+               DBG_CMD("RELAYD_CREATE_TRACE_CHUNK", conn);
+               ret = relay_create_trace_chunk(header, conn, payload);
+               break;
+       case RELAYD_CLOSE_TRACE_CHUNK:
+               DBG_CMD("RELAYD_CLOSE_TRACE_CHUNK", conn);
+               ret = relay_close_trace_chunk(header, conn, payload);
+               break;
        case RELAYD_UPDATE_SYNC_INFO:
        default:
                ERR("Received unknown command (%u)", header->cmd);
This page took 0.02891 seconds and 4 git commands to generate.