X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=11277f29346e99104cfbedd6453ace99a76493f6;hp=85c4fe9b9aca282ca2d35af5bfa18c910c261b55;hb=e5add6d004793894ef4c7e047bc0f8885763b205;hpb=3aa937ab446c323bec53fc3581c424a3cc5c020e diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 85c4fe9b9..11277f293 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -2534,8 +2534,6 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr size_t path_len; struct lttng_buffer_view new_path_view; - DBG("Rotate stream received"); - if (!session || !conn->version_check_done) { ERR("Trying to rotate a stream before version check"); ret = -1; @@ -2662,6 +2660,199 @@ end_no_reply: return ret; } +static int init_session_output_directory_handle(struct relay_session *session, + struct lttng_directory_handle *handle) +{ + int ret; + /* hostname/session_name */ + char *session_directory = NULL; + /* + * base path + session_directory + * e.g. /home/user/lttng-traces/hostname/session_name + */ + char *full_session_path = NULL; + + pthread_mutex_lock(&session->lock); + ret = asprintf(&session_directory, "%s/%s", session->hostname, + session->session_name); + pthread_mutex_unlock(&session->lock); + if (ret < 0) { + PERROR("Failed to format session directory name"); + goto end; + } + + full_session_path = create_output_path(session_directory); + if (!full_session_path) { + ret = -1; + goto end; + } + + ret = utils_mkdir_recursive( + full_session_path, S_IRWXU | S_IRWXG, -1, -1); + if (ret) { + ERR("Failed to create session output path \"%s\"", + full_session_path); + goto end; + } + + ret = lttng_directory_handle_init(handle, full_session_path); + if (ret) { + goto end; + } +end: + free(session_directory); + free(full_session_path); + return ret; +} + +/* + * relay_create_trace_chunk: create a new trace chunk + */ +static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn, + const struct lttng_buffer_view *payload) +{ + int ret = 0; + ssize_t send_ret; + struct relay_session *session = conn->session; + struct lttcomm_relayd_create_trace_chunk *msg; + struct lttcomm_relayd_generic_reply reply = {}; + struct lttng_buffer_view header_view; + struct lttng_buffer_view chunk_name_view; + struct lttng_trace_chunk *chunk = NULL, *published_chunk = NULL; + enum lttng_error_code reply_code = LTTNG_OK; + enum lttng_trace_chunk_status chunk_status; + struct lttng_directory_handle session_output; + + if (!session || !conn->version_check_done) { + ERR("Trying to create a trace chunk before version check"); + ret = -1; + goto end_no_reply; + } + + if (session->major == 2 && session->minor < 11) { + ERR("Chunk creation command is unsupported before 2.11"); + ret = -1; + goto end_no_reply; + } + + header_view = lttng_buffer_view_from_view(payload, 0, sizeof(*msg)); + if (!header_view.data) { + ERR("Failed to receive payload of chunk creation command"); + ret = -1; + goto end_no_reply; + } + + /* Convert to host endianness. */ + msg = (typeof(msg)) header_view.data; + msg->chunk_id = be64toh(msg->chunk_id); + msg->creation_timestamp = be64toh(msg->creation_timestamp); + msg->override_name_length = be32toh(msg->override_name_length); + + chunk = lttng_trace_chunk_create( + msg->chunk_id, msg->creation_timestamp); + if (!chunk) { + ERR("Failed to create trace chunk in trace chunk creation command"); + ret = -1; + reply_code = LTTNG_ERR_NOMEM; + goto end; + } + + if (msg->override_name_length) { + const char *name; + + chunk_name_view = lttng_buffer_view_from_view(payload, + sizeof(*msg), + msg->override_name_length); + name = chunk_name_view.data; + if (!name || name[msg->override_name_length - 1]) { + ERR("Failed to receive payload of chunk creation command"); + ret = -1; + reply_code = LTTNG_ERR_INVALID; + goto end; + } + + chunk_status = lttng_trace_chunk_override_name( + chunk, chunk_name_view.data); + switch (chunk_status) { + case LTTNG_TRACE_CHUNK_STATUS_OK: + break; + case LTTNG_TRACE_CHUNK_STATUS_INVALID_ARGUMENT: + ERR("Failed to set the name of new trace chunk in trace chunk creation command (invalid name)"); + reply_code = LTTNG_ERR_INVALID; + ret = -1; + goto end; + default: + ERR("Failed to set the name of new trace chunk in trace chunk creation command (unknown error)"); + reply_code = LTTNG_ERR_UNK; + ret = -1; + goto end; + } + } + + ret = init_session_output_directory_handle( + conn->session, &session_output); + if (ret) { + reply_code = LTTNG_ERR_CREATE_DIR_FAIL; + goto end; + } + + chunk_status = lttng_trace_chunk_set_credentials_current_user(chunk); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + reply_code = LTTNG_ERR_UNK; + ret = -1; + goto end; + } + + chunk_status = lttng_trace_chunk_set_as_owner(chunk, &session_output); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + reply_code = LTTNG_ERR_UNK; + ret = -1; + goto end; + } + + published_chunk = sessiond_trace_chunk_registry_publish_chunk( + sessiond_trace_chunk_registry, + conn->session->sessiond_uuid, + conn->session->id, + chunk); + if (!published_chunk) { + char uuid_str[UUID_STR_LEN]; + + lttng_uuid_to_str(conn->session->sessiond_uuid, uuid_str); + ERR("Failed to publish chunk: sessiond_uuid = %s, session_id = %" PRIu64 ", chunk_id = %" PRIu64, + uuid_str, + conn->session->id, + msg->chunk_id); + ret = -1; + reply_code = LTTNG_ERR_NOMEM; + goto end; + } + + pthread_mutex_lock(&conn->session->lock); + lttng_trace_chunk_put(conn->session->current_trace_chunk); + conn->session->current_trace_chunk = published_chunk; + pthread_mutex_unlock(&conn->session->lock); + published_chunk = NULL; + +end: + reply.ret_code = htobe32((uint32_t) reply_code); + send_ret = conn->sock->ops->sendmsg(conn->sock, + &reply, + sizeof(struct lttcomm_relayd_generic_reply), + 0); + if (send_ret < (ssize_t) sizeof(reply)) { + ERR("Failed to send \"create trace chunk\" command reply (ret = %zd)", + send_ret); + ret = -1; + } +end_no_reply: + lttng_trace_chunk_put(chunk); + lttng_trace_chunk_put(published_chunk); + lttng_directory_handle_fini(&session_output); + return ret; +} + #define DBG_CMD(cmd_name, conn) \ DBG3("Processing \"%s\" command for socket %i", cmd_name, conn->sock->fd); @@ -2728,6 +2919,10 @@ static int relay_process_control_command(struct relay_connection *conn, DBG_CMD("RELAYD_ROTATE_STREAM", conn); ret = relay_rotate_session_stream(header, conn, payload); break; + case RELAYD_CREATE_TRACE_CHUNK: + DBG_CMD("RELAYD_CREATE_TRACE_CHUNK", conn); + ret = relay_create_trace_chunk(header, conn, payload); + break; case RELAYD_UPDATE_SYNC_INFO: default: ERR("Received unknown command (%u)", header->cmd);