From: Jérémie Galarneau
Date: Thu, 21 Feb 2019 23:59:31 +0000 (-0500)
Subject: Create stream files relative to a stream's current trace chunk
X-Git-Tag: v2.12.0-rc1~555
X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=commitdiff_plain;h=d295668767ac8234e83984e1812d342d03293d88

Create stream files relative to a stream's current trace chunk

Create stream (and metadata) files relative to a session's current
trace chunk using the lttng_trace_chunk_open/unlink[...] functions in
the consumer daemons.

Four new commands are added to the sessiond <-> consumerd protocol:

- CREATE_TRACE_CHUNK
  Command parameters:
  - relayd_id: uint64_t
    Unique id of the session's associated relay daemon connection.
  - override_name: optional char[]
    Overridden chunk name. This field is not used by the consumer
    daemon; it is forwarded to the relay daemon in order to set the
    name of a trace chunk's directory when it should not follow the
    `--` form used by trace archives (i.e. as produced by session
    rotations). This is used to preserve the existing format of
    snapshot output directory names.
  - sessiond_id: uint64_t
    Unique id of the session (from the sessiond's perspective) to
    which this trace chunk belongs.
  - chunk_id: uint64_t
    Unique id of the session's trace chunk.
  - credentials: pair of uint32_t uid/gid
    Credentials the consumer daemon should use in order to create
    files within the trace chunk.

  The session daemon maintains the current lttng_trace_chunk of an
  ltt_session. When a session has an output (`output_traces` == 1),
  an lttng_trace_chunk is created for it. In local tracing modes, the
  current trace chunk of a session, on the session daemon's end, holds
  the ownership of the chunk's output directory.

  The CREATE_TRACE_CHUNK command is used to replicate the session
  daemon's current trace chunk in the consumer daemon. This
  representation of the current trace chunk has a different role. It
  is created in "user" mode.
  Essentially, the trace chunk's location is opaque to the consumer
  daemon; it receives a directory file descriptor from which a number
  of stream files will be created.

  The trace chunk registry, as used by the consumer daemon, implicitly
  owns the trace chunks on behalf of the session daemon. This is only
  needed in the consumer since the consumer has no notion of a session
  beyond session IDs being used to identify other objects.

  When a channel is created, its session_id and initial chunk_id are
  provided. This allows the consumer daemon to retrieve the session's
  current trace chunk and associate it with the newly-created channel.
  The channel holds a reference to its current trace chunk. Streams
  created from a channel also hold a reference to their current trace
  chunk, as retrieved from their "parent" channel.

  The lifetime of trace chunks in the consumer daemon is cooperatively
  managed with the session daemon. The session daemon drives it
  through the LTTNG_CONSUMER_CREATE_TRACE_CHUNK and
  LTTNG_CONSUMER_CLOSE_TRACE_CHUNK commands.

- CLOSE_TRACE_CHUNK
  [... TODO ...]

  This command is used to release the global reference to a given
  trace chunk in the consumer daemon. Releasing the consumer daemon's
  global reference to the trace chunk leaves only the streams to hold
  references until the moment when they are either closed or they
  switch over to another chunk in the event of a session rotation.

- TRACE_CHUNK_EXISTS
  [... TODO ...]

- ADD_TRACE_CHUNK_CLOSE_COMMAND
  [... TODO ...]

This commit changes a lot of code since it essentially changes how
files and directories are created.

A number of commands no longer need to specify a `trace_archive_id`
since the CREATE_TRACE_CHUNK and CLOSE_TRACE_CHUNK commands allow the
consumer daemon to keep track of the current trace chunk of a channel
at any given time.

Creation and ownership of channel sub-directories
---

The path expressed in consumer channel objects is now relative to the
current trace chunk rather than being absolute.
For example, the `pathname` of a consumer channel is now of the form
`ust/1000/64-bit` rather than containing the full output path
`/home/me/lttng-traces/session-[...]/ust/1000/64-bit/`.

The subdirectory of a channel (relative to a trace chunk, e.g.
`ust/1000/64-bit`) is lazily created when a stream's output files are
created. To do so, the `lttng_consumer_channel` now has a
`most_recent_chunk_id` attribute.

When a stream creates its output files (i.e. at the beginning of a
session, or during a session rotation), the stream's current trace
chunk `id` is compared to the channel's `most_recent_chunk_id`. If it
is determined that the channel is entering a new trace chunk, its
channel subdirectory is created relative to the stream's chunk.

Since this new state is within the `lttng_consumer_channel`, the
channel lock must be held on code paths that may result in the
creation of a new set of output files for a given stream.

Note that as of this commit, there is now a clear ownership boundary
between directories, owned by the session daemon through its trace
chunk, and files, owned by the consumer daemon.

Down-scoping of channel credentials
---

Since files are now created relative to their stream's current trace
chunk (which has credentials set), fewer sites need access to the
channel's credentials.

The only reason credentials are kept as part of the consumer channel
structure is the need to open and unlink UST shared memory mappings.
Since the credentials must only be used for this purpose, they are now
stored as an `LTTNG_OPTIONAL` field, buffer_credentials, that is only
set for UST channels.

Stream files should never need those credentials to be created.
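
The two channel-level changes described above (the lazily created
sub-directory and the optional buffer credentials) can be sketched as
follows. This is only an illustration under stated assumptions: every
`sketch_*` name is hypothetical and none of it is the lttng-tools API.
The `is_set`/`value` layout mirrors how the diff reads
`most_recent_chunk_id`, but it is not the `LTTNG_OPTIONAL` macro itself.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical optional wrappers: a flag plus a payload. */
struct sketch_optional_u64 {
	bool is_set;
	uint64_t value;
};

struct sketch_optional_credentials {
	bool is_set;
	uint32_t uid;
	uint32_t gid;
};

enum sketch_domain { SKETCH_DOMAIN_KERNEL, SKETCH_DOMAIN_UST };

/* Hypothetical, reduced stand-in for a consumer channel. */
struct sketch_channel {
	enum sketch_domain domain;
	/* Id of the last chunk in which the sub-directory was created. */
	struct sketch_optional_u64 most_recent_chunk_id;
	/* Only set for UST channels (shared memory mappings). */
	struct sketch_optional_credentials buffer_credentials;
};

static void sketch_channel_init(struct sketch_channel *channel,
		enum sketch_domain domain, uint32_t uid, uint32_t gid)
{
	const bool is_ust = (domain == SKETCH_DOMAIN_UST);

	channel->domain = domain;
	channel->most_recent_chunk_id.is_set = false;
	channel->most_recent_chunk_id.value = 0;
	channel->buffer_credentials.is_set = is_ust;
	channel->buffer_credentials.uid = is_ust ? uid : 0;
	channel->buffer_credentials.gid = is_ust ? gid : 0;
}

/*
 * Assumed to be called with the channel lock held, when a stream is
 * about to create its output files. Returns true when the channel
 * enters a chunk it has not seen yet, in which case the caller would
 * create the channel sub-directory relative to that chunk.
 */
static bool sketch_channel_enters_new_chunk(struct sketch_channel *channel,
		uint64_t stream_chunk_id)
{
	if (channel->most_recent_chunk_id.is_set &&
			channel->most_recent_chunk_id.value == stream_chunk_id) {
		return false;
	}
	channel->most_recent_chunk_id.is_set = true;
	channel->most_recent_chunk_id.value = stream_chunk_id;
	return true;
}
```

In this sketch, a caller would create the sub-directory (for instance
with `mkdirat()` on the chunk's directory file descriptor) only when
`sketch_channel_enters_new_chunk()` returns true, so every stream of a
channel after the first one in a given chunk skips the directory
creation.
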

The following sessiond <-> consumerd commands have been removed:
- LTTNG_CONSUMER_ROTATE_RENAME
- LTTNG_CONSUMER_CHECK_ROTATION_PENDING_LOCAL
- LTTNG_CONSUMER_CHECK_ROTATION_PENDING_RELAY
- LTTNG_CONSUMER_MKDIR

Signed-off-by: Jérémie Galarneau
---

diff --git a/include/lttng/lttng-error.h b/include/lttng/lttng-error.h index edd9fc39e..efbd06625 100644 --- a/include/lttng/lttng-error.h +++ b/include/lttng/lttng-error.h @@ -169,6 +169,9 @@ enum lttng_error_code { LTTNG_ERR_CHAN_NOT_FOUND = 146, /* Channel not found */ LTTNG_ERR_SNAPSHOT_UNSUPPORTED = 147, /* Session configuration does not allow the use of snapshots */ LTTNG_ERR_SESSION_NOT_EXIST = 148, /* The session does not exist on the session daemon */ + LTTNG_ERR_CREATE_TRACE_CHUNK_FAIL_CONSUMER = 149, /* trace chunk creation failure on consumer */ + LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER = 150, /* trace chunk close failure on consumer */ + LTTNG_ERR_TRACE_CHUNK_EXISTS_FAIL_CONSUMER = 151, /* failed to query consumer for trace chunk existence */ /* MUST be last element */ LTTNG_ERR_NR, /* Last element */ diff --git a/src/bin/lttng-sessiond/cmd.c b/src/bin/lttng-sessiond/cmd.c index 3cf8c518c..b5bde4a9d 100644 --- a/src/bin/lttng-sessiond/cmd.c +++ b/src/bin/lttng-sessiond/cmd.c @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -2509,58 +2510,6 @@ error: return -ret; } -static -int domain_mkdir(const struct consumer_output *output, - const struct ltt_session *session, - uid_t uid, gid_t gid) -{ - struct consumer_socket *socket; - struct lttng_ht_iter iter; - int ret; - char path[LTTNG_PATH_MAX]; - - if (!output || !output->socks) { - ERR("No consumer output found"); - ret = -1; - goto end; - } - - ret = snprintf(path, sizeof(path), "%s/%s%s", - session_get_base_path(session), - output->chunk_path, - output->domain_subdir); - if (ret < 0 || ret >= LTTNG_PATH_MAX) { - ERR("Failed to format path new chunk domain path"); - ret = -1; - goto end; - } - - DBG("Domain mkdir %s for 
session %" PRIu64, path, session->id); - rcu_read_lock(); - /* - * We have to iterate to find a socket, but we only need to send the - * rename command to one consumer, so we break after the first one. - */ - cds_lfht_for_each_entry(output->socks->ht, &iter.iter, socket, node.node) { - pthread_mutex_lock(socket->lock); - ret = consumer_mkdir(socket, session->id, output, path, uid, gid); - pthread_mutex_unlock(socket->lock); - if (ret) { - ERR("Failed to create directory at \"%s\"", path); - ret = -1; - goto end_unlock; - } - break; - } - - ret = 0; - -end_unlock: - rcu_read_unlock(); -end: - return ret; -} - /* * Command LTTNG_START_TRACE processed by the client thread. * @@ -2600,9 +2549,20 @@ int cmd_start_trace(struct ltt_session *session) goto error; } - if (!session->has_been_started && session->output_traces) { - ret = session_switch_trace_chunk(session, NULL, NULL); - if (ret != LTTNG_OK) { + if (session->output_traces && !session->current_trace_chunk) { + struct lttng_trace_chunk *trace_chunk; + + trace_chunk = session_create_new_trace_chunk( + session, NULL, NULL); + if (!trace_chunk) { + ret = LTTNG_ERR_CREATE_DIR_FAIL; + goto error; + } + assert(!session->current_trace_chunk); + ret = session_set_trace_chunk(session, trace_chunk, NULL); + lttng_trace_chunk_put(trace_chunk); + if (ret) { + ret = LTTNG_ERR_CREATE_TRACE_CHUNK_FAIL_CONSUMER; goto error; } } @@ -3071,26 +3031,13 @@ int cmd_destroy_session(struct ltt_session *session, session->rotate_size = 0; } - if (session->current_archive_id != 0) { - if (!session->rotated_after_last_stop) { - ret = cmd_rotate_session(session, NULL); - if (ret != LTTNG_OK) { - ERR("Failed to perform an implicit rotation as part of the rotation: %s", lttng_strerror(-ret)); - } - } else { - /* - * Rename the active chunk to ensure it has a name - * of the form ts_begin-ts_end-id. - * - * Note that no trace data has been produced since - * the last rotation; the directory should be - * removed. 
- */ - ret = rename_active_chunk(session); - if (ret) { - ERR("Failed to rename active chunk during the destruction of session \"%s\"", - session->name); - } + if (session->most_recent_chunk_id.is_set && + session->most_recent_chunk_id.value != 0 && + session->current_trace_chunk) { + ret = cmd_rotate_session(session, NULL); + if (ret != LTTNG_OK) { + ERR("Failed to perform an implicit rotation as part of the destruction of session \"%s\": %s", + session->name, lttng_strerror(-ret)); } } @@ -4331,11 +4278,43 @@ int64_t get_session_nb_packets_per_stream(const struct ltt_session *session, } static -enum lttng_error_code snapshot_record(const struct ltt_session *session, +enum lttng_error_code snapshot_record(struct ltt_session *session, const struct snapshot_output *snapshot_output, int wait) { + int fmt_ret; int64_t nb_packets_per_stream; + char snapshot_chunk_name[LTTNG_NAME_MAX]; enum lttng_error_code ret = LTTNG_OK; + struct lttng_trace_chunk *snapshot_trace_chunk; + + fmt_ret = snprintf(snapshot_chunk_name, sizeof(snapshot_chunk_name), + "%s-%s-%" PRIu64, + snapshot_output->name, + snapshot_output->datetime, + snapshot_output->nb_snapshot); + if (fmt_ret < 0 || fmt_ret >= sizeof(snapshot_chunk_name)) { + ERR("Failed to format snapshot name"); + ret = LTTNG_ERR_INVALID; + goto end; + } + DBG("Recording snapshot \"%s\" for session \"%s\" with chunk name \"%s\"", + snapshot_output->name, session->name, + snapshot_chunk_name); + snapshot_trace_chunk = session_create_new_trace_chunk(session, + snapshot_output_get_base_path(snapshot_output), + snapshot_chunk_name); + if (!snapshot_trace_chunk) { + ret = LTTNG_ERR_CREATE_DIR_FAIL; + goto end; + } + assert(!session->current_trace_chunk); + ret = session_set_trace_chunk(session, snapshot_trace_chunk, NULL); + lttng_trace_chunk_put(snapshot_trace_chunk); + snapshot_trace_chunk = NULL; + if (ret) { + ret = LTTNG_ERR_CREATE_TRACE_CHUNK_FAIL_CONSUMER; + goto end; + } nb_packets_per_stream = 
get_session_nb_packets_per_stream(session, snapshot_output->max_size); @@ -4361,6 +4340,21 @@ enum lttng_error_code snapshot_record(const struct ltt_session *session, goto end; } } + + if (session_close_trace_chunk(session, session->current_trace_chunk)) { + /* + * Don't goto end; make sure the chunk is closed for the session + * to allow future snapshots. + */ + ERR("Failed to close snapshot trace chunk of session \"%s\"", + session->name); + ret = -1; + } + if (session_set_trace_chunk(session, NULL, NULL)) { + ERR("Failed to release the current trace chunk of session \"%s\"", + session->name); + ret = -1; + } end: return ret; } @@ -4525,16 +4519,11 @@ int cmd_rotate_session(struct ltt_session *session, struct lttng_rotate_session_return *rotate_return) { int ret; + uint64_t ongoing_rotation_chunk_id; enum lttng_error_code cmd_ret = LTTNG_OK; - size_t strf_ret; - struct tm *timeinfo; - char datetime[21]; - time_t now; - /* - * Used to roll-back timestamps in case of failure to launch the - * rotation. - */ - time_t original_last_chunk_start_ts, original_current_chunk_start_ts; + struct lttng_trace_chunk *chunk_being_archived = NULL; + struct lttng_trace_chunk *new_trace_chunk = NULL; + enum lttng_trace_chunk_status chunk_status; assert(session); @@ -4548,9 +4537,7 @@ int cmd_rotate_session(struct ltt_session *session, goto end; } - /* - * Unsupported feature in lttng-relayd before 2.11. - */ + /* Unsupported feature in lttng-relayd before 2.11. */ if (session->consumer->type == CONSUMER_DST_NET && (session->consumer->relay_major_version == 2 && session->consumer->relay_minor_version < 11)) { @@ -4576,156 +4563,57 @@ int cmd_rotate_session(struct ltt_session *session, goto end; } - /* Special case for the first rotation. */ - if (session->current_archive_id == 0) { - const char *base_path = NULL; - - assert(session->kernel_session || session->ust_session); - /* Either one of the two sessions is enough to get the root path. 
*/ - base_path = session_get_base_path(session); - assert(base_path); + session->rotation_state = LTTNG_ROTATION_STATE_ONGOING; - ret = lttng_strncpy(session->rotation_chunk.current_rotate_path, - base_path, - sizeof(session->rotation_chunk.current_rotate_path)); - if (ret) { - ERR("Failed to copy session base path to current rotation chunk path"); - cmd_ret = LTTNG_ERR_UNK; - goto end; - } - } else { - /* - * The currently active tracing path is now the folder we - * want to rotate. - */ - ret = lttng_strncpy(session->rotation_chunk.current_rotate_path, - session->rotation_chunk.active_tracing_path, - sizeof(session->rotation_chunk.current_rotate_path)); - if (ret) { - ERR("Failed to copy the active tracing path to the current rotate path"); - cmd_ret = LTTNG_ERR_UNK; - goto end; + if (session->active) { + new_trace_chunk = session_create_new_trace_chunk(session, + NULL, NULL); + if (!new_trace_chunk) { + cmd_ret = LTTNG_ERR_CREATE_DIR_FAIL; + goto error; } - } - DBG("Current rotate path %s", session->rotation_chunk.current_rotate_path); - - /* - * Channels created after this point will belong to the next - * archive id. - */ - session->current_archive_id++; - - now = time(NULL); - if (now == (time_t) -1) { - cmd_ret = LTTNG_ERR_UNK; - goto end; - } - - /* Sample chunk bounds for roll-back in case of error. 
*/ - original_last_chunk_start_ts = session->last_chunk_start_ts; - original_current_chunk_start_ts = session->current_chunk_start_ts; - - session->last_chunk_start_ts = session->current_chunk_start_ts; - session->current_chunk_start_ts = now; - - timeinfo = localtime(&now); - if (!timeinfo) { - PERROR("Failed to sample local time in rotate session command"); - cmd_ret = LTTNG_ERR_UNK; - goto end; - } - strf_ret = strftime(datetime, sizeof(datetime), "%Y%m%dT%H%M%S%z", - timeinfo); - if (!strf_ret) { - ERR("Failed to format local time timestamp in rotate session command"); - cmd_ret = LTTNG_ERR_UNK; - goto end; - } + } - /* Current chunk directory, ex: 20170922-111754-42 */ - ret = snprintf(session->consumer->chunk_path, - sizeof(session->consumer->chunk_path), - "%s-%" PRIu64, datetime, - session->current_archive_id + 1); - if (ret < 0 || ret >= sizeof(session->consumer->chunk_path)) { - ERR("Failed to format the new chunk's directory in rotate session command"); - cmd_ret = LTTNG_ERR_UNK; + /* The current trace chunk becomes the chunk being archived. */ + ret = session_set_trace_chunk(session, new_trace_chunk, + &chunk_being_archived); + if (ret) { + cmd_ret = LTTNG_ERR_CREATE_TRACE_CHUNK_FAIL_CONSUMER; goto error; } - /* - * The active path for the next rotation/destroy. 
- * Ex: ~/lttng-traces/auto-20170922-111748/20170922-111754-42 - */ - ret = snprintf(session->rotation_chunk.active_tracing_path, - sizeof(session->rotation_chunk.active_tracing_path), - "%s/%s", - session_get_base_path(session), - session->consumer->chunk_path); - if (ret < 0 || ret >= sizeof(session->rotation_chunk.active_tracing_path)) { - ERR("Failed to format active tracing path in rotate session command"); - cmd_ret = LTTNG_ERR_UNK; + assert(chunk_being_archived); + chunk_status = lttng_trace_chunk_get_id(chunk_being_archived, + &ongoing_rotation_chunk_id); + assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); + + chunk_status = lttng_trace_chunk_set_close_command( + chunk_being_archived, + LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + cmd_ret = LTTNG_ERR_FATAL; goto error; } - /* - * A rotation has a local step even if the destination is a relay - * daemon; the buffers must be consumed by the consumer daemon. - */ - session->rotation_pending_local = true; - session->rotation_pending_relay = - session_get_consumer_destination_type(session) == CONSUMER_DST_NET; - session->rotation_state = LTTNG_ROTATION_STATE_ONGOING; - if (session->kernel_session) { - ret = lttng_strncpy( - session->kernel_session->consumer->chunk_path, - session->consumer->chunk_path, - sizeof(session->kernel_session->consumer->chunk_path)); - if (ret) { - ERR("Failed to copy current chunk directory to kernel session"); - cmd_ret = LTTNG_ERR_UNK; - goto error; - } - /* - * Create the new chunk folder, before the rotation begins so we - * don't race with the consumer/tracer activity. 
- */ - ret = domain_mkdir(session->kernel_session->consumer, session, - session->kernel_session->uid, - session->kernel_session->gid); - if (ret) { - cmd_ret = LTTNG_ERR_CREATE_DIR_FAIL; - goto error; - } cmd_ret = kernel_rotate_session(session); if (cmd_ret != LTTNG_OK) { goto error; } } if (session->ust_session) { - ret = lttng_strncpy( - session->ust_session->consumer->chunk_path, - session->consumer->chunk_path, - sizeof(session->ust_session->consumer->chunk_path)); - if (ret) { - ERR("Failed to copy current chunk directory to userspace session"); - cmd_ret = LTTNG_ERR_UNK; - goto error; - } - ret = domain_mkdir(session->ust_session->consumer, session, - session->ust_session->uid, - session->ust_session->gid); - if (ret) { - cmd_ret = LTTNG_ERR_CREATE_DIR_FAIL; - goto error; - } cmd_ret = ust_app_rotate_session(session); if (cmd_ret != LTTNG_OK) { goto error; } } + ret = session_close_trace_chunk(session, chunk_being_archived); + if (ret) { + cmd_ret = LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER; + goto error; + } + ret = timer_session_rotation_pending_check_start(session, DEFAULT_ROTATE_PENDING_TIMER); if (ret) { @@ -4738,13 +4626,15 @@ int cmd_rotate_session(struct ltt_session *session, } if (rotate_return) { - rotate_return->rotation_id = session->current_archive_id; + rotate_return->rotation_id = ongoing_rotation_chunk_id; } + session->chunk_being_archived = chunk_being_archived; + chunk_being_archived = NULL; ret = notification_thread_command_session_rotation_ongoing( notification_thread_handle, session->name, session->uid, session->gid, - session->current_archive_id - 1); + ongoing_rotation_chunk_id); if (ret != LTTNG_OK) { ERR("Failed to notify notification thread that a session rotation is ongoing for session %s", session->name); @@ -4752,15 +4642,15 @@ int cmd_rotate_session(struct ltt_session *session, } DBG("Cmd rotate session %s, archive_id %" PRIu64 " sent", - session->name, session->current_archive_id - 1); + session->name, 
ongoing_rotation_chunk_id); end: + lttng_trace_chunk_put(new_trace_chunk); + lttng_trace_chunk_put(chunk_being_archived); ret = (cmd_ret == LTTNG_OK) ? cmd_ret : -((int) cmd_ret); return ret; error: - session->last_chunk_start_ts = original_last_chunk_start_ts; - session->current_archive_id = original_current_chunk_start_ts; if (session_reset_rotation_state(session, - LTTNG_ROTATION_STATE_NO_ROTATION)) { + LTTNG_ROTATION_STATE_ERROR)) { ERR("Failed to reset rotation state of session \"%s\"", session->name); } @@ -4772,35 +4662,62 @@ error: * * Check if the session has finished its rotation. * - * Return 0 on success or else a LTTNG_ERR code. + * Return LTTNG_OK on success or else an LTTNG_ERR code. */ int cmd_rotate_get_info(struct ltt_session *session, struct lttng_rotation_get_info_return *info_return, uint64_t rotation_id) { - int ret; - - assert(session); + enum lttng_error_code cmd_ret = LTTNG_OK; + enum lttng_rotation_state rotation_state; DBG("Cmd rotate_get_info session %s, rotation id %" PRIu64, session->name, - session->current_archive_id); + session->most_recent_chunk_id.value); - if (session->current_archive_id != rotation_id) { - info_return->status = (int32_t) LTTNG_ROTATION_STATE_EXPIRED; - ret = LTTNG_OK; - goto end; + if (session->chunk_being_archived) { + enum lttng_trace_chunk_status chunk_status; + uint64_t chunk_id; + + chunk_status = lttng_trace_chunk_get_id( + session->chunk_being_archived, + &chunk_id); + assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); + + rotation_state = rotation_id == chunk_id ? 
+ LTTNG_ROTATION_STATE_ONGOING : + LTTNG_ROTATION_STATE_EXPIRED; + } else { + if (session->last_archived_chunk_id.is_set && + rotation_id != session->last_archived_chunk_id.value) { + rotation_state = LTTNG_ROTATION_STATE_EXPIRED; + } else { + rotation_state = session->rotation_state; + } } - switch (session->rotation_state) { + switch (rotation_state) { + case LTTNG_ROTATION_STATE_NO_ROTATION: + DBG("Reporting that no rotation has occured within the lifetime of session \"%s\"", + session->name); + goto end; + case LTTNG_ROTATION_STATE_EXPIRED: + DBG("Reporting that the rotation state of rotation id %" PRIu64 " of session \"%s\" has expired", + rotation_id, session->name); + break; case LTTNG_ROTATION_STATE_ONGOING: - DBG("Reporting that rotation id %" PRIu64 " of session %s is still pending", + DBG("Reporting that rotation id %" PRIu64 " of session \"%s\" is still pending", rotation_id, session->name); break; case LTTNG_ROTATION_STATE_COMPLETED: { + int fmt_ret; + char *chunk_path; char *current_tracing_path_reply; size_t current_tracing_path_reply_len; + DBG("Reporting that rotation id %" PRIu64 " of session \"%s\" is completed", + rotation_id, session->name); + switch (session_get_consumer_destination_type(session)) { case CONSUMER_DST_LOCAL: current_tracing_path_reply = @@ -4819,13 +4736,13 @@ int cmd_rotate_get_info(struct ltt_session *session, info_return->location.relay.protocol = (int8_t) LTTNG_TRACE_ARCHIVE_LOCATION_RELAY_PROTOCOL_TYPE_TCP; - ret = lttng_strncpy(info_return->location.relay.host, + fmt_ret = lttng_strncpy(info_return->location.relay.host, session_get_net_consumer_hostname(session), sizeof(info_return->location.relay.host)); - if (ret) { - ERR("Failed to host name to rotate_get_info reply"); + if (fmt_ret) { + ERR("Failed to copy host name to rotate_get_info reply"); info_return->status = LTTNG_ROTATION_STATUS_ERROR; - ret = -LTTNG_ERR_UNK; + cmd_ret = LTTNG_ERR_SET_URL; goto end; } @@ -4838,30 +4755,41 @@ int cmd_rotate_get_info(struct 
ltt_session *session, default: abort(); } - ret = lttng_strncpy(current_tracing_path_reply, - session->rotation_chunk.current_rotate_path, - current_tracing_path_reply_len); - if (ret) { - ERR("Failed to copy current tracing path to rotate_get_info reply"); + fmt_ret = asprintf(&chunk_path, + "%s/" DEFAULT_ARCHIVED_TRACE_CHUNKS_DIRECTORY "/%s", + session_get_base_path(session), + session->last_archived_chunk_name); + if (fmt_ret == -1) { + PERROR("Failed to format the path of the last archived trace chunk"); + info_return->status = LTTNG_ROTATION_STATUS_ERROR; + cmd_ret = LTTNG_ERR_UNK; + goto end; + } + + fmt_ret = lttng_strncpy(current_tracing_path_reply, + chunk_path, current_tracing_path_reply_len); + free(chunk_path); + if (fmt_ret) { + ERR("Failed to copy path of the last archived trace chunk to rotate_get_info reply"); info_return->status = LTTNG_ROTATION_STATUS_ERROR; - ret = -LTTNG_ERR_UNK; + cmd_ret = LTTNG_ERR_UNK; goto end; } break; } case LTTNG_ROTATION_STATE_ERROR: - DBG("Reporting that an error occurred during rotation %" PRIu64 " of session %s", + DBG("Reporting that an error occurred during rotation %" PRIu64 " of session \"%s\"", rotation_id, session->name); break; default: abort(); } - info_return->status = (int32_t) session->rotation_state; - ret = LTTNG_OK; + cmd_ret = LTTNG_OK; end: - return ret; + info_return->status = (int32_t) rotation_state; + return cmd_ret; } /* diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 99b22210d..50d19b074 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -30,6 +30,7 @@ #include #include #include +#include #include "consumer.h" #include "health-sessiond.h" @@ -870,8 +871,6 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, uint64_t session_id, const char *pathname, const char *name, - uid_t uid, - gid_t gid, uint64_t relayd_id, uint64_t key, unsigned char *uuid, @@ -884,12 +883,36 @@ void 
consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, int64_t blocking_timeout, const char *root_shm_path, const char *shm_path, - uint64_t trace_archive_id) + struct lttng_trace_chunk *trace_chunk) { assert(msg); - /* Zeroed structure */ + /* Zeroed structure */ memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); + msg->u.ask_channel.buffer_credentials.uid = UINT32_MAX; + msg->u.ask_channel.buffer_credentials.gid = UINT32_MAX; + + if (monitor) { + assert(trace_chunk); + } + + if (trace_chunk) { + uint64_t chunk_id; + enum lttng_trace_chunk_status chunk_status; + struct lttng_credentials chunk_credentials; + + chunk_status = lttng_trace_chunk_get_id(trace_chunk, &chunk_id); + assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); + LTTNG_OPTIONAL_SET(&msg->u.ask_channel.chunk_id, chunk_id); + + chunk_status = lttng_trace_chunk_get_credentials(trace_chunk, + &chunk_credentials); + assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); + msg->u.ask_channel.buffer_credentials.uid = + chunk_credentials.uid; + msg->u.ask_channel.buffer_credentials.gid = + chunk_credentials.gid; + } msg->cmd_type = LTTNG_CONSUMER_ASK_CHANNEL_CREATION; msg->u.ask_channel.subbuf_size = subbuf_size; @@ -903,8 +926,6 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.ask_channel.type = type; msg->u.ask_channel.session_id = session_id; msg->u.ask_channel.session_id_per_pid = session_id_per_pid; - msg->u.ask_channel.uid = uid; - msg->u.ask_channel.gid = gid; msg->u.ask_channel.relayd_id = relayd_id; msg->u.ask_channel.key = key; msg->u.ask_channel.chan_id = chan_id; @@ -913,7 +934,6 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.ask_channel.monitor = monitor; msg->u.ask_channel.ust_app_uid = ust_app_uid; msg->u.ask_channel.blocking_timeout = blocking_timeout; - msg->u.ask_channel.trace_archive_id = trace_archive_id; memcpy(msg->u.ask_channel.uuid, uuid, sizeof(msg->u.ask_channel.uuid)); @@ -956,19 +976,27 @@ void 
consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg, uint64_t tracefile_count, unsigned int monitor, unsigned int live_timer_interval, - unsigned int monitor_timer_interval) + unsigned int monitor_timer_interval, + struct lttng_trace_chunk *trace_chunk) { assert(msg); /* Zeroed structure */ memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); + if (trace_chunk) { + uint64_t chunk_id; + enum lttng_trace_chunk_status chunk_status; + + chunk_status = lttng_trace_chunk_get_id(trace_chunk, &chunk_id); + assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); + LTTNG_OPTIONAL_SET(&msg->u.channel.chunk_id, chunk_id); + } + /* Send channel */ msg->cmd_type = LTTNG_CONSUMER_ADD_CHANNEL; msg->u.channel.channel_key = channel_key; msg->u.channel.session_id = session_id; - msg->u.channel.uid = uid; - msg->u.channel.gid = gid; msg->u.channel.relayd_id = relayd_id; msg->u.channel.nb_init_streams = nb_init_streams; msg->u.channel.output = output; @@ -993,8 +1021,7 @@ void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg, void consumer_init_add_stream_comm_msg(struct lttcomm_consumer_msg *msg, uint64_t channel_key, uint64_t stream_key, - int32_t cpu, - uint64_t trace_archive_id) + int32_t cpu) { assert(msg); @@ -1004,7 +1031,6 @@ void consumer_init_add_stream_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.stream.channel_key = channel_key; msg->u.stream.stream_key = stream_key; msg->u.stream.cpu = cpu; - msg->u.stream.trace_archive_id = trace_archive_id; } void consumer_init_streams_sent_comm_msg(struct lttcomm_consumer_msg *msg, @@ -1420,8 +1446,8 @@ end: */ enum lttng_error_code consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key, const struct snapshot_output *output, int metadata, - uid_t uid, gid_t gid, const char *session_path, int wait, - uint64_t nb_packets_per_stream, uint64_t trace_archive_id) + uid_t uid, gid_t gid, const char *channel_path, int wait, + uint64_t nb_packets_per_stream) { int ret; enum lttng_error_code 
status = LTTNG_OK; @@ -1438,66 +1464,24 @@ enum lttng_error_code consumer_snapshot_channel(struct consumer_socket *socket, msg.u.snapshot_channel.key = key; msg.u.snapshot_channel.nb_packets_per_stream = nb_packets_per_stream; msg.u.snapshot_channel.metadata = metadata; - msg.u.snapshot_channel.trace_archive_id = trace_archive_id; if (output->consumer->type == CONSUMER_DST_NET) { - msg.u.snapshot_channel.relayd_id = output->consumer->net_seq_index; + msg.u.snapshot_channel.relayd_id = + output->consumer->net_seq_index; msg.u.snapshot_channel.use_relayd = 1; - ret = snprintf(msg.u.snapshot_channel.pathname, - sizeof(msg.u.snapshot_channel.pathname), - "%s/%s/%s-%s-%" PRIu64 "%s", - output->consumer->dst.net.base_dir, - output->consumer->domain_subdir, - output->name, output->datetime, - output->nb_snapshot, - session_path); - if (ret < 0) { - status = LTTNG_ERR_INVALID; - goto error; - } else if (ret >= sizeof(msg.u.snapshot_channel.pathname)) { - ERR("Snapshot path exceeds the maximal allowed length of %zu bytes (%i bytes required) with path \"%s/%s/%s-%s-%" PRIu64 "%s\"", - sizeof(msg.u.snapshot_channel.pathname), - ret, output->consumer->dst.net.base_dir, - output->consumer->domain_subdir, - output->name, output->datetime, - output->nb_snapshot, - session_path); - status = LTTNG_ERR_SNAPSHOT_FAIL; - goto error; - } } else { - ret = snprintf(msg.u.snapshot_channel.pathname, - sizeof(msg.u.snapshot_channel.pathname), - "%s/%s-%s-%" PRIu64 "%s", - output->consumer->dst.session_root_path, - output->name, output->datetime, - output->nb_snapshot, - session_path); - if (ret < 0) { - status = LTTNG_ERR_NOMEM; - goto error; - } else if (ret >= sizeof(msg.u.snapshot_channel.pathname)) { - ERR("Snapshot path exceeds the maximal allowed length of %zu bytes (%i bytes required) with path \"%s/%s-%s-%" PRIu64 "%s\"", - sizeof(msg.u.snapshot_channel.pathname), - ret, output->consumer->dst.session_root_path, - output->name, output->datetime, output->nb_snapshot, - session_path); 
- status = LTTNG_ERR_SNAPSHOT_FAIL; - goto error; - } - msg.u.snapshot_channel.relayd_id = (uint64_t) -1ULL; - - /* Create directory. Ignore if exist. */ - ret = run_as_mkdir_recursive(msg.u.snapshot_channel.pathname, - S_IRWXU | S_IRWXG, uid, gid); - if (ret < 0) { - if (errno != EEXIST) { - status = LTTNG_ERR_CREATE_DIR_FAIL; - PERROR("Trace directory creation error"); - goto error; - } - } + } + ret = lttng_strncpy(msg.u.snapshot_channel.pathname, + channel_path, + sizeof(msg.u.snapshot_channel.pathname)); + if (ret < 0) { + ERR("Snapshot path exceeds the maximal allowed length of %zu bytes (%zu bytes required) with path \"%s\"", + sizeof(msg.u.snapshot_channel.pathname), + strlen(channel_path), + channel_path); + status = LTTNG_ERR_SNAPSHOT_FAIL; + goto error; } health_code_update(); @@ -1637,8 +1621,6 @@ end: /* * Ask the consumer to rotate a channel. - * domain_path contains "/kernel" for kernel or the complete path for UST - * (ex: /ust/uid/1000/64-bit); * * The new_chunk_id is the session->rotate_count that has been incremented * when the rotation started. 
On the relay, this allows to keep track in which @@ -1646,8 +1628,7 @@ end: */ int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key, uid_t uid, gid_t gid, struct consumer_output *output, - const char *domain_path, bool is_metadata_channel, - uint64_t new_chunk_id) + bool is_metadata_channel) { int ret; struct lttcomm_consumer_msg msg; @@ -1661,30 +1642,11 @@ int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key, msg.cmd_type = LTTNG_CONSUMER_ROTATE_CHANNEL; msg.u.rotate_channel.key = key; msg.u.rotate_channel.metadata = !!is_metadata_channel; - msg.u.rotate_channel.new_chunk_id = new_chunk_id; if (output->type == CONSUMER_DST_NET) { msg.u.rotate_channel.relayd_id = output->net_seq_index; - ret = snprintf(msg.u.rotate_channel.pathname, - sizeof(msg.u.rotate_channel.pathname), "%s%s%s", - output->dst.net.base_dir, - output->chunk_path, domain_path); - if (ret < 0 || ret >= sizeof(msg.u.rotate_channel.pathname)) { - ERR("Failed to format channel path name when asking consumer to rotate channel"); - ret = -LTTNG_ERR_INVALID; - goto error; - } } else { msg.u.rotate_channel.relayd_id = (uint64_t) -1ULL; - ret = snprintf(msg.u.rotate_channel.pathname, - sizeof(msg.u.rotate_channel.pathname), "%s/%s%s", - output->dst.session_root_path, - output->chunk_path, domain_path); - if (ret < 0 || ret >= sizeof(msg.u.rotate_channel.pathname)) { - ERR("Failed to format channel path name when asking consumer to rotate channel"); - ret = -LTTNG_ERR_INVALID; - goto error; - } } health_code_update(); @@ -1706,55 +1668,22 @@ error: return ret; } -int consumer_rotate_rename(struct consumer_socket *socket, uint64_t session_id, - const struct consumer_output *output, const char *old_path, - const char *new_path, uid_t uid, gid_t gid) +int consumer_init(struct consumer_socket *socket, + const lttng_uuid sessiond_uuid) { int ret; - struct lttcomm_consumer_msg msg; - size_t old_path_length, new_path_length; + struct lttcomm_consumer_msg msg = { + .cmd_type 
= LTTNG_CONSUMER_INIT, + }; assert(socket); - assert(old_path); - assert(new_path); - - DBG("Consumer rotate rename session %" PRIu64 ", old path = \"%s\", new_path = \"%s\"", - session_id, old_path, new_path); - old_path_length = strlen(old_path); - if (old_path_length >= sizeof(msg.u.rotate_rename.old_path)) { - ERR("consumer_rotate_rename: old path length (%zu bytes) exceeds the maximal length allowed by the consumer protocol (%zu bytes)", - old_path_length + 1, sizeof(msg.u.rotate_rename.old_path)); - ret = -LTTNG_ERR_INVALID; - goto error; - } - - new_path_length = strlen(new_path); - if (new_path_length >= sizeof(msg.u.rotate_rename.new_path)) { - ERR("consumer_rotate_rename: new path length (%zu bytes) exceeds the maximal length allowed by the consumer protocol (%zu bytes)", - new_path_length + 1, sizeof(msg.u.rotate_rename.new_path)); - ret = -LTTNG_ERR_INVALID; - goto error; - } - - memset(&msg, 0, sizeof(msg)); - msg.cmd_type = LTTNG_CONSUMER_ROTATE_RENAME; - msg.u.rotate_rename.session_id = session_id; - msg.u.rotate_rename.uid = uid; - msg.u.rotate_rename.gid = gid; - strcpy(msg.u.rotate_rename.old_path, old_path); - strcpy(msg.u.rotate_rename.new_path, new_path); - - if (output->type == CONSUMER_DST_NET) { - msg.u.rotate_rename.relayd_id = output->net_seq_index; - } else { - msg.u.rotate_rename.relayd_id = -1ULL; - } + DBG("Sending consumer initialization command"); + lttng_uuid_copy(msg.u.init.sessiond_uuid, sessiond_uuid); health_code_update(); ret = consumer_send_msg(socket, &msg); if (ret < 0) { - ret = -LTTNG_ERR_ROTATE_RENAME_FAIL_CONSUMER; goto error; } @@ -1764,132 +1693,203 @@ error: } /* - * Ask the consumer if a rotation is locally pending. Must be called with the - * socket lock held. + * Ask the consumer to create a new chunk for a given session. * - * Return 1 if the rotation is still pending, 0 if finished, a negative value - * on error. + * Called with the consumer socket lock held. 
*/ -int consumer_check_rotation_pending_local(struct consumer_socket *socket, - uint64_t session_id, uint64_t chunk_id) +int consumer_create_trace_chunk(struct consumer_socket *socket, + uint64_t relayd_id, uint64_t session_id, + struct lttng_trace_chunk *chunk) { int ret; - struct lttcomm_consumer_msg msg; - uint32_t pending = 0; + enum lttng_trace_chunk_status chunk_status; + struct lttng_credentials chunk_credentials; + const struct lttng_directory_handle *chunk_directory_handle; + int chunk_dirfd; + const char *chunk_name; + bool chunk_name_overriden; + uint64_t chunk_id; + time_t creation_timestamp; + char creation_timestamp_buffer[ISO8601_STR_LEN]; + const char *creation_timestamp_str = "(none)"; + const bool chunk_has_local_output = relayd_id == -1ULL; + struct lttcomm_consumer_msg msg = { + .cmd_type = LTTNG_CONSUMER_CREATE_TRACE_CHUNK, + .u.create_trace_chunk.session_id = session_id, + }; assert(socket); + assert(chunk); - DBG("Asking consumer to locally check for pending rotation for session %" PRIu64 ", chunk id %" PRIu64, - session_id, chunk_id); - - memset(&msg, 0, sizeof(msg)); - msg.cmd_type = LTTNG_CONSUMER_CHECK_ROTATION_PENDING_LOCAL; - msg.u.check_rotation_pending_local.session_id = session_id; - msg.u.check_rotation_pending_local.chunk_id = chunk_id; + if (relayd_id != -1ULL) { + LTTNG_OPTIONAL_SET(&msg.u.create_trace_chunk.relayd_id, + relayd_id); + } - health_code_update(); - ret = consumer_send_msg(socket, &msg); - if (ret < 0) { - ret = -LTTNG_ERR_ROTATION_PENDING_LOCAL_FAIL_CONSUMER; + chunk_status = lttng_trace_chunk_get_name(chunk, &chunk_name, + &chunk_name_overriden); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK && + chunk_status != LTTNG_TRACE_CHUNK_STATUS_NONE) { + ERR("Failed to get name of trace chunk"); + ret = -LTTNG_ERR_FATAL; goto error; } + if (chunk_name_overriden) { + ret = lttng_strncpy(msg.u.create_trace_chunk.override_name, + chunk_name, + sizeof(msg.u.create_trace_chunk.override_name)); + if (ret) { + ERR("Trace 
chunk name \"%s\" exceeds the maximal length allowed by the consumer protocol", + chunk_name); + ret = -LTTNG_ERR_FATAL; + goto error; + } + } - ret = consumer_socket_recv(socket, &pending, sizeof(pending)); - if (ret < 0) { + chunk_status = lttng_trace_chunk_get_creation_timestamp(chunk, + &creation_timestamp); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret = -LTTNG_ERR_FATAL; goto error; } + msg.u.create_trace_chunk.creation_timestamp = + (uint64_t) creation_timestamp; + /* Only used for logging purposes. */ + ret = time_to_iso8601_str(creation_timestamp, + creation_timestamp_buffer, + sizeof(creation_timestamp_buffer)); + creation_timestamp_str = !ret ? creation_timestamp_buffer : + "(formatting error)"; + + chunk_status = lttng_trace_chunk_get_id(chunk, &chunk_id); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + /* + * Anonymous trace chunks should never be transmitted + * to remote peers (consumerd and relayd). They are used + * internally for backward-compatibility purposes. + */ + ret = -LTTNG_ERR_FATAL; + goto error; + } + msg.u.create_trace_chunk.chunk_id = chunk_id; + /* Only used for logging purposes. */ - ret = pending; - -error: - health_code_update(); - return ret; -} - -/* - * Ask the consumer if a rotation is pending on the relayd. Must be called with - * the socket lock held. - * - * Return 1 if the rotation is still pending, 0 if finished, a negative value - * on error. 
- */ -int consumer_check_rotation_pending_relay(struct consumer_socket *socket, - const struct consumer_output *output, uint64_t session_id, - uint64_t chunk_id) -{ - int ret; - struct lttcomm_consumer_msg msg; - uint32_t pending = 0; - - assert(socket); + if (chunk_has_local_output) { + chunk_status = lttng_trace_chunk_get_chunk_directory_handle( + chunk, &chunk_directory_handle); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret = -LTTNG_ERR_FATAL; + goto error; + } - DBG("Asking consumer to check for pending rotation on relay for session %" PRIu64 ", chunk id %" PRIu64, - session_id, chunk_id); - assert(output->type == CONSUMER_DST_NET); + /* + * This will only compile on platforms that support + * dirfd (POSIX.2008). This is fine as the session daemon + * is only built for such platforms. + * + * The ownership of the chunk directory handle's is maintained + * by the trace chunk. + */ + chunk_dirfd = lttng_directory_handle_get_dirfd( + chunk_directory_handle); + assert(chunk_dirfd >= 0); + } - memset(&msg, 0, sizeof(msg)); - msg.cmd_type = LTTNG_CONSUMER_CHECK_ROTATION_PENDING_RELAY; - msg.u.check_rotation_pending_relay.session_id = session_id; - msg.u.check_rotation_pending_relay.relayd_id = output->net_seq_index; - msg.u.check_rotation_pending_relay.chunk_id = chunk_id; + chunk_status = lttng_trace_chunk_get_credentials(chunk, + &chunk_credentials); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + /* + * Not associating credentials to a sessiond chunk is a fatal + * internal error. 
+ */ + ret = -LTTNG_ERR_FATAL; + goto error; + } + msg.u.create_trace_chunk.credentials.uid = chunk_credentials.uid; + msg.u.create_trace_chunk.credentials.gid = chunk_credentials.gid; + DBG("Sending consumer create trace chunk command: relayd_id = %" PRId64 + ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 + ", creation_timestamp = %s", + relayd_id, session_id, chunk_id, + creation_timestamp_str); health_code_update(); ret = consumer_send_msg(socket, &msg); + health_code_update(); if (ret < 0) { - ret = -LTTNG_ERR_ROTATION_PENDING_RELAY_FAIL_CONSUMER; + ERR("Trace chunk creation error on consumer"); + ret = -LTTNG_ERR_CREATE_TRACE_CHUNK_FAIL_CONSUMER; goto error; } - ret = consumer_socket_recv(socket, &pending, sizeof(pending)); - if (ret < 0) { - goto error; + if (chunk_has_local_output) { + DBG("Sending trace chunk directory fd to consumer"); + health_code_update(); + ret = consumer_send_fds(socket, &chunk_dirfd, 1); + health_code_update(); + if (ret < 0) { + ERR("Trace chunk creation error on consumer"); + ret = -LTTNG_ERR_CREATE_TRACE_CHUNK_FAIL_CONSUMER; + goto error; + } } - - ret = pending; - error: - health_code_update(); return ret; } /* - * Ask the consumer to create a directory. + * Ask the consumer to close a trace chunk for a given session. * * Called with the consumer socket lock held. 
*/ -int consumer_mkdir(struct consumer_socket *socket, uint64_t session_id, - const struct consumer_output *output, const char *path, - uid_t uid, gid_t gid) +int consumer_close_trace_chunk(struct consumer_socket *socket, + uint64_t relayd_id, uint64_t session_id, + struct lttng_trace_chunk *chunk) { int ret; - struct lttcomm_consumer_msg msg; + enum lttng_trace_chunk_status chunk_status; + struct lttcomm_consumer_msg msg = { + .cmd_type = LTTNG_CONSUMER_CLOSE_TRACE_CHUNK, + .u.close_trace_chunk.session_id = session_id, + }; + uint64_t chunk_id; + time_t close_timestamp; assert(socket); - DBG("Consumer mkdir %s in session %" PRIu64, path, session_id); - - memset(&msg, 0, sizeof(msg)); - msg.cmd_type = LTTNG_CONSUMER_MKDIR; - msg.u.mkdir.session_id = session_id; - msg.u.mkdir.uid = uid; - msg.u.mkdir.gid = gid; - ret = snprintf(msg.u.mkdir.path, sizeof(msg.u.mkdir.path), "%s", path); - if (ret < 0 || ret >= sizeof(msg.u.mkdir.path)) { - ERR("Format path"); - ret = -LTTNG_ERR_INVALID; - goto error; + if (relayd_id != -1ULL) { + LTTNG_OPTIONAL_SET(&msg.u.close_trace_chunk.relayd_id, + relayd_id); } - if (output->type == CONSUMER_DST_NET) { - msg.u.mkdir.relayd_id = output->net_seq_index; - } else { - msg.u.mkdir.relayd_id = -1ULL; - } + chunk_status = lttng_trace_chunk_get_id(chunk, &chunk_id); + /* + * Anonymous trace chunks should never be transmitted to remote peers + * (consumerd and relayd). They are used internally for + * backward-compatibility purposes. + */ + assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); + msg.u.close_trace_chunk.chunk_id = chunk_id; + + chunk_status = lttng_trace_chunk_get_close_timestamp(chunk, + &close_timestamp); + /* + * A trace chunk should be closed locally before being closed remotely. + * Otherwise, the close timestamp would never be transmitted to the + * peers. 
+ */ + assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); + msg.u.close_trace_chunk.close_timestamp = (uint64_t) close_timestamp; + + DBG("Sending consumer close trace chunk command: relayd_id = %" PRId64 + ", session_id = %" PRIu64 + ", chunk_id = %" PRIu64, + relayd_id, session_id, chunk_id); health_code_update(); ret = consumer_send_msg(socket, &msg); if (ret < 0) { - ret = -LTTNG_ERR_MKDIR_FAIL_CONSUMER; + ret = -LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER; goto error; } @@ -1898,25 +1898,72 @@ error: return ret; } -int consumer_init(struct consumer_socket *socket, - const lttng_uuid sessiond_uuid) +/* + * Ask the consumer if a trace chunk exists. + * + * Called with the consumer socket lock held. + * Returns 0 on success, or a negative value on error. + */ +int consumer_trace_chunk_exists(struct consumer_socket *socket, + uint64_t relayd_id, uint64_t session_id, + struct lttng_trace_chunk *chunk, + enum consumer_trace_chunk_exists_status *result) { int ret; + enum lttng_trace_chunk_status chunk_status; struct lttcomm_consumer_msg msg = { - .cmd_type = LTTNG_CONSUMER_INIT, + .cmd_type = LTTNG_CONSUMER_TRACE_CHUNK_EXISTS, + .u.trace_chunk_exists.session_id = session_id, }; + uint64_t chunk_id; + const char *consumer_reply_str; assert(socket); - DBG("Sending consumer initialization command"); - lttng_uuid_copy(msg.u.init.sessiond_uuid, sessiond_uuid); + if (relayd_id != -1ULL) { + LTTNG_OPTIONAL_SET(&msg.u.trace_chunk_exists.relayd_id, + relayd_id); + } + + chunk_status = lttng_trace_chunk_get_id(chunk, &chunk_id); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + /* + * Anonymous trace chunks should never be transmitted + * to remote peers (consumerd and relayd). They are used + * internally for backward-compatibility purposes. 
+ */ + ret = -LTTNG_ERR_FATAL; + goto error; + } + msg.u.trace_chunk_exists.chunk_id = chunk_id; + + DBG("Sending consumer trace chunk exists command: relayd_id = %" PRId64 + ", session_id = %" PRIu64 + ", chunk_id = %" PRIu64, relayd_id, session_id, chunk_id); health_code_update(); ret = consumer_send_msg(socket, &msg); - if (ret < 0) { + switch (-ret) { + case LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK: + consumer_reply_str = "unknown trace chunk"; + *result = CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK; + break; + case LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL: + consumer_reply_str = "trace chunk exists locally"; + *result = CONSUMER_TRACE_CHUNK_EXISTS_STATUS_EXISTS_LOCAL; + break; + case LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE: + consumer_reply_str = "trace chunk exists on remote peer"; + *result = CONSUMER_TRACE_CHUNK_EXISTS_STATUS_EXISTS_REMOTE; + break; + default: + ERR("Consumer returned an error from TRACE_CHUNK_EXISTS command"); + ret = -1; goto error; } - + DBG("Consumer reply to TRACE_CHUNK_EXISTS command: %s", + consumer_reply_str); + ret = 0; error: health_code_update(); return ret; diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h index c0f7fee36..64f95d026 100644 --- a/src/bin/lttng-sessiond/consumer.h +++ b/src/bin/lttng-sessiond/consumer.h @@ -34,6 +34,12 @@ enum consumer_dst_type { CONSUMER_DST_NET, }; +enum consumer_trace_chunk_exists_status { + CONSUMER_TRACE_CHUNK_EXISTS_STATUS_EXISTS_LOCAL, + CONSUMER_TRACE_CHUNK_EXISTS_STATUS_EXISTS_REMOTE, + CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK, +}; + struct consumer_socket { /* * File descriptor. This is just a reference to the consumer data meaning @@ -148,7 +154,7 @@ struct consumer_output { /* * Subdirectory path name used for both local and network - * consumer ("/kernel", "/ust", or empty). + * consumer ("kernel", "ust", or empty). 
*/ char domain_subdir[max(sizeof(DEFAULT_KERNEL_TRACE_DIR), sizeof(DEFAULT_UST_TRACE_DIR))]; @@ -238,8 +244,6 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, uint64_t session_id, const char *pathname, const char *name, - uid_t uid, - gid_t gid, uint64_t relayd_id, uint64_t key, unsigned char *uuid, @@ -252,12 +256,11 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, int64_t blocking_timeout, const char *root_shm_path, const char *shm_path, - uint64_t trace_archive_id); + struct lttng_trace_chunk *trace_chunk); void consumer_init_add_stream_comm_msg(struct lttcomm_consumer_msg *msg, uint64_t channel_key, uint64_t stream_key, - int32_t cpu, - uint64_t trace_archive_id); + int32_t cpu); void consumer_init_streams_sent_comm_msg(struct lttcomm_consumer_msg *msg, enum lttng_consumer_command cmd, uint64_t channel_key, uint64_t net_seq_idx); @@ -276,7 +279,8 @@ void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg, uint64_t tracefile_count, unsigned int monitor, unsigned int live_timer_interval, - unsigned int monitor_timer_interval); + unsigned int monitor_timer_interval, + struct lttng_trace_chunk *trace_chunk); int consumer_is_data_pending(uint64_t session_id, struct consumer_output *consumer); int consumer_close_metadata(struct consumer_socket *socket, @@ -296,26 +300,25 @@ int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key, /* Snapshot command. */ enum lttng_error_code consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key, const struct snapshot_output *output, int metadata, - uid_t uid, gid_t gid, const char *session_path, int wait, - uint64_t nb_packets_per_stream, uint64_t trace_archive_id); + uid_t uid, gid_t gid, const char *channel_path, int wait, + uint64_t nb_packets_per_stream); /* Rotation commands. 
*/ int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key, uid_t uid, gid_t gid, struct consumer_output *output, - const char *domain_path, bool is_metadata_channel, - uint64_t new_chunk_id); -int consumer_rotate_rename(struct consumer_socket *socket, uint64_t session_id, - const struct consumer_output *output, const char *old_path, - const char *new_path, uid_t uid, gid_t gid); -int consumer_check_rotation_pending_local(struct consumer_socket *socket, - uint64_t session_id, uint64_t chunk_id); -int consumer_check_rotation_pending_relay(struct consumer_socket *socket, - const struct consumer_output *output, uint64_t session_id, - uint64_t chunk_id); -int consumer_mkdir(struct consumer_socket *socket, uint64_t session_id, - const struct consumer_output *output, const char *path, - uid_t uid, gid_t gid); + bool is_metadata_channel); int consumer_init(struct consumer_socket *socket, const lttng_uuid sessiond_uuid); +int consumer_create_trace_chunk(struct consumer_socket *socket, + uint64_t relayd_id, uint64_t session_id, + struct lttng_trace_chunk *chunk); +int consumer_close_trace_chunk(struct consumer_socket *socket, + uint64_t relayd_id, uint64_t session_id, + struct lttng_trace_chunk *chunk); +int consumer_trace_chunk_exists(struct consumer_socket *socket, + uint64_t relayd_id, uint64_t session_id, + struct lttng_trace_chunk *chunk, + enum consumer_trace_chunk_exists_status *result); + #endif /* _CONSUMER_H */ diff --git a/src/bin/lttng-sessiond/kernel-consumer.c b/src/bin/lttng-sessiond/kernel-consumer.c index 993e7b95c..236ce0eb6 100644 --- a/src/bin/lttng-sessiond/kernel-consumer.c +++ b/src/bin/lttng-sessiond/kernel-consumer.c @@ -33,8 +33,7 @@ #include "session.h" #include "lttng-sessiond.h" -static char *create_channel_path(struct consumer_output *consumer, - uid_t uid, gid_t gid) +static char *create_channel_path(struct consumer_output *consumer) { int ret; char tmp_path[PATH_MAX]; @@ -44,37 +43,14 @@ static char 
*create_channel_path(struct consumer_output *consumer, /* Get the right path name destination */ if (consumer->type == CONSUMER_DST_LOCAL) { - /* Set application path to the destination path */ - ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s%s", - consumer->dst.session_root_path, - consumer->chunk_path, - consumer->domain_subdir); - if (ret < 0) { - PERROR("snprintf kernel channel path"); - goto error; - } else if (ret >= sizeof(tmp_path)) { - ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s%s\"", - sizeof(tmp_path), ret, - consumer->dst.session_root_path, - consumer->chunk_path, - consumer->domain_subdir); - goto error; - } - pathname = lttng_strndup(tmp_path, sizeof(tmp_path)); + pathname = strdup(consumer->domain_subdir); if (!pathname) { - PERROR("lttng_strndup"); + PERROR("Failed to copy domain subdirectory string %s", + consumer->domain_subdir); goto error; } - - /* Create directory */ - ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG, uid, gid); - if (ret < 0) { - if (errno != EEXIST) { - ERR("Trace directory creation error"); - goto error; - } - } - DBG3("Kernel local consumer tracefile path: %s", pathname); + DBG3("Kernel local consumer trace path relative to current trace chunk: \"%s\"", + pathname); } else { /* Network output. 
*/ ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s", @@ -115,12 +91,13 @@ int kernel_consumer_add_channel(struct consumer_socket *sock, unsigned int monitor) { int ret; - char *pathname; + char *pathname = NULL; struct lttcomm_consumer_msg lkm; struct consumer_output *consumer; enum lttng_error_code status; struct ltt_session *session = NULL; struct lttng_channel_extended *channel_attr_extended; + bool is_local_trace; /* Safety net */ assert(channel); @@ -133,19 +110,39 @@ int kernel_consumer_add_channel(struct consumer_socket *sock, DBG("Kernel consumer adding channel %s to kernel consumer", channel->channel->name); + is_local_trace = consumer->net_seq_index == -1ULL; - if (monitor) { - pathname = create_channel_path(consumer, ksession->uid, - ksession->gid); - } else { - /* Empty path. */ - pathname = strdup(""); - } + pathname = create_channel_path(consumer); if (!pathname) { ret = -1; goto error; } + if (is_local_trace && ksession->current_trace_chunk) { + enum lttng_trace_chunk_status chunk_status; + char *pathname_index; + + ret = asprintf(&pathname_index, "%s/" DEFAULT_INDEX_DIR, + pathname); + if (ret < 0) { + ERR("Failed to format channel index directory"); + ret = -1; + goto error; + } + + /* + * Create the index subdirectory which will take care + * of implicitly creating the channel's path. 
+ */ + chunk_status = lttng_trace_chunk_create_subdirectory( + ksession->current_trace_chunk, pathname_index); + free(pathname_index); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret = -1; + goto error; + } + } + /* Prep channel message structure */ consumer_init_add_channel_comm_msg(&lkm, channel->key, @@ -162,7 +159,8 @@ int kernel_consumer_add_channel(struct consumer_socket *sock, channel->channel->attr.tracefile_count, monitor, channel->channel->attr.live_timer_interval, - channel_attr_extended->monitor_timer_interval); + channel_attr_extended->monitor_timer_interval, + ksession->current_trace_chunk); health_code_update(); @@ -209,10 +207,8 @@ int kernel_consumer_add_metadata(struct consumer_socket *sock, struct ltt_kernel_session *ksession, unsigned int monitor) { int ret; - char *pathname; struct lttcomm_consumer_msg lkm; struct consumer_output *consumer; - struct ltt_session *session = NULL; rcu_read_lock(); @@ -227,28 +223,11 @@ int kernel_consumer_add_metadata(struct consumer_socket *sock, /* Get consumer output pointer */ consumer = ksession->consumer; - if (monitor) { - pathname = create_channel_path(consumer, - ksession->uid, ksession->gid); - } else { - /* Empty path. 
*/ - pathname = strdup(""); - } - if (!pathname) { - ret = -1; - goto error; - } - - session = session_find_by_id(ksession->id); - assert(session); - assert(pthread_mutex_trylock(&session->lock)); - assert(session_trylock_list()); - /* Prep channel message structure */ consumer_init_add_channel_comm_msg(&lkm, ksession->metadata->key, ksession->id, - pathname, + DEFAULT_KERNEL_TRACE_DIR, ksession->uid, ksession->gid, consumer->net_seq_index, @@ -257,7 +236,7 @@ int kernel_consumer_add_metadata(struct consumer_socket *sock, DEFAULT_KERNEL_CHANNEL_OUTPUT, CONSUMER_CHANNEL_TYPE_METADATA, 0, 0, - monitor, 0, 0); + monitor, 0, 0, ksession->current_trace_chunk); health_code_update(); @@ -272,8 +251,7 @@ int kernel_consumer_add_metadata(struct consumer_socket *sock, consumer_init_add_stream_comm_msg(&lkm, ksession->metadata->key, ksession->metadata_stream_fd, - 0 /* CPU: 0 for metadata. */, - session->current_archive_id); + 0 /* CPU: 0 for metadata. */); health_code_update(); @@ -288,10 +266,6 @@ int kernel_consumer_add_metadata(struct consumer_socket *sock, error: rcu_read_unlock(); - free(pathname); - if (session) { - session_put(session); - } return ret; } @@ -302,8 +276,7 @@ static int kernel_consumer_add_stream(struct consumer_socket *sock, struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream, - struct ltt_kernel_session *session, unsigned int monitor, - uint64_t trace_archive_id) + struct ltt_kernel_session *session, unsigned int monitor) { int ret; struct lttcomm_consumer_msg lkm; @@ -325,8 +298,7 @@ int kernel_consumer_add_stream(struct consumer_socket *sock, consumer_init_add_stream_comm_msg(&lkm, channel->key, stream->fd, - stream->cpu, - trace_archive_id); + stream->cpu); health_code_update(); @@ -387,7 +359,6 @@ int kernel_consumer_send_channel_streams(struct consumer_socket *sock, { int ret = LTTNG_OK; struct ltt_kernel_stream *stream; - struct ltt_session *session = NULL; /* Safety net */ assert(channel); @@ -397,11 +368,6 @@ int 
kernel_consumer_send_channel_streams(struct consumer_socket *sock, rcu_read_lock(); - session = session_find_by_id(ksession->id); - assert(session); - assert(pthread_mutex_trylock(&session->lock)); - assert(session_trylock_list()); - /* Bail out if consumer is disabled */ if (!ksession->consumer->enabled) { ret = LTTNG_OK; @@ -427,7 +393,7 @@ int kernel_consumer_send_channel_streams(struct consumer_socket *sock, /* Add stream on the kernel consumer side. */ ret = kernel_consumer_add_stream(sock, channel, stream, - ksession, monitor, session->current_archive_id); + ksession, monitor); if (ret < 0) { goto error; } @@ -436,9 +402,6 @@ int kernel_consumer_send_channel_streams(struct consumer_socket *sock, error: rcu_read_unlock(); - if (session) { - session_put(session); - } return ret; } diff --git a/src/bin/lttng-sessiond/kernel.c b/src/bin/lttng-sessiond/kernel.c index 9268a2779..c395f4255 100644 --- a/src/bin/lttng-sessiond/kernel.c +++ b/src/bin/lttng-sessiond/kernel.c @@ -1250,8 +1250,6 @@ enum lttng_error_code kernel_snapshot_record( struct consumer_socket *socket; struct lttng_ht_iter iter; struct ltt_kernel_metadata *saved_metadata; - struct ltt_session *session = NULL; - uint64_t trace_archive_id; assert(ksess); assert(ksess->consumer); @@ -1259,12 +1257,6 @@ enum lttng_error_code kernel_snapshot_record( DBG("Kernel snapshot record started"); - session = session_find_by_id(ksess->id); - assert(session); - assert(pthread_mutex_trylock(&session->lock)); - assert(session_trylock_list()); - trace_archive_id = session->current_archive_id; - /* Save current metadata since the following calls will change it. 
*/ saved_metadata = ksess->metadata; saved_metadata_fd = ksess->metadata_stream_fd; @@ -1312,8 +1304,7 @@ enum lttng_error_code kernel_snapshot_record( status = consumer_snapshot_channel(socket, chan->key, output, 0, ksess->uid, ksess->gid, DEFAULT_KERNEL_TRACE_DIR, wait, - nb_packets_per_stream, - trace_archive_id); + nb_packets_per_stream); if (status != LTTNG_OK) { (void) kernel_consumer_destroy_metadata(socket, ksess->metadata); @@ -1324,8 +1315,7 @@ enum lttng_error_code kernel_snapshot_record( /* Snapshot metadata, */ status = consumer_snapshot_channel(socket, ksess->metadata->key, output, 1, ksess->uid, ksess->gid, - DEFAULT_KERNEL_TRACE_DIR, wait, 0, - trace_archive_id); + DEFAULT_KERNEL_TRACE_DIR, wait, 0); if (status != LTTNG_OK) { goto error_consumer; } @@ -1350,9 +1340,6 @@ error: /* Restore metadata state.*/ ksess->metadata = saved_metadata; ksess->metadata_stream_fd = saved_metadata_fd; - if (session) { - session_put(session); - } rcu_read_unlock(); return status; } @@ -1432,15 +1419,13 @@ enum lttng_error_code kernel_rotate_session(struct ltt_session *session) socket, node.node) { struct ltt_kernel_channel *chan; - /* For each channel, ask the consumer to rotate it. */ + /* For each channel, ask the consumer to rotate it. 
*/ cds_list_for_each_entry(chan, &ksess->channel_list.head, list) { DBG("Rotate kernel channel %" PRIu64 ", session %s", chan->key, session->name); ret = consumer_rotate_channel(socket, chan->key, ksess->uid, ksess->gid, ksess->consumer, - ksess->consumer->domain_subdir, - /* is_metadata_channel */ false, - session->current_archive_id); + /* is_metadata_channel */ false); if (ret < 0) { status = LTTNG_ERR_KERN_CONSUMER_FAIL; goto error; @@ -1452,9 +1437,7 @@ enum lttng_error_code kernel_rotate_session(struct ltt_session *session) */ ret = consumer_rotate_channel(socket, ksess->metadata->key, ksess->uid, ksess->gid, ksess->consumer, - ksess->consumer->domain_subdir, - /* is_metadata_channel */ true, - session->current_archive_id); + /* is_metadata_channel */ true); if (ret < 0) { status = LTTNG_ERR_KERN_CONSUMER_FAIL; goto error; @@ -1465,3 +1448,28 @@ error: rcu_read_unlock(); return status; } + +enum lttng_error_code kernel_create_channel_subdirectories( + const struct ltt_kernel_session *ksess) +{ + enum lttng_error_code ret = LTTNG_OK; + enum lttng_trace_chunk_status chunk_status; + + rcu_read_lock(); + assert(ksess->current_trace_chunk); + + /* + * Create the index subdirectory which will take care + * of implicitly creating the channel's path. 
+ */ + chunk_status = lttng_trace_chunk_create_subdirectory( + ksess->current_trace_chunk, + DEFAULT_KERNEL_TRACE_DIR "/" DEFAULT_INDEX_DIR); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret = LTTNG_ERR_CREATE_DIR_FAIL; + goto error; + } +error: + rcu_read_unlock(); + return ret; +} diff --git a/src/bin/lttng-sessiond/kernel.h b/src/bin/lttng-sessiond/kernel.h index 5fcd7e4c5..e62cfecd6 100644 --- a/src/bin/lttng-sessiond/kernel.h +++ b/src/bin/lttng-sessiond/kernel.h @@ -71,4 +71,7 @@ ssize_t kernel_list_tracker_pids(struct ltt_kernel_session *session, int **_pids); int kernel_supports_ring_buffer_snapshot_sample_positions(int tracer_fd); +enum lttng_error_code kernel_create_channel_subdirectories( + const struct ltt_kernel_session *ksess); + #endif /* _LTT_KERNEL_CTL_H */ diff --git a/src/bin/lttng-sessiond/rotate.c b/src/bin/lttng-sessiond/rotate.c index f4127c968..c039bf413 100644 --- a/src/bin/lttng-sessiond/rotate.c +++ b/src/bin/lttng-sessiond/rotate.c @@ -49,305 +49,6 @@ #include #include -/* The session's lock must be held by the caller. */ -static -int session_rename_chunk(struct ltt_session *session, char *current_path, - char *new_path) -{ - int ret; - struct consumer_socket *socket; - struct consumer_output *output; - struct lttng_ht_iter iter; - uid_t uid; - gid_t gid; - - DBG("Renaming session chunk path of session \"%s\" from %s to %s", - session->name, current_path, new_path); - - /* - * Either one of the sessions is enough to find the consumer_output - * and uid/gid. 
- */ - if (session->kernel_session) { - output = session->kernel_session->consumer; - uid = session->kernel_session->uid; - gid = session->kernel_session->gid; - } else if (session->ust_session) { - output = session->ust_session->consumer; - uid = session->ust_session->uid; - gid = session->ust_session->gid; - } else { - assert(0); - } - - if (!output || !output->socks) { - ERR("No consumer output found for session \"%s\"", - session->name); - ret = -1; - goto end; - } - - rcu_read_lock(); - /* - * We have to iterate to find a socket, but we only need to send the - * rename command to one consumer, so we break after the first one. - */ - cds_lfht_for_each_entry(output->socks->ht, &iter.iter, socket, node.node) { - pthread_mutex_lock(socket->lock); - ret = consumer_rotate_rename(socket, session->id, output, - current_path, new_path, uid, gid); - pthread_mutex_unlock(socket->lock); - if (ret) { - ret = -1; - goto end_unlock; - } - break; - } - - ret = 0; - -end_unlock: - rcu_read_unlock(); -end: - return ret; -} - -/* The session's lock must be held by the caller. 
*/ -static -int rename_first_chunk(struct ltt_session *session, - struct consumer_output *consumer, char *new_path) -{ - int ret; - char current_full_path[LTTNG_PATH_MAX], new_full_path[LTTNG_PATH_MAX]; - - if (session->net_handle > 0) { - /* - * Current domain path: - * HOSTNAME/{SESSION-[TIMESTAMP], USER_DIRECTORY}/DOMAIN - */ - ret = snprintf(current_full_path, sizeof(current_full_path), - "%s%s", - consumer->dst.net.base_dir, - consumer->domain_subdir); - if (ret < 0 || ret >= sizeof(current_full_path)) { - ERR("Failed to initialize current full path while renaming first rotation chunk of session \"%s\"", - session->name); - ret = -LTTNG_ERR_UNK; - goto error; - } - } else { - /* - * Current domain path: - * SESSION_OUTPUT_PATH/DOMAIN - */ - ret = snprintf(current_full_path, sizeof(current_full_path), - "%s/%s", - consumer->dst.session_root_path, - consumer->domain_subdir); - if (ret < 0 || ret >= sizeof(current_full_path)) { - ERR("Failed to initialize current full path while renaming first rotation chunk of session \"%s\"", - session->name); - ret = -LTTNG_ERR_UNK; - goto error; - } - } - /* - * New domain path: - * SESSION_BASE_PATH/_-INDEX/DOMAIN - */ - ret = snprintf(new_full_path, sizeof(new_full_path), "%s/%s", - new_path, consumer->domain_subdir); - if (ret < 0 || ret >= sizeof(new_full_path)) { - ERR("Failed to initialize new full path while renaming first rotation chunk of session \"%s\"", - session->name); - ret = -LTTNG_ERR_UNK; - goto error; - } - /* Move the per-domain inside the first rotation chunk path. */ - ret = session_rename_chunk(session, current_full_path, new_full_path); - if (ret < 0) { - ret = -LTTNG_ERR_UNK; - goto error; - } - - ret = 0; - -error: - return ret; -} - -/* - * Rename a chunk folder after a rotation is complete. - * session_lock_list and session lock must be held. - * - * Returns 0 on success, a negative value on error. 
- */ -int rename_completed_chunk(struct ltt_session *session, time_t end_ts) -{ - int ret; - size_t strf_ret; - struct tm *timeinfo; - char new_path[LTTNG_PATH_MAX]; - char start_datetime[21], end_datetime[21]; - - DBG("Renaming completed chunk for session %s", session->name); - - /* Format chunk start time. */ - timeinfo = localtime(&session->last_chunk_start_ts); - if (!timeinfo) { - ERR("Failed to separate local time while renaming completed chunk"); - ret = -1; - goto end; - } - strf_ret = strftime(start_datetime, sizeof(start_datetime), - "%Y%m%dT%H%M%S%z", timeinfo); - if (strf_ret == 0) { - ERR("Failed to format timestamp while renaming completed session chunk"); - ret = -1; - goto end; - } - - /* Format chunk end time. */ - timeinfo = localtime(&end_ts); - if (!timeinfo) { - ERR("Failed to parse time while renaming completed chunk"); - ret = -1; - goto end; - } - strf_ret = strftime(end_datetime, sizeof(end_datetime), - "%Y%m%dT%H%M%S%z", timeinfo); - if (strf_ret == 0) { - ERR("Failed to format timestamp while renaming completed session chunk"); - ret = -1; - goto end; - } - - /* Format completed chunk's path. */ - ret = snprintf(new_path, sizeof(new_path), "%s/archives/%s-%s-%" PRIu64, - session_get_base_path(session), - start_datetime, end_datetime, - session->current_archive_id); - if (ret < 0 || ret >= sizeof(new_path)) { - ERR("Failed to format new chunk path while renaming chunk of session \"%s\"", - session->name); - ret = -1; - goto error; - } - - if (session->current_archive_id == 1) { - /* - * On the first rotation, the current_rotate_path is the - * session_root_path, so we need to create the chunk folder - * and move the domain-specific folders inside it. 
- */ - if (session->kernel_session) { - ret = rename_first_chunk(session, - session->kernel_session->consumer, - new_path); - if (ret) { - ERR("Failed to rename kernel session trace folder to \"%s\"", new_path); - /* - * This is not a fatal error for the rotation - * thread, we just need to inform the client - * that a problem occurred with the rotation. - * Returning 0, same for the other errors - * below. - */ - ret = 0; - goto error; - } - } - if (session->ust_session) { - ret = rename_first_chunk(session, - session->ust_session->consumer, - new_path); - if (ret) { - ERR("Failed to rename userspace session trace folder to \"%s\"", new_path); - ret = 0; - goto error; - } - } - } else { - /* - * After the first rotation, all the trace data is already in - * its own chunk folder, we just need to append the suffix. - */ - ret = session_rename_chunk(session, - session->rotation_chunk.current_rotate_path, - new_path); - if (ret) { - ERR("Failed to rename session trace folder from \"%s\" to \"%s\"", - session->rotation_chunk.current_rotate_path, - new_path); - ret = 0; - goto error; - } - } - - /* - * Store the path where the readable chunk is. This path is valid - * and can be queried by the client with rotate_pending until the next - * rotation is started. - */ - ret = lttng_strncpy(session->rotation_chunk.current_rotate_path, - new_path, - sizeof(session->rotation_chunk.current_rotate_path)); - if (ret) { - ERR("Failed the current chunk's path of session \"%s\"", - session->name); - ret = -1; - goto error; - } - - goto end; - -error: - session->rotation_state = LTTNG_ROTATION_STATE_ERROR; -end: - return ret; -} - -int rename_active_chunk(struct ltt_session *session) -{ - int ret; - - session->current_archive_id++; - - /* - * The currently active tracing path is now the folder we - * want to rename. 
- */ - ret = lttng_strncpy(session->rotation_chunk.current_rotate_path, - session->rotation_chunk.active_tracing_path, - sizeof(session->rotation_chunk.current_rotate_path)); - if (ret) { - ERR("Failed to copy active tracing path"); - goto end; - } - - ret = rename_completed_chunk(session, time(NULL)); - if (ret < 0) { - ERR("Failed to rename current rotation's path"); - goto end; - } - - /* - * We just renamed, the folder, we didn't do an actual rotation, so - * the active tracing path is now the renamed folder and we have to - * restore the rotate count. - */ - ret = lttng_strncpy(session->rotation_chunk.active_tracing_path, - session->rotation_chunk.current_rotate_path, - sizeof(session->rotation_chunk.active_tracing_path)); - if (ret) { - ERR("Failed to rename active session chunk tracing path"); - goto end; - } -end: - session->current_archive_id--; - return ret; -} - int subscribe_session_consumed_size_rotation(struct ltt_session *session, uint64_t size, struct notification_thread_handle *notification_thread_handle) { diff --git a/src/bin/lttng-sessiond/rotate.h b/src/bin/lttng-sessiond/rotate.h index 281213080..2d8c3ebb5 100644 --- a/src/bin/lttng-sessiond/rotate.h +++ b/src/bin/lttng-sessiond/rotate.h @@ -22,9 +22,6 @@ #include "rotation-thread.h" #include -int rename_active_chunk(struct ltt_session *session); -int rename_completed_chunk(struct ltt_session *session, time_t ts); - /* * Subscribe/unsubscribe the notification_channel from the rotation_thread to * session usage notifications to perform size-based rotations. 
diff --git a/src/bin/lttng-sessiond/rotation-thread.c b/src/bin/lttng-sessiond/rotation-thread.c index 6669372dd..aaa212941 100644 --- a/src/bin/lttng-sessiond/rotation-thread.c +++ b/src/bin/lttng-sessiond/rotation-thread.c @@ -351,74 +351,51 @@ end: } static -int check_session_rotation_pending_local_on_consumer( - const struct ltt_session *session, - struct consumer_socket *socket, bool *rotation_completed) -{ - int ret; - - pthread_mutex_lock(socket->lock); - DBG("[rotation-thread] Checking for locally pending rotation on the %s consumer for session %s", - lttng_consumer_type_str(socket->type), - session->name); - ret = consumer_check_rotation_pending_local(socket, - session->id, - session->current_archive_id - 1); - pthread_mutex_unlock(socket->lock); - - if (ret == 0) { - /* Rotation was completed on this consumer. */ - DBG("[rotation-thread] Local rotation of trace archive %" PRIu64 " of session \"%s\" was completed on the %s consumer", - session->current_archive_id - 1, - session->name, - lttng_consumer_type_str(socket->type)); - *rotation_completed = true; - } else if (ret == 1) { - /* Rotation pending on this consumer. */ - DBG("[rotation-thread] Local rotation of trace archive %" PRIu64 " of session \"%s\" is pending on the %s consumer", - session->current_archive_id - 1, - session->name, - lttng_consumer_type_str(socket->type)); - *rotation_completed = false; - ret = 0; - } else { - /* Not a fatal error. 
*/ - ERR("[rotation-thread] Encountered an error when checking if local rotation of trace archive %" PRIu64 " of session \"%s\" is pending on the %s consumer", - session->current_archive_id - 1, - session->name, - lttng_consumer_type_str(socket->type)); - *rotation_completed = false; - } - return ret; -} - -static -int check_session_rotation_pending_local(struct ltt_session *session) +void check_session_rotation_pending_on_consumers(struct ltt_session *session, + bool *_rotation_completed) { int ret = 0; struct consumer_socket *socket; struct cds_lfht_iter iter; - bool rotation_completed = true; + enum consumer_trace_chunk_exists_status exists_status; + uint64_t relayd_id; + bool chunk_exists_on_peer = false; + enum lttng_trace_chunk_status chunk_status; + + assert(session->chunk_being_archived); /* * Check for a local pending rotation on all consumers (32-bit * user space, 64-bit user space, and kernel). */ - DBG("[rotation-thread] Checking for pending local rotation on session \"%s\", trace archive %" PRIu64, - session->name, session->current_archive_id - 1); - rcu_read_lock(); if (!session->ust_session) { goto skip_ust; } cds_lfht_for_each_entry(session->ust_session->consumer->socks->ht, &iter, socket, node.node) { - ret = check_session_rotation_pending_local_on_consumer(session, - socket, &rotation_completed); - if (ret || !rotation_completed) { + relayd_id = session->ust_session->consumer->type == CONSUMER_DST_LOCAL ? 
+ -1ULL : + session->ust_session->consumer->net_seq_index; + + pthread_mutex_lock(socket->lock); + ret = consumer_trace_chunk_exists(socket, + relayd_id, + session->id, session->chunk_being_archived, + &exists_status); + if (ret) { + pthread_mutex_unlock(socket->lock); + ERR("Error occured while checking rotation status on consumer daemon"); goto end; } - } + + if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) { + pthread_mutex_unlock(socket->lock); + chunk_exists_on_peer = true; + goto end; + } + pthread_mutex_unlock(socket->lock); + } skip_ust: if (!session->kernel_session) { @@ -426,22 +403,44 @@ skip_ust: } cds_lfht_for_each_entry(session->kernel_session->consumer->socks->ht, &iter, socket, node.node) { - ret = check_session_rotation_pending_local_on_consumer(session, - socket, &rotation_completed); - if (ret || !rotation_completed) { + pthread_mutex_lock(socket->lock); + relayd_id = session->kernel_session->consumer->type == CONSUMER_DST_LOCAL ? + -1ULL : + session->kernel_session->consumer->net_seq_index; + + ret = consumer_trace_chunk_exists(socket, + relayd_id, + session->id, session->chunk_being_archived, + &exists_status); + if (ret) { + pthread_mutex_unlock(socket->lock); + ERR("Error occured while checking rotation status on consumer daemon"); goto end; } + + if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) { + pthread_mutex_unlock(socket->lock); + chunk_exists_on_peer = true; + goto end; + } + pthread_mutex_unlock(socket->lock); } skip_kernel: end: rcu_read_unlock(); - if (rotation_completed) { - DBG("[rotation-thread] Local rotation of trace archive %" PRIu64 " of session \"%s\" is complete on all consumers", - session->current_archive_id - 1, + if (!chunk_exists_on_peer) { + uint64_t chunk_being_archived_id; + + chunk_status = lttng_trace_chunk_get_id( + session->chunk_being_archived, + &chunk_being_archived_id); + assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); + DBG("[rotation-thread] Rotation of 
trace archive %" PRIu64 " of session \"%s\" is complete on all consumers", + chunk_being_archived_id, session->name); - session->rotation_pending_local = false; } + *_rotation_completed = !chunk_exists_on_peer; if (ret) { ret = session_reset_rotation_state(session, LTTNG_ROTATION_STATE_ERROR); @@ -450,86 +449,12 @@ end: session->name); } } - return 0; -} - -static -int check_session_rotation_pending_relay(struct ltt_session *session) -{ - int ret; - struct consumer_socket *socket; - struct cds_lfht_iter iter; - bool rotation_completed = true; - const struct consumer_output *output; - - /* - * Check for a pending rotation on any consumer as we only use - * it as a "tunnel" to the relayd. - */ - - rcu_read_lock(); - if (session->ust_session) { - cds_lfht_first(session->ust_session->consumer->socks->ht, - &iter); - output = session->ust_session->consumer; - } else { - cds_lfht_first(session->kernel_session->consumer->socks->ht, - &iter); - output = session->kernel_session->consumer; - } - assert(cds_lfht_iter_get_node(&iter)); - - socket = caa_container_of(cds_lfht_iter_get_node(&iter), - typeof(*socket), node.node); - - pthread_mutex_lock(socket->lock); - DBG("[rotation-thread] Checking for pending relay rotation on session \"%s\", trace archive %" PRIu64 " through the %s consumer", - session->name, session->current_archive_id - 1, - lttng_consumer_type_str(socket->type)); - ret = consumer_check_rotation_pending_relay(socket, - output, - session->id, - session->current_archive_id - 1); - pthread_mutex_unlock(socket->lock); - - if (ret == 0) { - /* Rotation was completed on the relay. */ - DBG("[rotation-thread] Relay rotation of trace archive %" PRIu64 " of session \"%s\" was completed", - session->current_archive_id - 1, - session->name); - } else if (ret == 1) { - /* Rotation pending on relay. 
*/ - DBG("[rotation-thread] Relay rotation of trace archive %" PRIu64 " of session \"%s\" is pending", - session->current_archive_id - 1, - session->name); - rotation_completed = false; - } else { - /* Not a fatal error. */ - ERR("[rotation-thread] Encountered an error when checking if rotation of trace archive %" PRIu64 " of session \"%s\" is pending on the relay", - session->current_archive_id - 1, - session->name); - ret = session_reset_rotation_state(session, - LTTNG_ROTATION_STATE_ERROR); - if (ret) { - ERR("Failed to reset rotation state of session \"%s\"", - session->name); - } - rotation_completed = false; - } - - rcu_read_unlock(); - - if (rotation_completed) { - DBG("[rotation-thread] Rotation of trace archive %" PRIu64 " of session \"%s\" is complete on the relay", - session->current_archive_id - 1, - session->name); - session->rotation_pending_relay = false; - } - return 0; } /* * Check if the last rotation was completed, called with session lock held. + * Should only return non-zero in the event of a fatal error. Doing so will + * shutdown the thread. 
*/ static int check_session_rotation_pending(struct ltt_session *session, @@ -537,10 +462,22 @@ int check_session_rotation_pending(struct ltt_session *session, { int ret; struct lttng_trace_archive_location *location; - time_t now; + enum lttng_trace_chunk_status chunk_status; + bool rotation_completed = false; + const char *archived_chunk_name; + uint64_t chunk_being_archived_id; + + chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived, + &chunk_being_archived_id); + assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); DBG("[rotation-thread] Checking for pending rotation on session \"%s\", trace archive %" PRIu64, - session->name, session->current_archive_id - 1); + session->name, chunk_being_archived_id); + + if (!session->chunk_being_archived) { + ret = 0; + goto end; + } /* * The rotation-pending check timer of a session is launched in @@ -555,72 +492,36 @@ int check_session_rotation_pending(struct ltt_session *session, goto end; } - if (session->rotation_pending_local) { - /* Updates session->rotation_pending_local as needed. */ - ret = check_session_rotation_pending_local(session); - if (ret) { - goto end; - } - - /* - * No need to check for a pending rotation on the relay - * since the rotation is not even completed locally yet. - */ - if (session->rotation_pending_local) { - goto end; - } - } - - if (session->rotation_pending_relay) { - /* Updates session->rotation_pending_relay as needed. */ - ret = check_session_rotation_pending_relay(session); - if (ret) { - goto end; - } + check_session_rotation_pending_on_consumers(session, + &rotation_completed); - if (session->rotation_pending_relay) { - goto end; - } - } - - DBG("[rotation-thread] Rotation of trace archive %" PRIu64 " completed for " - "session %s", session->current_archive_id - 1, - session->name); - - /* Rename the completed trace archive's location. 
*/ - now = time(NULL); - if (now == (time_t) -1) { - ret = session_reset_rotation_state(session, - LTTNG_ROTATION_STATE_ERROR); - if (ret) { - ERR("Failed to reset rotation state of session \"%s\"", - session->name); - } - ret = LTTNG_ERR_UNK; - goto end; - } - - ret = rename_completed_chunk(session, now); - if (ret < 0) { - ERR("Failed to rename completed rotation chunk"); + if (!rotation_completed || + session->rotation_state == LTTNG_ROTATION_STATE_ERROR) { goto end; } - session->last_chunk_start_ts = session->current_chunk_start_ts; /* * Now we can clear the "ONGOING" state in the session. New * rotations can start now. */ - session->rotation_state = LTTNG_ROTATION_STATE_COMPLETED; + chunk_status = lttng_trace_chunk_get_name(session->chunk_being_archived, + &archived_chunk_name, NULL); + assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); + free(session->last_archived_chunk_name); + session->last_archived_chunk_name = strdup(archived_chunk_name); + if (!session->last_archived_chunk_name) { + PERROR("Failed to duplicate archived chunk name"); + } + session_reset_rotation_state(session, LTTNG_ROTATION_STATE_COMPLETED); - /* Ownership of location is transferred. */ location = session_get_trace_archive_location(session); + /* Ownership of location is transferred. 
*/ ret = notification_thread_command_session_rotation_completed( notification_thread_handle, session->name, session->uid, session->gid, - session->current_archive_id, + session->last_archived_chunk_id.value, location); if (ret != LTTNG_OK) { ERR("[rotation-thread] Failed to notify notification thread of completed rotation for session %s", @@ -638,18 +539,12 @@ int check_session_rotation_pending(struct ltt_session *session, session->name, session->uid, session->gid, - session->current_archive_id); + session->most_recent_chunk_id.value); if (ret != LTTNG_OK) { ERR("[rotation-thread] Failed to notify notification thread of completed rotation for session %s", session->name); } - ret = rename_active_chunk(session); - if (ret < 0) { - ERR("[rotation-thread] Failed to rename active rotation chunk"); - goto end; - } - /* Ownership of location is transferred. */ location = session_get_trace_archive_location(session); ret = notification_thread_command_session_rotation_completed( @@ -657,7 +552,7 @@ int check_session_rotation_pending(struct ltt_session *session, session->name, session->uid, session->gid, - session->current_archive_id, + session->most_recent_chunk_id.value, location); if (ret != LTTNG_OK) { ERR("[rotation-thread] Failed to notify notification thread of completed rotation for session %s", @@ -668,12 +563,19 @@ int check_session_rotation_pending(struct ltt_session *session, ret = 0; end: if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) { + uint64_t chunk_being_archived_id; + + chunk_status = lttng_trace_chunk_get_id( + session->chunk_being_archived, + &chunk_being_archived_id); + assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); + DBG("[rotation-thread] Rotation of trace archive %" PRIu64 " is still pending for session %s", - session->current_archive_id - 1, session->name); + chunk_being_archived_id, session->name); ret = timer_session_rotation_pending_check_start(session, DEFAULT_ROTATE_PENDING_TIMER); if (ret) { - ERR("Re-enabling rotate pending 
timer"); + ERR("Failed to re-enable rotation pending timer"); ret = -1; goto end; } diff --git a/src/bin/lttng-sessiond/session.c b/src/bin/lttng-sessiond/session.c index 6c10aaeba..abac2404f 100644 --- a/src/bin/lttng-sessiond/session.c +++ b/src/bin/lttng-sessiond/session.c @@ -67,13 +67,6 @@ static const char *forbidden_name_chars = "/"; /* Global hash table to keep the sessions, indexed by id. */ static struct lttng_ht *ltt_sessions_ht_by_id = NULL; -struct consumer_create_chunk_transaction { - struct consumer_socket *socket; - struct lttng_trace_chunk *new_chunk; - struct lttng_trace_chunk *previous_chunk; - bool new_chunk_created; -}; - /* * Validate the session name for forbidden characters. * @@ -254,16 +247,26 @@ void session_get_net_consumer_ports(const struct ltt_session *session, struct lttng_trace_archive_location *session_get_trace_archive_location( struct ltt_session *session) { + int ret; struct lttng_trace_archive_location *location = NULL; + char *chunk_path = NULL; + + if (session->rotation_state != LTTNG_ROTATION_STATE_COMPLETED || + !session->last_archived_chunk_name) { + goto end; + } - if (session->rotation_state != LTTNG_ROTATION_STATE_COMPLETED) { + ret = asprintf(&chunk_path, "%s/" DEFAULT_ARCHIVED_TRACE_CHUNKS_DIRECTORY "/%s", + session_get_base_path(session), + session->last_archived_chunk_name); + if (ret == -1) { goto end; } switch (session_get_consumer_destination_type(session)) { case CONSUMER_DST_LOCAL: location = lttng_trace_archive_location_local_create( - session->rotation_chunk.current_rotate_path); + chunk_path); break; case CONSUMER_DST_NET: { @@ -277,14 +280,14 @@ struct lttng_trace_archive_location *session_get_trace_archive_location( location = lttng_trace_archive_location_relay_create( hostname, LTTNG_TRACE_ARCHIVE_LOCATION_RELAY_PROTOCOL_TYPE_TCP, - control_port, data_port, - session->rotation_chunk.current_rotate_path); + control_port, data_port, chunk_path); break; } default: abort(); } end: + free(chunk_path); return 
location; } @@ -411,150 +414,130 @@ void session_unlock(struct ltt_session *session) static int _session_set_trace_chunk_no_lock_check(struct ltt_session *session, - struct lttng_trace_chunk *new_trace_chunk) + struct lttng_trace_chunk *new_trace_chunk, + struct lttng_trace_chunk **_current_trace_chunk) { int ret; unsigned int i, refs_to_acquire = 0, refs_acquired = 0, refs_to_release = 0; - unsigned int consumer_count = 0; - /* - * The maximum amount of consumers to reach is 3 - * (32/64 userspace + kernel). - */ - struct consumer_create_chunk_transaction transactions[3] = {}; struct cds_lfht_iter iter; struct consumer_socket *socket; - bool close_error_occured = false; - - if (new_trace_chunk) { - uint64_t chunk_id; - enum lttng_trace_chunk_status chunk_status = - lttng_trace_chunk_get_id(new_trace_chunk, - &chunk_id); - - assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); - LTTNG_OPTIONAL_SET(&session->last_trace_chunk_id, chunk_id) - } - - if (new_trace_chunk) { - refs_to_acquire = 1; - refs_to_acquire += !!session->ust_session; - refs_to_acquire += !!session->kernel_session; - } + struct lttng_trace_chunk *current_trace_chunk; + uint64_t chunk_id; + enum lttng_trace_chunk_status chunk_status; + const uint64_t relayd_id = session->consumer->net_seq_index; + const bool is_local_trace = relayd_id == -1ULL; + rcu_read_lock(); /* - * Build a list of consumers to reach to announce the new trace chunk. - * - * Rolling back the annoucement in case of an error is important since - * not doing so would result in a leak; the chunk will not be - * "reclaimed" by the consumer(s) since they have no concept of the - * lifetime of a session. + * Ownership of current trace chunk is transferred to + * `current_trace_chunk`. 
*/ + current_trace_chunk = session->current_trace_chunk; + session->current_trace_chunk = NULL; if (session->ust_session) { - cds_lfht_for_each_entry( - session->ust_session->consumer->socks->ht, - &iter, socket, node.node) { - transactions[consumer_count].socket = socket; - transactions[consumer_count].new_chunk = new_trace_chunk; - transactions[consumer_count].previous_chunk = - session->current_trace_chunk; - consumer_count++; - assert(consumer_count <= 3); - } + lttng_trace_chunk_put( + session->ust_session->current_trace_chunk); + session->ust_session->current_trace_chunk = NULL; } if (session->kernel_session) { - cds_lfht_for_each_entry( - session->kernel_session->consumer->socks->ht, - &iter, socket, node.node) { - transactions[consumer_count].socket = socket; - transactions[consumer_count].new_chunk = new_trace_chunk; - transactions[consumer_count].previous_chunk = - session->current_trace_chunk; - consumer_count++; - assert(consumer_count <= 3); - } + lttng_trace_chunk_put( + session->kernel_session->current_trace_chunk); + session->kernel_session->current_trace_chunk = NULL; } - for (refs_acquired = 0; refs_acquired < refs_to_acquire; refs_acquired++) { - if (new_trace_chunk && !lttng_trace_chunk_get(new_trace_chunk)) { - ERR("Failed to acquire reference to new current trace chunk of session \"%s\"", - session->name); - goto error; - } + if (!new_trace_chunk) { + ret = 0; + goto end; } + chunk_status = lttng_trace_chunk_get_id(new_trace_chunk, &chunk_id); + assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); - /* - * Close the previous chunk on remote peers (consumers and relayd). 
- */ - for (i = 0; i < consumer_count; i++) { - if (!transactions[i].previous_chunk) { - continue; - } - pthread_mutex_lock(transactions[i].socket->lock); - ret = consumer_close_trace_chunk(transactions[i].socket, - session->consumer->net_seq_index, - session->id, - transactions[i].previous_chunk); - pthread_mutex_unlock(transactions[i].socket->lock); - if (ret) { - ERR("Failed to close trace chunk on consumer"); - close_error_occured = true; + refs_to_acquire = 1; + refs_to_acquire += !!session->ust_session; + refs_to_acquire += !!session->kernel_session; + + for (refs_acquired = 0; refs_acquired < refs_to_acquire; + refs_acquired++) { + if (!lttng_trace_chunk_get(new_trace_chunk)) { + ERR("Failed to acquire reference to new trace chunk of session \"%s\"", + session->name); + goto error; } } - if (close_error_occured) { - /* - * Skip the creation of the new trace chunk and report the - * error. - */ - goto error; - } + if (session->ust_session) { + session->ust_session->current_trace_chunk = new_trace_chunk; + if (is_local_trace) { + enum lttng_error_code ret_error_code; - /* Create the new chunk on remote peers (consumers and relayd) */ - if (new_trace_chunk) { - for (i = 0; i < consumer_count; i++) { - pthread_mutex_lock(transactions[i].socket->lock); - ret = consumer_create_trace_chunk(transactions[i].socket, - session->consumer->net_seq_index, - session->id, - transactions[i].new_chunk); - pthread_mutex_unlock(transactions[i].socket->lock); - if (ret) { - ERR("Failed to create trace chunk on consumer"); + ret_error_code = ust_app_create_channel_subdirectories( + session->ust_session); + if (ret_error_code != LTTNG_OK) { + ret = -ret_error_code; goto error; } - /* This will have to be rolled-back on error. 
*/ - transactions[i].new_chunk_created = true; - } - } - - lttng_trace_chunk_put(session->current_trace_chunk); - session->current_trace_chunk = NULL; - if (session->ust_session) { - lttng_trace_chunk_put( - session->ust_session->current_trace_chunk); - session->ust_session->current_trace_chunk = NULL; - } + } + cds_lfht_for_each_entry( + session->ust_session->consumer->socks->ht, + &iter, socket, node.node) { + pthread_mutex_lock(socket->lock); + ret = consumer_create_trace_chunk(socket, + relayd_id, + session->id, new_trace_chunk); + pthread_mutex_unlock(socket->lock); + if (ret) { + goto error; + } + } + } if (session->kernel_session) { - lttng_trace_chunk_put( - session->kernel_session->current_trace_chunk); - session->kernel_session->current_trace_chunk = NULL; - } + session->kernel_session->current_trace_chunk = new_trace_chunk; + if (is_local_trace) { + enum lttng_error_code ret_error_code; + + ret_error_code = kernel_create_channel_subdirectories( + session->kernel_session); + if (ret_error_code != LTTNG_OK) { + ret = -ret_error_code; + goto error; + } + } + cds_lfht_for_each_entry( + session->kernel_session->consumer->socks->ht, + &iter, socket, node.node) { + pthread_mutex_lock(socket->lock); + ret = consumer_create_trace_chunk(socket, + relayd_id, + session->id, new_trace_chunk); + pthread_mutex_unlock(socket->lock); + if (ret) { + goto error; + } + } + } /* * Update local current trace chunk state last, only if all remote - * annoucements succeeded. + * creations succeeded. 
*/ session->current_trace_chunk = new_trace_chunk; + LTTNG_OPTIONAL_SET(&session->most_recent_chunk_id, chunk_id); +end: + if (_current_trace_chunk) { + *_current_trace_chunk = current_trace_chunk; + current_trace_chunk = NULL; + } +end_no_move: + rcu_read_unlock(); + lttng_trace_chunk_put(current_trace_chunk); + return ret; +error: if (session->ust_session) { - session->ust_session->current_trace_chunk = new_trace_chunk; + session->ust_session->current_trace_chunk = NULL; } if (session->kernel_session) { - session->kernel_session->current_trace_chunk = - new_trace_chunk; + session->kernel_session->current_trace_chunk = NULL; } - - return 0; -error: - /* + /* * Release references taken in the case where all references could not * be acquired. */ @@ -562,34 +545,12 @@ error: for (i = 0; i < refs_to_release; i++) { lttng_trace_chunk_put(new_trace_chunk); } - - /* - * Close the newly-created chunk from remote peers (consumers and - * relayd). - */ - DBG("Rolling back the creation of the new trace chunk on consumers"); - for (i = 0; i < consumer_count; i++) { - if (!transactions[i].new_chunk_created) { - continue; - } - - pthread_mutex_lock(transactions[i].socket->lock); - ret = consumer_close_trace_chunk(transactions[i].socket, - session->consumer->net_seq_index, - session->id, - transactions[i].new_chunk); - pthread_mutex_unlock(transactions[i].socket->lock); - if (ret) { - ERR("Failed to close trace chunk on consumer"); - close_error_occured = true; - } - } - - return -1; + ret = -1; + goto end_no_move; } static -bool output_supports_chunks(const struct ltt_session *session) +bool output_supports_trace_chunks(const struct ltt_session *session) { if (session->consumer->type == CONSUMER_DST_LOCAL) { return true; @@ -614,15 +575,15 @@ bool output_supports_chunks(const struct ltt_session *session) return false; } -enum lttng_error_code session_switch_trace_chunk(struct ltt_session *session, +struct lttng_trace_chunk *session_create_new_trace_chunk( + struct ltt_session 
*session, const char *session_base_path_override, const char *chunk_name_override) { int ret; - enum lttng_error_code ret_code = LTTNG_OK; struct lttng_trace_chunk *trace_chunk = NULL; enum lttng_trace_chunk_status chunk_status; - const time_t timestamp_begin = time(NULL); + const time_t chunk_creation_ts = time(NULL); const bool is_local_trace = session->consumer->type == CONSUMER_DST_LOCAL; const char *base_path = session_base_path_override ? : @@ -634,37 +595,28 @@ enum lttng_error_code session_switch_trace_chunk(struct ltt_session *session, }; uint64_t next_chunk_id; - if (timestamp_begin == (time_t) -1) { - PERROR("Failed to sample time while changing session \"%s\" trace chunk", + if (chunk_creation_ts == (time_t) -1) { + PERROR("Failed to sample time while creation session \"%s\" trace chunk", session->name); - ret_code = LTTNG_ERR_FATAL; goto error; } - session->current_chunk_start_ts = timestamp_begin; - if (!output_supports_chunks(session)) { + if (!output_supports_trace_chunks(session)) { goto end; } - next_chunk_id = session->last_trace_chunk_id.is_set ? - session->last_trace_chunk_id.value + 1 : 0; + next_chunk_id = session->most_recent_chunk_id.is_set ? 
+ session->most_recent_chunk_id.value + 1 : 0; - trace_chunk = lttng_trace_chunk_create(next_chunk_id, timestamp_begin); + trace_chunk = lttng_trace_chunk_create(next_chunk_id, + chunk_creation_ts); if (!trace_chunk) { - ret_code = LTTNG_ERR_FATAL; goto error; } if (chunk_name_override) { chunk_status = lttng_trace_chunk_override_name(trace_chunk, chunk_name_override); - switch (chunk_status) { - case LTTNG_TRACE_CHUNK_STATUS_OK: - break; - case LTTNG_TRACE_CHUNK_STATUS_INVALID_ARGUMENT: - ret_code = LTTNG_ERR_INVALID; - goto error; - default: - ret_code = LTTNG_ERR_NOMEM; + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { goto error; } } @@ -674,49 +626,101 @@ enum lttng_error_code session_switch_trace_chunk(struct ltt_session *session, * No need to set crendentials and output directory * for remote trace chunks. */ - goto publish; + goto end; } chunk_status = lttng_trace_chunk_set_credentials(trace_chunk, &session_credentials); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { - ret_code = LTTNG_ERR_FATAL; goto error; } - if (!session->current_trace_chunk) { - DBG("Creating base output directory of session \"%s\" at %s", - session->name, base_path); - } + DBG("Creating base output directory of session \"%s\" at %s", + session->name, base_path); ret = utils_mkdir_recursive(base_path, S_IRWXU | S_IRWXG, session->uid, session->gid); if (ret) { - ret = LTTNG_ERR_FATAL; goto error; } ret = lttng_directory_handle_init(&session_output_directory, base_path); if (ret) { - ret = LTTNG_ERR_FATAL; goto error; } chunk_status = lttng_trace_chunk_set_as_owner(trace_chunk, &session_output_directory); lttng_directory_handle_fini(&session_output_directory); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { - ret = LTTNG_ERR_CREATE_DIR_FAIL; - goto error; - } -publish: - ret = session_set_trace_chunk(session, trace_chunk); - if (ret) { - ret_code = LTTNG_ERR_FATAL; goto error; } +end: + return trace_chunk; error: lttng_trace_chunk_put(trace_chunk); + trace_chunk = NULL; + goto 
end; +} + +int session_close_trace_chunk(const struct ltt_session *session, + struct lttng_trace_chunk *trace_chunk) +{ + int ret = 0; + bool error_occurred = false; + struct cds_lfht_iter iter; + struct consumer_socket *socket; + enum lttng_trace_chunk_status chunk_status; + const time_t chunk_close_timestamp = time(NULL); + + if (chunk_close_timestamp == (time_t) -1) { + ERR("Failed to sample the close timestamp of the current trace chunk of session \"%s\"", + session->name); + ret = -1; + goto end; + } + chunk_status = lttng_trace_chunk_set_close_timestamp(trace_chunk, + chunk_close_timestamp); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ERR("Failed to set the close timestamp of the current trace chunk of session \"%s\"", + session->name); + ret = -1; + goto end; + } + + if (session->ust_session) { + cds_lfht_for_each_entry( + session->ust_session->consumer->socks->ht, + &iter, socket, node.node) { + pthread_mutex_lock(socket->lock); + ret = consumer_close_trace_chunk(socket, + session->consumer->net_seq_index, + session->id, + trace_chunk); + pthread_mutex_unlock(socket->lock); + if (ret) { + ERR("Failed to close trace chunk on user space consumer"); + error_occurred = true; + } + } + } + if (session->kernel_session) { + cds_lfht_for_each_entry( + session->kernel_session->consumer->socks->ht, + &iter, socket, node.node) { + pthread_mutex_lock(socket->lock); + ret = consumer_close_trace_chunk(socket, + session->consumer->net_seq_index, + session->id, + trace_chunk); + pthread_mutex_unlock(socket->lock); + if (ret) { + ERR("Failed to close trace chunk on kernel consumer"); + error_occurred = true; + } + } + } + ret = error_occurred ? -1 : 0; end: - return ret_code; + return ret; } /* @@ -725,10 +729,12 @@ end: * Must be called with the session lock held. 
*/ int session_set_trace_chunk(struct ltt_session *session, - struct lttng_trace_chunk *new_trace_chunk) + struct lttng_trace_chunk *new_trace_chunk, + struct lttng_trace_chunk **current_trace_chunk) { ASSERT_LOCKED(session->lock); - return _session_set_trace_chunk_no_lock_check(session, new_trace_chunk); + return _session_set_trace_chunk_no_lock_check(session, new_trace_chunk, + current_trace_chunk); } static @@ -739,11 +745,24 @@ void session_release(struct urcu_ref *ref) struct ltt_kernel_session *ksess; struct ltt_session *session = container_of(ref, typeof(*session), ref); + assert(!session->chunk_being_archived); + usess = session->ust_session; ksess = session->kernel_session; - (void) _session_set_trace_chunk_no_lock_check(session, NULL); + if (session->current_trace_chunk) { + ret = session_close_trace_chunk(session, session->current_trace_chunk); + if (ret) { + ERR("Failed to close the current trace chunk of session \"%s\" during its release", + session->name); + } + ret = _session_set_trace_chunk_no_lock_check(session, NULL, NULL); + if (ret) { + ERR("Failed to release the current trace chunk of session \"%s\" during its release", + session->name); + } + } - /* Clean kernel session teardown */ + /* Clean kernel session teardown */ kernel_destroy_session(ksess); session->kernel_session = NULL; @@ -785,6 +804,7 @@ void session_release(struct urcu_ref *ref) del_session_ht(session); pthread_cond_broadcast(&ltt_session_list.removal_cond); } + free(session->last_archived_chunk_name); free(session); } @@ -1097,12 +1117,22 @@ int session_reset_rotation_state(struct ltt_session *session, ASSERT_LOCKED(ltt_session_list.lock); ASSERT_LOCKED(session->lock); - session->rotation_pending_local = false; - session->rotation_pending_relay = false; - session->rotated_after_last_stop = false; session->rotation_state = result; if (session->rotation_pending_check_timer_enabled) { ret = timer_session_rotation_pending_check_stop(session); } + if (session->chunk_being_archived) { +
uint64_t chunk_id; + enum lttng_trace_chunk_status chunk_status; + + chunk_status = lttng_trace_chunk_get_id( + session->chunk_being_archived, + &chunk_id); + assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); + LTTNG_OPTIONAL_SET(&session->last_archived_chunk_id, + chunk_id); + lttng_trace_chunk_put(session->chunk_being_archived); + session->chunk_being_archived = NULL; + } return ret; } diff --git a/src/bin/lttng-sessiond/session.h b/src/bin/lttng-sessiond/session.h index 19dabb727..1cea52a61 100644 --- a/src/bin/lttng-sessiond/session.h +++ b/src/bin/lttng-sessiond/session.h @@ -146,61 +146,6 @@ struct ltt_session { * Node in ltt_sessions_ht_by_id. */ struct lttng_ht_node_u64 node; - /* - * The current archive id corresponds to the number of session rotations - * that have occurred for this session. The archive id - * is used to tag the "generation" of a stream. This tag allows the - * consumer and relay daemons to track when a given stream was created - * during the lifetime of a session. - * - * For instance, if a stream is created after a session rotation was - * launched, the consumer and relay daemons must not check its position - * to determine if that specific session rotation was completed. It is - * implicitly "completed" since the stream appeared _after_ the session - * rotation was initiated. - */ - uint64_t current_archive_id; - /* - * Rotation is considered pending between the time it is launched up - * until the moment when the data has been writen at the destination - * and the trace archive has been renamed. - * - * When tracing locally, only 'rotation_pending_local' is used since - * no remote checks are needed. However, when tracing to a relay daemon, - * a second check is needed to ensure that the data has been - * commited at the remote destination. - */ - bool rotation_pending_local; - bool rotation_pending_relay; - /* Current state of a rotation. 
*/ - enum lttng_rotation_state rotation_state; - struct { - /* - * When the rotation is in progress, the temporary path name is - * stored here. When the rotation is complete, the final path name - * is here and can be queried with the rotate_pending call. - */ - char current_rotate_path[LTTNG_PATH_MAX]; - /* - * The path where the consumer is currently writing after the first - * session rotation. - */ - char active_tracing_path[LTTNG_PATH_MAX]; - } rotation_chunk; - /* - * The timestamp of the beginning of the previous chunk. For the - * first chunk, this is the "lttng start" timestamp. For the - * subsequent ones, this copies the current_chunk_start_ts value when - * a new rotation starts. This value is used to set the name of a - * complete chunk directory, ex: "last_chunk_start_ts-now()". - */ - time_t last_chunk_start_ts; - /* - * This is the timestamp when a new chunk starts. When a new rotation - * starts, we copy this value to last_chunk_start_ts and replace it - * with the current timestamp. - */ - time_t current_chunk_start_ts; /* * Timer to check periodically if a relay and/or consumer has completed * the last rotation. @@ -226,8 +171,13 @@ struct ltt_session { */ struct lttng_condition *rotate_condition; struct lttng_trigger *rotate_trigger; - LTTNG_OPTIONAL(uint64_t) last_trace_chunk_id; + LTTNG_OPTIONAL(uint64_t) most_recent_chunk_id; struct lttng_trace_chunk *current_trace_chunk; + struct lttng_trace_chunk *chunk_being_archived; + /* Current state of a rotation. */ + enum lttng_rotation_state rotation_state; + char *last_archived_chunk_name; + LTTNG_OPTIONAL(uint64_t) last_archived_chunk_id; }; /* Prototypes */ @@ -265,10 +215,29 @@ int session_access_ok(struct ltt_session *session, uid_t uid, gid_t gid); int session_reset_rotation_state(struct ltt_session *session, enum lttng_rotation_state result); -enum lttng_error_code session_switch_trace_chunk(struct ltt_session *session, +/* Create a new trace chunk object from the session's configuration. 
*/ +struct lttng_trace_chunk *session_create_new_trace_chunk( + struct ltt_session *session, const char *session_base_path_override, const char *chunk_name_override); + +/* + * Set `new_trace_chunk` as the session's current trace chunk. A reference + * to `new_trace_chunk` is acquired by the session. The chunk is created + * on remote peers (consumer and relay daemons). + * + * A reference to the session's current trace chunk is returned through + * `current_session_trace_chunk` on success. + */ int session_set_trace_chunk(struct ltt_session *session, - struct lttng_trace_chunk *current_trace_chunk); + struct lttng_trace_chunk *new_trace_chunk, + struct lttng_trace_chunk **current_session_trace_chunk); + +/* + * Close a chunk on the remote peers of a session. Has no effect on the + * ltt_session itself. + */ +int session_close_trace_chunk(const struct ltt_session *session, + struct lttng_trace_chunk *trace_chunk); #endif /* _LTT_SESSION_H */ diff --git a/src/bin/lttng-sessiond/ust-app.c b/src/bin/lttng-sessiond/ust-app.c index 4cc21b42c..42a9b407e 100644 --- a/src/bin/lttng-sessiond/ust-app.c +++ b/src/bin/lttng-sessiond/ust-app.c @@ -2462,7 +2462,7 @@ static int do_consumer_create_channel(struct ltt_ust_session *usess, * stream we have to expect. */ ret = ust_consumer_ask_channel(ua_sess, ua_chan, usess->consumer, socket, - registry, trace_archive_id); + registry, usess->current_trace_chunk); if (ret < 0) { goto error_ask; } @@ -2845,7 +2845,7 @@ static int create_channel_per_uid(struct ust_app *app, */ ret = do_consumer_create_channel(usess, ua_sess, ua_chan, app->bits_per_long, reg_uid->registry->reg.ust, - session->current_archive_id); + session->most_recent_chunk_id.value); if (ret < 0) { ERR("Error creating UST channel \"%s\" on the consumer daemon", ua_chan->name); @@ -2959,7 +2959,7 @@ static int create_channel_per_pid(struct ust_app *app, /* Create and get channel on the consumer side. 
*/ ret = do_consumer_create_channel(usess, ua_sess, ua_chan, app->bits_per_long, registry, - session->current_archive_id); + session->most_recent_chunk_id.value); if (ret < 0) { ERR("Error creating UST channel \"%s\" on the consumer daemon", ua_chan->name); @@ -3236,7 +3236,7 @@ static int create_ust_app_metadata(struct ust_app_session *ua_sess, * consumer. */ ret = ust_consumer_ask_channel(ua_sess, metadata, consumer, socket, - registry, session->current_archive_id); + registry, session->current_trace_chunk); if (ret < 0) { /* Nullify the metadata key so we don't try to close it later on. */ registry->metadata_key = 0; @@ -5884,20 +5884,12 @@ enum lttng_error_code ust_app_snapshot_record( struct lttng_ht_iter iter; struct ust_app *app; char pathname[PATH_MAX]; - struct ltt_session *session = NULL; - uint64_t trace_archive_id; assert(usess); assert(output); rcu_read_lock(); - session = session_find_by_id(usess->id); - assert(session); - assert(pthread_mutex_trylock(&session->lock)); - assert(session_trylock_list()); - trace_archive_id = session->current_archive_id; - switch (usess->buffer_type) { case LTTNG_BUFFER_PER_UID: { @@ -5921,8 +5913,12 @@ enum lttng_error_code ust_app_snapshot_record( } memset(pathname, 0, sizeof(pathname)); + /* + * DEFAULT_UST_TRACE_UID_PATH already contains a path + * separator. + */ ret = snprintf(pathname, sizeof(pathname), - DEFAULT_UST_TRACE_DIR "/" DEFAULT_UST_TRACE_UID_PATH, + DEFAULT_UST_TRACE_DIR DEFAULT_UST_TRACE_UID_PATH, reg->uid, reg->bits_per_long); if (ret < 0) { PERROR("snprintf snapshot path"); @@ -5930,23 +5926,21 @@ enum lttng_error_code ust_app_snapshot_record( goto error; } - /* Add the UST default trace dir to path. */ + /* Add the UST default trace dir to path. 
*/ cds_lfht_for_each_entry(reg->registry->channels->ht, &iter.iter, reg_chan, node.node) { status = consumer_snapshot_channel(socket, reg_chan->consumer_key, output, 0, usess->uid, usess->gid, pathname, wait, - nb_packets_per_stream, - trace_archive_id); + nb_packets_per_stream); if (status != LTTNG_OK) { goto error; } } status = consumer_snapshot_channel(socket, reg->registry->reg.ust->metadata_key, output, 1, - usess->uid, usess->gid, pathname, wait, 0, - trace_archive_id); + usess->uid, usess->gid, pathname, wait, 0); if (status != LTTNG_OK) { goto error; } @@ -5978,7 +5972,7 @@ enum lttng_error_code ust_app_snapshot_record( /* Add the UST default trace dir to path. */ memset(pathname, 0, sizeof(pathname)); - ret = snprintf(pathname, sizeof(pathname), DEFAULT_UST_TRACE_DIR "/%s", + ret = snprintf(pathname, sizeof(pathname), DEFAULT_UST_TRACE_DIR "%s", ua_sess->path); if (ret < 0) { status = LTTNG_ERR_INVALID; @@ -5986,14 +5980,13 @@ enum lttng_error_code ust_app_snapshot_record( goto error; } - cds_lfht_for_each_entry(ua_sess->channels->ht, &chan_iter.iter, + cds_lfht_for_each_entry(ua_sess->channels->ht, &chan_iter.iter, ua_chan, node.node) { status = consumer_snapshot_channel(socket, ua_chan->key, output, 0, ua_sess->euid, ua_sess->egid, pathname, wait, - nb_packets_per_stream, - trace_archive_id); + nb_packets_per_stream); switch (status) { case LTTNG_OK: break; @@ -6012,8 +6005,7 @@ enum lttng_error_code ust_app_snapshot_record( status = consumer_snapshot_channel(socket, registry->metadata_key, output, 1, ua_sess->euid, ua_sess->egid, - pathname, wait, 0, - trace_archive_id); + pathname, wait, 0); switch (status) { case LTTNG_OK: break; @@ -6032,9 +6024,6 @@ enum lttng_error_code ust_app_snapshot_record( error: rcu_read_unlock(); - if (session) { - session_put(session); - } return status; } @@ -6281,7 +6270,6 @@ enum lttng_error_code ust_app_rotate_session(struct ltt_session *session) struct lttng_ht_iter iter; struct ust_app *app; struct ltt_ust_session 
*usess = session->ust_session; - char pathname[LTTNG_PATH_MAX]; assert(usess); @@ -6304,24 +6292,14 @@ enum lttng_error_code ust_app_rotate_session(struct ltt_session *session) goto error; } - ret = snprintf(pathname, sizeof(pathname), - DEFAULT_UST_TRACE_DIR "/" DEFAULT_UST_TRACE_UID_PATH, - reg->uid, reg->bits_per_long); - if (ret < 0 || ret >= sizeof(pathname)) { - PERROR("Failed to format rotation path"); - cmd_ret = LTTNG_ERR_INVALID; - goto error; - } - /* Rotate the data channels. */ cds_lfht_for_each_entry(reg->registry->channels->ht, &iter.iter, reg_chan, node.node) { ret = consumer_rotate_channel(socket, reg_chan->consumer_key, usess->uid, usess->gid, - usess->consumer, pathname, - /* is_metadata_channel */ false, - session->current_archive_id); + usess->consumer, + /* is_metadata_channel */ false); if (ret < 0) { cmd_ret = LTTNG_ERR_ROTATION_FAIL_CONSUMER; goto error; @@ -6333,9 +6311,8 @@ enum lttng_error_code ust_app_rotate_session(struct ltt_session *session) ret = consumer_rotate_channel(socket, reg->registry->reg.ust->metadata_key, usess->uid, usess->gid, - usess->consumer, pathname, - /* is_metadata_channel */ true, - session->current_archive_id); + usess->consumer, + /* is_metadata_channel */ true); if (ret < 0) { cmd_ret = LTTNG_ERR_ROTATION_FAIL_CONSUMER; goto error; @@ -6357,14 +6334,6 @@ enum lttng_error_code ust_app_rotate_session(struct ltt_session *session) /* Session not associated with this app. */ continue; } - ret = snprintf(pathname, sizeof(pathname), - DEFAULT_UST_TRACE_DIR "/%s", - ua_sess->path); - if (ret < 0 || ret >= sizeof(pathname)) { - PERROR("Failed to format rotation path"); - cmd_ret = LTTNG_ERR_INVALID; - goto error; - } /* Get the right consumer socket for the application. */ socket = consumer_find_socket_by_bitness(app->bits_per_long, @@ -6380,15 +6349,13 @@ enum lttng_error_code ust_app_rotate_session(struct ltt_session *session) continue; } - /* Rotate the data channels. 
*/ cds_lfht_for_each_entry(ua_sess->channels->ht, &chan_iter.iter, ua_chan, node.node) { ret = consumer_rotate_channel(socket, ua_chan->key, ua_sess->euid, ua_sess->egid, - ua_sess->consumer, pathname, - /* is_metadata_channel */ false, - session->current_archive_id); + ua_sess->consumer, + /* is_metadata_channel */ false); if (ret < 0) { /* Per-PID buffer and application going away. */ if (ret == -LTTNG_ERR_CHAN_NOT_FOUND) @@ -6402,9 +6369,8 @@ enum lttng_error_code ust_app_rotate_session(struct ltt_session *session) (void) push_metadata(registry, usess->consumer); ret = consumer_rotate_channel(socket, registry->metadata_key, ua_sess->euid, ua_sess->egid, - ua_sess->consumer, pathname, - /* is_metadata_channel */ true, - session->current_archive_id); + ua_sess->consumer, + /* is_metadata_channel */ true); if (ret < 0) { /* Per-PID buffer and application going away. */ if (ret == -LTTNG_ERR_CHAN_NOT_FOUND) @@ -6426,3 +6392,99 @@ error: rcu_read_unlock(); return cmd_ret; } + +enum lttng_error_code ust_app_create_channel_subdirectories( + const struct ltt_ust_session *usess) +{ + enum lttng_error_code ret = LTTNG_OK; + struct lttng_ht_iter iter; + enum lttng_trace_chunk_status chunk_status; + char *pathname_index; + int fmt_ret; + + assert(usess->current_trace_chunk); + rcu_read_lock(); + + switch (usess->buffer_type) { + case LTTNG_BUFFER_PER_UID: + { + struct buffer_reg_uid *reg; + + cds_list_for_each_entry(reg, &usess->buffer_reg_uid_list, lnode) { + fmt_ret = asprintf(&pathname_index, + DEFAULT_UST_TRACE_DIR DEFAULT_UST_TRACE_UID_PATH "/" DEFAULT_INDEX_DIR, + reg->uid, reg->bits_per_long); + if (fmt_ret < 0) { + ERR("Failed to format channel index directory"); + ret = LTTNG_ERR_CREATE_DIR_FAIL; + goto error; + } + + /* + * Create the index subdirectory which will take care + * of implicitly creating the channel's path. 
+ */ + chunk_status = lttng_trace_chunk_create_subdirectory( + usess->current_trace_chunk, + pathname_index); + free(pathname_index); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret = LTTNG_ERR_CREATE_DIR_FAIL; + goto error; + } + } + break; + } + case LTTNG_BUFFER_PER_PID: + { + struct ust_app *app; + + cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, + pid_n.node) { + struct ust_app_session *ua_sess; + struct ust_registry_session *registry; + + ua_sess = lookup_session_by_app(usess, app); + if (!ua_sess) { + /* Session not associated with this app. */ + continue; + } + + registry = get_session_registry(ua_sess); + if (!registry) { + DBG("Application session is being torn down. Skip application."); + continue; + } + + fmt_ret = asprintf(&pathname_index, + DEFAULT_UST_TRACE_DIR "%s/" DEFAULT_INDEX_DIR, + ua_sess->path); + if (fmt_ret < 0) { + ERR("Failed to format channel index directory"); + ret = LTTNG_ERR_CREATE_DIR_FAIL; + goto error; + } + /* + * Create the index subdirectory which will take care + * of implicitly creating the channel's path. + */ + chunk_status = lttng_trace_chunk_create_subdirectory( + usess->current_trace_chunk, + pathname_index); + free(pathname_index); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret = LTTNG_ERR_CREATE_DIR_FAIL; + goto error; + } + } + break; + } + default: + abort(); + } + + ret = LTTNG_OK; +error: + rcu_read_unlock(); + return ret; +} diff --git a/src/bin/lttng-sessiond/ust-app.h b/src/bin/lttng-sessiond/ust-app.h index 0627dd904..9afbe6bc2 100644 --- a/src/bin/lttng-sessiond/ust-app.h +++ b/src/bin/lttng-sessiond/ust-app.h @@ -206,6 +206,7 @@ struct ust_app_session { * ust_sessions_objd hash table in the ust_app object. */ struct lttng_ht_node_ulong ust_objd_node; + /* Starts with 'ust'; no leading slash. 
*/ char path[PATH_MAX]; /* UID/GID of the application owning the session */ uid_t uid; @@ -357,6 +358,8 @@ int ust_app_pid_get_channel_runtime_stats(struct ltt_ust_session *usess, int overwrite, uint64_t *discarded, uint64_t *lost); int ust_app_regenerate_statedump_all(struct ltt_ust_session *usess); enum lttng_error_code ust_app_rotate_session(struct ltt_session *session); +enum lttng_error_code ust_app_create_channel_subdirectories( + const struct ltt_ust_session *session); static inline int ust_app_supported(void) @@ -590,6 +593,13 @@ enum lttng_error_code ust_app_rotate_session(struct ltt_session *session) return 0; } +static inline +enum lttng_error_code ust_app_create_channel_subdirectories( + const struct ltt_ust_session *session) +{ + return 0; +} + #endif /* HAVE_LIBLTTNG_UST_CTL */ #endif /* _LTT_UST_APP_H */ diff --git a/src/bin/lttng-sessiond/ust-consumer.c b/src/bin/lttng-sessiond/ust-consumer.c index 6be57e447..c39a87fba 100644 --- a/src/bin/lttng-sessiond/ust-consumer.c +++ b/src/bin/lttng-sessiond/ust-consumer.c @@ -36,13 +36,12 @@ /* * Return allocated full pathname of the session using the consumer trace path - * and subdir if available. On a successful allocation, the directory of the - * trace is created with the session credentials. + * and subdir if available. * * The caller can safely free(3) the returned value. On error, NULL is * returned. 
*/ -static char *setup_trace_path(struct consumer_output *consumer, +static char *setup_channel_trace_path(struct consumer_output *consumer, struct ust_app_session *ua_sess) { int ret; @@ -65,34 +64,24 @@ static char *setup_trace_path(struct consumer_output *consumer, /* Get correct path name destination */ if (consumer->type == CONSUMER_DST_LOCAL) { /* Set application path to the destination path */ - ret = snprintf(pathname, LTTNG_PATH_MAX, "%s/%s%s/%s", - consumer->dst.session_root_path, - consumer->chunk_path, + ret = snprintf(pathname, LTTNG_PATH_MAX, "%s%s", consumer->domain_subdir, ua_sess->path); - if (ret < 0) { - PERROR("snprintf channel path"); - goto error; - } - - /* Create directory. Ignore if exist. */ - ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG, - ua_sess->euid, ua_sess->egid); - if (ret < 0) { - if (errno != EEXIST) { - ERR("Trace directory creation error"); - goto error; - } - } + DBG3("Userspace local consumer trace path relative to current trace chunk: \"%s\"", + pathname); } else { ret = snprintf(pathname, LTTNG_PATH_MAX, "%s%s/%s%s", consumer->dst.net.base_dir, consumer->chunk_path, consumer->domain_subdir, ua_sess->path); - if (ret < 0) { - PERROR("snprintf channel path"); - goto error; - } + } + if (ret < 0) { + PERROR("Failed to format channel path"); + goto error; + } else if (ret >= LTTNG_PATH_MAX) { + ERR("Truncation occurred while formatting channel path"); + ret = -1; + goto error; } return pathname; @@ -112,7 +101,7 @@ static int ask_channel_creation(struct ust_app_session *ua_sess, struct consumer_output *consumer, struct consumer_socket *socket, struct ust_registry_session *registry, - uint64_t trace_archive_id) + struct lttng_trace_chunk *trace_chunk) { int ret, output; uint32_t chan_id; @@ -122,6 +111,7 @@ static int ask_channel_creation(struct ust_app_session *ua_sess, struct ust_registry_channel *chan_reg; char shm_path[PATH_MAX] = ""; char root_shm_path[PATH_MAX] = ""; + bool is_local_trace; assert(ua_sess); 
assert(ua_chan); @@ -131,10 +121,34 @@ static int ask_channel_creation(struct ust_app_session *ua_sess, DBG2("Asking UST consumer for channel"); - /* Get and create full trace path of session. */ - if (ua_sess->output_traces) { - pathname = setup_trace_path(consumer, ua_sess); - if (!pathname) { + is_local_trace = consumer->net_seq_index == -1ULL; + /* Format the channel's path (relative to the current trace chunk). */ + pathname = setup_channel_trace_path(consumer, ua_sess); + if (!pathname) { + ret = -1; + goto error; + } + + if (is_local_trace && trace_chunk) { + enum lttng_trace_chunk_status chunk_status; + char *pathname_index; + + ret = asprintf(&pathname_index, "%s/" DEFAULT_INDEX_DIR, + pathname); + if (ret < 0) { + ERR("Failed to format channel index directory"); + ret = -1; + goto error; + } + + /* + * Create the index subdirectory which will take care + * of implicitly creating the channel's path. + */ + chunk_status = lttng_trace_chunk_create_subdirectory( + trace_chunk, pathname_index); + free(pathname_index); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { ret = -1; goto error; } @@ -192,8 +206,6 @@ static int ask_channel_creation(struct ust_app_session *ua_sess, ua_sess->tracing_id, pathname, ua_chan->name, - ua_sess->euid, - ua_sess->egid, consumer->net_seq_index, ua_chan->key, registry->uuid, @@ -205,7 +217,7 @@ static int ask_channel_creation(struct ust_app_session *ua_sess, ua_sess->uid, ua_chan->attr.blocking_timeout, root_shm_path, shm_path, - trace_archive_id); + trace_chunk); health_code_update(); @@ -247,7 +259,7 @@ int ust_consumer_ask_channel(struct ust_app_session *ua_sess, struct consumer_output *consumer, struct consumer_socket *socket, struct ust_registry_session *registry, - uint64_t trace_archive_id) + struct lttng_trace_chunk * trace_chunk) { int ret; @@ -265,7 +277,7 @@ int ust_consumer_ask_channel(struct ust_app_session *ua_sess, pthread_mutex_lock(socket->lock); ret = ask_channel_creation(ua_sess, ua_chan, consumer, socket, 
registry, - trace_archive_id); + trace_chunk); pthread_mutex_unlock(socket->lock); if (ret < 0) { ERR("ask_channel_creation consumer command failed"); diff --git a/src/bin/lttng-sessiond/ust-consumer.h b/src/bin/lttng-sessiond/ust-consumer.h index b8bd65575..77b1d77ea 100644 --- a/src/bin/lttng-sessiond/ust-consumer.h +++ b/src/bin/lttng-sessiond/ust-consumer.h @@ -21,13 +21,14 @@ #include "consumer.h" #include "ust-app.h" #include +#include int ust_consumer_ask_channel(struct ust_app_session *ua_sess, struct ust_app_channel *ua_chan, struct consumer_output *consumer, struct consumer_socket *socket, struct ust_registry_session *registry, - uint64_t trace_archive_id); + struct lttng_trace_chunk *trace_chunk); int ust_consumer_get_channel(struct consumer_socket *socket, struct ust_app_channel *ua_chan); diff --git a/src/bin/lttng-sessiond/utils.c b/src/bin/lttng-sessiond/utils.c index 38082f639..51fdc9933 100644 --- a/src/bin/lttng-sessiond/utils.c +++ b/src/bin/lttng-sessiond/utils.c @@ -23,6 +23,7 @@ #include #include "utils.h" +#include "snapshot.h" #include "lttng-sessiond.h" int ht_cleanup_pipe[2] = { -1, -1 }; @@ -102,3 +103,11 @@ const char *session_get_base_path(const struct ltt_session *session) session->consumer->dst.net.base_dir : session->consumer->dst.session_root_path; } + +const char *snapshot_output_get_base_path( + const struct snapshot_output *snapshot_output) +{ + return snapshot_output->consumer->type == CONSUMER_DST_LOCAL ? 
+ snapshot_output->consumer->dst.session_root_path : + snapshot_output->consumer->dst.net.base_dir; +} diff --git a/src/bin/lttng-sessiond/utils.h b/src/bin/lttng-sessiond/utils.h index 30c80725f..34aa25784 100644 --- a/src/bin/lttng-sessiond/utils.h +++ b/src/bin/lttng-sessiond/utils.h @@ -20,6 +20,7 @@ struct lttng_ht; struct ltt_session; +struct snapshot_output; const char *get_home_dir(void); int notify_thread_pipe(int wpipe); @@ -27,5 +28,7 @@ void ht_cleanup_push(struct lttng_ht *ht); int loglevels_match(int a_loglevel_type, int a_loglevel_value, int b_loglevel_type, int b_loglevel_value, int loglevel_all_type); const char *session_get_base_path(const struct ltt_session *session); +const char *snapshot_output_get_base_path( + const struct snapshot_output *snapshot_output); #endif /* _LTT_UTILS_H */ diff --git a/src/bin/lttng/commands/rotate.c b/src/bin/lttng/commands/rotate.c index 312bbba9c..e2a4c509a 100644 --- a/src/bin/lttng/commands/rotate.c +++ b/src/bin/lttng/commands/rotate.c @@ -193,6 +193,7 @@ static int rotate_tracing(char *session_name) rotation_status = lttng_rotation_handle_get_state(handle, &rotation_state); if (rotation_status != LTTNG_ROTATION_STATUS_OK) { + MSG(""); ERR("Failed to query the state of the rotation."); goto error; } @@ -200,14 +201,14 @@ static int rotate_tracing(char *session_name) if (rotation_state == LTTNG_ROTATION_STATE_ONGOING) { ret = usleep(DEFAULT_DATA_AVAILABILITY_WAIT_TIME); if (ret) { - PERROR("usleep"); + PERROR("\nusleep"); goto error; } _MSG("."); ret = fflush(stdout); if (ret) { - PERROR("fflush"); + PERROR("\nfflush"); goto error; } } diff --git a/src/bin/lttng/commands/snapshot.c b/src/bin/lttng/commands/snapshot.c index 33ab5d995..904ebae15 100644 --- a/src/bin/lttng/commands/snapshot.c +++ b/src/bin/lttng/commands/snapshot.c @@ -375,6 +375,7 @@ static int cmd_del_output(int argc, const char **argv) long id; if (argc < 2) { + ERR("A snapshot output name or id must be provided to delete a snapshot output."); 
ret = CMD_ERROR; goto end; } diff --git a/src/common/compat/directory-handle.h b/src/common/compat/directory-handle.h index bdf216200..f50cef674 100644 --- a/src/common/compat/directory-handle.h +++ b/src/common/compat/directory-handle.h @@ -177,4 +177,37 @@ int lttng_directory_handle_unlink_file_as_user( const char *filename, const struct lttng_credentials *creds); +LTTNG_HIDDEN +int lttng_directory_handle_rename( + const struct lttng_directory_handle *handle, + const char *old, const char *new); + +LTTNG_HIDDEN +int lttng_directory_handle_rename_as_user( + const struct lttng_directory_handle *handle, + const char *old, const char *new, + const struct lttng_credentials *creds); + +LTTNG_HIDDEN +int lttng_directory_handle_rmdir( + const struct lttng_directory_handle *handle, + const char *name); + +LTTNG_HIDDEN +int lttng_directory_handle_rmdir_as_user( + const struct lttng_directory_handle *handle, + const char *name, + const struct lttng_credentials *creds); + +LTTNG_HIDDEN +int lttng_directory_handle_rmdir_recursive( + const struct lttng_directory_handle *handle, + const char *name); + +LTTNG_HIDDEN +int lttng_directory_handle_rmdir_recursive_as_user( + const struct lttng_directory_handle *handle, + const char *name, + const struct lttng_credentials *creds); + #endif /* _COMPAT_PATH_HANDLE_H */ diff --git a/src/common/consumer/consumer-metadata-cache.c b/src/common/consumer/consumer-metadata-cache.c index 5e9dca3ea..65702a229 100644 --- a/src/common/consumer/consumer-metadata-cache.c +++ b/src/common/consumer/consumer-metadata-cache.c @@ -120,10 +120,20 @@ int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel) write_ret = lttng_write(channel->metadata_stream->ust_metadata_poll_pipe[1], &dummy, 1); if (write_ret < 1) { - PERROR("Wake-up UST metadata pipe"); - ret = -1; - goto end; - } + if (errno == EWOULDBLOCK) { + /* + * This is fine, the metadata poll thread + * is having a hard time keeping-up, but + * it will eventually wake-up and 
consume + * the available data. + */ + ret = 0; + } else { + PERROR("Wake-up UST metadata pipe"); + ret = -1; + goto end; + } + } } end: diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c index ca2c4536c..32441ea2c 100644 --- a/src/common/consumer/consumer-stream.c +++ b/src/common/consumer/consumer-stream.c @@ -164,6 +164,9 @@ void consumer_stream_close(struct lttng_consumer_stream *stream) stream->index_file = NULL; } + lttng_trace_chunk_put(stream->trace_chunk); + stream->trace_chunk = NULL; + /* Check and cleanup relayd if needed. */ rcu_read_lock(); relayd = consumer_find_relayd(stream->net_seq_idx); @@ -343,6 +346,8 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, } /* Free stream within a RCU call. */ + lttng_trace_chunk_put(stream->trace_chunk); + stream->trace_chunk = NULL; consumer_stream_free(stream); } @@ -556,3 +561,87 @@ end: rcu_read_unlock(); return ret; } + +int consumer_stream_create_output_files(struct lttng_consumer_stream *stream, + bool create_index) +{ + int ret; + enum lttng_trace_chunk_status chunk_status; + const int flags = O_WRONLY | O_CREAT | O_TRUNC; + const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP; + char stream_path[LTTNG_PATH_MAX]; + + ASSERT_LOCKED(stream->lock); + assert(stream->trace_chunk); + + ret = utils_stream_file_path(stream->chan->pathname, stream->name, + stream->chan->tracefile_size, + stream->chan->tracefile_count, NULL, + stream_path, sizeof(stream_path)); + if (ret < 0) { + goto end; + } + + if (stream->out_fd >= 0) { + ret = close(stream->out_fd); + if (ret < 0) { + PERROR("Failed to close stream file \"%s\"", + stream->name); + goto end; + } + stream->out_fd = -1; + } + + DBG("Opening stream output file \"%s\"", stream_path); + chunk_status = lttng_trace_chunk_open_file(stream->trace_chunk, stream_path, + flags, mode, &stream->out_fd); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ERR("Failed to open stream file \"%s\"", stream->name); 
+ ret = -1; + goto end; + } + + if (!stream->metadata_flag && (create_index || stream->index_file)) { + if (stream->index_file) { + lttng_index_file_put(stream->index_file); + } + stream->index_file = lttng_index_file_create_from_trace_chunk( + stream->trace_chunk, + stream->chan->pathname, + stream->name, + stream->chan->tracefile_size, + stream->tracefile_count_current, + CTF_INDEX_MAJOR, CTF_INDEX_MINOR, + false); + if (!stream->index_file) { + ret = -1; + goto end; + } + } + + /* Reset current size because we just perform a rotation. */ + stream->tracefile_size_current = 0; + stream->out_fd_offset = 0; +end: + return ret; +} + +int consumer_stream_rotate_output_files(struct lttng_consumer_stream *stream) +{ + int ret; + + stream->tracefile_count_current++; + if (stream->chan->tracefile_count > 0) { + stream->tracefile_count_current %= + stream->chan->tracefile_count; + } + + DBG("Rotating output files of stream \"%s\"", stream->name); + ret = consumer_stream_create_output_files(stream, true); + if (ret) { + goto end; + } + +end: + return ret; +} diff --git a/src/common/consumer/consumer-stream.h b/src/common/consumer/consumer-stream.h index c5fb09732..8bda682bd 100644 --- a/src/common/consumer/consumer-stream.h +++ b/src/common/consumer/consumer-stream.h @@ -77,4 +77,21 @@ int consumer_stream_write_index(struct lttng_consumer_stream *stream, int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx, uint64_t session_id); +/* + * Create the output files of a local stream. + * + * This must be called with the channel's and the stream's lock held. + */ +int consumer_stream_create_output_files(struct lttng_consumer_stream *stream, + bool create_index); + +/* + * Rotate the output files of a local stream. This will change the + * active output files of both the binary and index in accordance + * with the stream's configuration (stream file count). + * + * This must be called with the channel's and the stream's lock held. 
+ */ +int consumer_stream_rotate_output_files(struct lttng_consumer_stream *stream); + #endif /* LTTNG_CONSUMER_STREAM_H */ diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 3c20c57bd..7a7f7954b 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -48,6 +49,9 @@ #include #include #include +#include +#include +#include struct lttng_consumer_global_data consumer_data = { .stream_count = 0, @@ -358,7 +362,6 @@ void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd) */ void consumer_del_channel(struct lttng_consumer_channel *channel) { - int ret; struct lttng_ht_iter iter; DBG("Consumer delete channel key %" PRIu64, channel->key); @@ -389,17 +392,25 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) goto end; } - rcu_read_lock(); - iter.iter.node = &channel->node.node; - ret = lttng_ht_del(consumer_data.channel_ht, &iter); - assert(!ret); + lttng_trace_chunk_put(channel->trace_chunk); + channel->trace_chunk = NULL; - iter.iter.node = &channel->channels_by_session_id_ht_node.node; - ret = lttng_ht_del(consumer_data.channels_by_session_id_ht, &iter); - assert(!ret); - rcu_read_unlock(); + if (channel->is_published) { + int ret; + + rcu_read_lock(); + iter.iter.node = &channel->node.node; + ret = lttng_ht_del(consumer_data.channel_ht, &iter); + assert(!ret); - call_rcu(&channel->node.head, free_channel_rcu); + iter.iter.node = &channel->channels_by_session_id_ht_node.node; + ret = lttng_ht_del(consumer_data.channels_by_session_id_ht, + &iter); + assert(!ret); + rcu_read_unlock(); + } + + call_rcu(&channel->node.head, free_channel_rcu); end: pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); @@ -549,23 +560,18 @@ void consumer_stream_update_channel_attributes( { stream->channel_read_only_attributes.tracefile_size = channel->tracefile_size; - 
memcpy(stream->channel_read_only_attributes.path, channel->pathname, - sizeof(stream->channel_read_only_attributes.path)); } struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, uint64_t stream_key, - enum lttng_consumer_stream_state state, const char *channel_name, - uid_t uid, - gid_t gid, uint64_t relayd_id, uint64_t session_id, + struct lttng_trace_chunk *trace_chunk, int cpu, int *alloc_ret, enum consumer_channel_type type, - unsigned int monitor, - uint64_t trace_archive_id) + unsigned int monitor) { int ret; struct lttng_consumer_stream *stream; @@ -577,22 +583,24 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, goto end; } - rcu_read_lock(); + if (trace_chunk && !lttng_trace_chunk_get(trace_chunk)) { + ERR("Failed to acquire trace chunk reference during the creation of a stream"); + ret = -1; + goto error; + } + rcu_read_lock(); stream->key = stream_key; + stream->trace_chunk = trace_chunk; stream->out_fd = -1; stream->out_fd_offset = 0; stream->output_written = 0; - stream->state = state; - stream->uid = uid; - stream->gid = gid; stream->net_seq_idx = relayd_id; stream->session_id = session_id; stream->monitor = monitor; stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; stream->index_file = NULL; stream->last_sequence_number = -1ULL; - stream->trace_archive_id = trace_archive_id; pthread_mutex_init(&stream->lock, NULL); pthread_mutex_init(&stream->metadata_timer_lock, NULL); @@ -633,6 +641,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, error: rcu_read_unlock(); + lttng_trace_chunk_put(stream->trace_chunk); free(stream); end: if (alloc_ret) { @@ -810,8 +819,9 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_add_stream(&relayd->control_sock, stream->name, path, &stream->relayd_stream_id, - stream->chan->tracefile_size, stream->chan->tracefile_count, - stream->trace_archive_id); + 
stream->chan->tracefile_size, + stream->chan->tracefile_count, + stream->trace_chunk); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); @@ -952,6 +962,151 @@ error: return outfd; } +/* + * Trigger a dump of the metadata content. Following/during the successful + * completion of this call, the metadata poll thread will start receiving + * metadata packets to consume. + * + * The caller must hold the channel and stream locks. + */ +static +int consumer_metadata_stream_dump(struct lttng_consumer_stream *stream) +{ + int ret; + + ASSERT_LOCKED(stream->chan->lock); + ASSERT_LOCKED(stream->lock); + assert(stream->metadata_flag); + assert(stream->chan->trace_chunk); + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + /* + * Reset the position of what has been read from the + * metadata cache to 0 so we can dump it again. + */ + ret = kernctl_metadata_cache_dump(stream->wait_fd); + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + /* + * Reset the position pushed from the metadata cache so it + * will write from the beginning on the next push. + */ + stream->ust_metadata_pushed = 0; + ret = consumer_metadata_wakeup_pipe(stream->chan); + break; + default: + ERR("Unknown consumer_data type"); + abort(); + } + if (ret < 0) { + ERR("Failed to dump the metadata cache"); + } + return ret; +} + +static +int lttng_consumer_channel_set_trace_chunk( + struct lttng_consumer_channel *channel, + struct lttng_trace_chunk *new_trace_chunk) +{ + int ret = 0; + const bool is_local_trace = channel->relayd_id == -1ULL; + bool update_stream_trace_chunk; + struct cds_lfht_iter iter; + struct lttng_consumer_stream *stream; + unsigned long channel_hash; + + pthread_mutex_lock(&channel->lock); + /* + * A stream can transition to a state where it and its channel + * no longer belong to a trace chunk.
For instance, this happens when + * a session is rotated while it is inactive. After the rotation + * of an inactive session completes, the channel and its streams no + * longer belong to a trace chunk. + * + * However, if a session is stopped, rotated, and started again, + * the session daemon will create a new chunk and send it to its peers. + * In that case, the streams' transition to a new chunk can be performed + * immediately. + * + * This trace chunk transition could also be performed lazily when + * a buffer is consumed. However, creating the files here allows the + * consumer daemon to report any creation error to the session daemon + * and cause the start of the tracing session to fail. + */ + update_stream_trace_chunk = !channel->trace_chunk && new_trace_chunk; + + /* + * The acquisition of the reference cannot fail (barring + * a severe internal error) since a reference to the published + * chunk is already held by the caller. + */ + if (new_trace_chunk) { + const bool acquired_reference = lttng_trace_chunk_get( + new_trace_chunk); + + assert(acquired_reference); + } + + lttng_trace_chunk_put(channel->trace_chunk); + channel->trace_chunk = new_trace_chunk; + if (!is_local_trace || !new_trace_chunk) { + /* Not an error. */ + goto end; + } + + if (!update_stream_trace_chunk) { + goto end; + } + + channel_hash = consumer_data.stream_per_chan_id_ht->hash_fct( + &channel->key, lttng_ht_seed); + rcu_read_lock(); + cds_lfht_for_each_entry_duplicate(consumer_data.stream_per_chan_id_ht->ht, + channel_hash, + consumer_data.stream_per_chan_id_ht->match_fct, + &channel->key, &iter, stream, node_channel_id.node) { + bool acquired_reference, should_regenerate_metadata = false; + + acquired_reference = lttng_trace_chunk_get(channel->trace_chunk); + assert(acquired_reference); + + pthread_mutex_lock(&stream->lock); + + /* + * On a transition from "no-chunk" to a new chunk, a metadata + * stream's content must be entirely dumped. 
This must occur + * _after_ the creation of the metadata stream's output files + * as the consumption thread (not necessarily the one executing + * this) may start to consume during the call to + * consumer_metadata_stream_dump(). + */ + should_regenerate_metadata = + stream->metadata_flag && + !stream->trace_chunk && channel->trace_chunk; + stream->trace_chunk = channel->trace_chunk; + ret = consumer_stream_create_output_files(stream, true); + if (ret) { + pthread_mutex_unlock(&stream->lock); + goto end_rcu_unlock; + } + if (should_regenerate_metadata) { + ret = consumer_metadata_stream_dump(stream); + } + pthread_mutex_unlock(&stream->lock); + if (ret) { + goto end_rcu_unlock; + } + } +end_rcu_unlock: + rcu_read_unlock(); +end: + pthread_mutex_unlock(&channel->lock); + return ret; +} + /* * Allocate and return a new lttng_consumer_channel object using the given key * to initialize the hash table node. @@ -960,10 +1115,9 @@ error: */ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, uint64_t session_id, + const uint64_t *chunk_id, const char *pathname, const char *name, - uid_t uid, - gid_t gid, uint64_t relayd_id, enum lttng_event_output output, uint64_t tracefile_size, @@ -974,7 +1128,18 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, const char *root_shm_path, const char *shm_path) { - struct lttng_consumer_channel *channel; + struct lttng_consumer_channel *channel = NULL; + struct lttng_trace_chunk *trace_chunk = NULL; + + if (chunk_id) { + trace_chunk = lttng_trace_chunk_registry_find_chunk( + consumer_data.chunk_registry, session_id, + *chunk_id); + if (!trace_chunk) { + ERR("Failed to find trace chunk reference during creation of channel"); + goto end; + } + } channel = zmalloc(sizeof(*channel)); if (channel == NULL) { @@ -986,8 +1151,6 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->refcount = 0; channel->session_id = session_id; channel->session_id_per_pid =
session_id_per_pid; - channel->uid = uid; - channel->gid = gid; channel->relayd_id = relayd_id; channel->tracefile_size = tracefile_size; channel->tracefile_count = tracefile_count; @@ -1043,13 +1206,25 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->session_id); channel->wait_fd = -1; - CDS_INIT_LIST_HEAD(&channel->streams.head); + if (trace_chunk) { + int ret = lttng_consumer_channel_set_trace_chunk(channel, + trace_chunk); + if (ret) { + goto error; + } + } + DBG("Allocated channel (key %" PRIu64 ")", channel->key); end: + lttng_trace_chunk_put(trace_chunk); return channel; +error: + consumer_del_channel(channel); + channel = NULL; + goto end; } /* @@ -1076,6 +1251,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel, lttng_ht_add_u64(consumer_data.channels_by_session_id_ht, &channel->channels_by_session_id_ht_node); rcu_read_unlock(); + channel->is_published = true; pthread_mutex_unlock(&channel->timer_lock); pthread_mutex_unlock(&channel->lock); @@ -1125,16 +1301,13 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, * closed by the polling thread after a wakeup on the data_pipe or * metadata_pipe. */ - if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM || - stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) { + if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) { (*nb_inactive_fd)++; continue; } /* * This clobbers way too much the debug output. Uncomment that if you * need it for debugging purposes. - * - * DBG("Active FD %d", stream->wait_fd); */ (*pollfd)[i].fd = stream->wait_fd; (*pollfd)[i].events = POLLIN | POLLPRI; @@ -1529,7 +1702,7 @@ end: * core function for writing trace buffers to either the local filesystem or * the network. * - * It must be called with the stream lock held. + * It must be called with the stream and the channel lock held. * * Careful review MUST be put if any changes occur! 
* @@ -1553,6 +1726,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( /* RCU lock for the relayd pointer */ rcu_read_lock(); + assert(stream->chan->trace_chunk); + /* Flag that the current stream if set for network streaming. */ if (stream->net_seq_idx != (uint64_t) -1ULL) { relayd = consumer_find_relayd(stream->net_seq_idx); @@ -1651,32 +1826,11 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( if (stream->chan->tracefile_size > 0 && (stream->tracefile_size_current + len) > stream->chan->tracefile_size) { - ret = utils_rotate_stream_file(stream->chan->pathname, - stream->name, stream->chan->tracefile_size, - stream->chan->tracefile_count, stream->uid, stream->gid, - stream->out_fd, &(stream->tracefile_count_current), - &stream->out_fd); - if (ret < 0) { - ERR("Rotating output file"); + ret = consumer_stream_rotate_output_files(stream); + if (ret) { goto end; } outfd = stream->out_fd; - - if (stream->index_file) { - lttng_index_file_put(stream->index_file); - stream->index_file = lttng_index_file_create(stream->chan->pathname, - stream->name, stream->uid, stream->gid, - stream->chan->tracefile_size, - stream->tracefile_count_current, - CTF_INDEX_MAJOR, CTF_INDEX_MINOR); - if (!stream->index_file) { - goto end; - } - } - - /* Reset current size because we just perform a rotation. 
*/ - stream->tracefile_size_current = 0; - stream->out_fd_offset = 0; orig_offset = 0; } stream->tracefile_size_current += len; @@ -1853,33 +2007,12 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( if (stream->chan->tracefile_size > 0 && (stream->tracefile_size_current + len) > stream->chan->tracefile_size) { - ret = utils_rotate_stream_file(stream->chan->pathname, - stream->name, stream->chan->tracefile_size, - stream->chan->tracefile_count, stream->uid, stream->gid, - stream->out_fd, &(stream->tracefile_count_current), - &stream->out_fd); + ret = consumer_stream_rotate_output_files(stream); if (ret < 0) { written = ret; - ERR("Rotating output file"); goto end; } outfd = stream->out_fd; - - if (stream->index_file) { - lttng_index_file_put(stream->index_file); - stream->index_file = lttng_index_file_create(stream->chan->pathname, - stream->name, stream->uid, stream->gid, - stream->chan->tracefile_size, - stream->tracefile_count_current, - CTF_INDEX_MAJOR, CTF_INDEX_MINOR); - if (!stream->index_file) { - goto end; - } - } - - /* Reset current size because we just perform a rotation. */ - stream->tracefile_size_current = 0; - stream->out_fd_offset = 0; orig_offset = 0; } stream->tracefile_size_current += len; @@ -2170,6 +2303,8 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, consumer_del_channel(free_chan); } + lttng_trace_chunk_put(stream->trace_chunk); + stream->trace_chunk = NULL; consumer_stream_free(stream); } @@ -2294,46 +2429,6 @@ static void validate_endpoint_status_metadata_stream( rcu_read_unlock(); } -/* - * Perform operations that need to be done after a stream has - * rotated and released the stream lock. - * - * Multiple rotations cannot occur simultaneously, so we know the state of the - * "rotated" stream flag cannot change. - * - * This MUST be called WITHOUT the stream lock held. 
- */ -static -int consumer_post_rotation(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx) -{ - int ret = 0; - - pthread_mutex_lock(&stream->chan->lock); - - switch (consumer_data.type) { - case LTTNG_CONSUMER_KERNEL: - break; - case LTTNG_CONSUMER32_UST: - case LTTNG_CONSUMER64_UST: - /* - * The ust_metadata_pushed counter has been reset to 0, so now - * we can wakeup the metadata thread so it dumps the metadata - * cache to the new file. - */ - if (stream->metadata_flag) { - consumer_metadata_wakeup_pipe(stream->chan); - } - break; - default: - ERR("Unknown consumer_data type"); - abort(); - } - - pthread_mutex_unlock(&stream->chan->lock); - return ret; -} - /* * Thread polls on metadata file descriptor and write them on disk or on the * network. @@ -3370,9 +3465,8 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { ssize_t ret; - int rotate_ret; - bool rotated = false; + pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->lock); if (stream->metadata_flag) { pthread_mutex_lock(&stream->metadata_rdv_lock); @@ -3380,11 +3474,11 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: - ret = lttng_kconsumer_read_subbuffer(stream, ctx, &rotated); + ret = lttng_kconsumer_read_subbuffer(stream, ctx); break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: - ret = lttng_ustconsumer_read_subbuffer(stream, ctx, &rotated); + ret = lttng_ustconsumer_read_subbuffer(stream, ctx); break; default: ERR("Unknown consumer_data type"); @@ -3398,13 +3492,7 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, pthread_mutex_unlock(&stream->metadata_rdv_lock); } pthread_mutex_unlock(&stream->lock); - if (rotated) { - rotate_ret = consumer_post_rotation(stream, ctx); - if (rotate_ret < 0) { - ERR("Failed after a rotation"); - ret = -1; - } - } + 
pthread_mutex_unlock(&stream->chan->lock); return ret; } @@ -3916,8 +4004,7 @@ end: * Returns 0 on success, < 0 on error */ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, - uint64_t key, const char *path, uint64_t relayd_id, - uint32_t metadata, uint64_t new_chunk_id, + uint64_t key, uint64_t relayd_id, uint32_t metadata, struct lttng_consumer_local_data *ctx) { int ret; @@ -3930,30 +4017,6 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, rcu_read_lock(); pthread_mutex_lock(&channel->lock); - channel->current_chunk_id = new_chunk_id; - - ret = lttng_strncpy(channel->pathname, path, sizeof(channel->pathname)); - if (ret) { - ERR("Failed to copy new path to channel during channel rotation"); - ret = -1; - goto end_unlock_channel; - } - - if (relayd_id == -1ULL) { - /* - * The domain path (/ust or /kernel) has been created before, we - * now need to create the last part of the path: the application/user - * specific section (uid/1000/64-bit). 
- */ - ret = utils_mkdir_recursive(channel->pathname, S_IRWXU | S_IRWXG, - channel->uid, channel->gid); - if (ret < 0) { - ERR("Failed to create trace directory at %s during rotation", - channel->pathname); - ret = -1; - goto end_unlock_channel; - } - } cds_lfht_for_each_entry_duplicate(ht->ht, ht->hash_fct(&channel->key, lttng_ht_seed), @@ -3968,13 +4031,6 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, */ pthread_mutex_lock(&stream->lock); - ret = lttng_strncpy(stream->channel_read_only_attributes.path, - channel->pathname, - sizeof(stream->channel_read_only_attributes.path)); - if (ret) { - ERR("Failed to sample channel path name during channel rotation"); - goto end_unlock_stream; - } ret = lttng_consumer_sample_snapshot_positions(stream); if (ret < 0) { ERR("Failed to sample snapshot position during channel rotation"); @@ -4010,7 +4066,6 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, end_unlock_stream: pthread_mutex_unlock(&stream->lock); -end_unlock_channel: pthread_mutex_unlock(&channel->lock); end: rcu_read_unlock(); @@ -4077,64 +4132,39 @@ void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stre /* * Perform the rotation a local stream file. 
*/ +static int rotate_local_stream(struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream) { - int ret; + int ret = 0; - DBG("Rotate local stream: stream key %" PRIu64 ", channel key %" PRIu64 " at path %s", + DBG("Rotate local stream: stream key %" PRIu64 ", channel key %" PRIu64, stream->key, - stream->chan->key, - stream->channel_read_only_attributes.path); - - ret = close(stream->out_fd); - if (ret < 0) { - PERROR("Closing trace file (fd %d), stream %" PRIu64, - stream->out_fd, stream->key); - assert(0); - goto error; - } - - ret = utils_create_stream_file( - stream->channel_read_only_attributes.path, - stream->name, - stream->channel_read_only_attributes.tracefile_size, - stream->tracefile_count_current, - stream->uid, stream->gid, NULL); - if (ret < 0) { - ERR("Rotate create stream file"); - goto error; - } - stream->out_fd = ret; + stream->chan->key); stream->tracefile_size_current = 0; + stream->tracefile_count_current = 0; - if (!stream->metadata_flag) { - struct lttng_index_file *index_file; + if (stream->out_fd >= 0) { + ret = close(stream->out_fd); + if (ret) { + PERROR("Failed to close stream out_fd of channel \"%s\"", + stream->chan->name); + } + stream->out_fd = -1; + } + if (stream->index_file) { lttng_index_file_put(stream->index_file); - - index_file = lttng_index_file_create( - stream->channel_read_only_attributes.path, - stream->name, stream->uid, stream->gid, - stream->channel_read_only_attributes.tracefile_size, - stream->tracefile_count_current, - CTF_INDEX_MAJOR, CTF_INDEX_MINOR); - if (!index_file) { - ERR("Create index file during rotation"); - goto error; - } - stream->index_file = index_file; - stream->out_fd_offset = 0; + stream->index_file = NULL; } - ret = 0; - goto end; + if (!stream->trace_chunk) { + goto end; + } -error: - ret = -1; + ret = consumer_stream_create_output_files(stream, true); end: return ret; - } /* @@ -4145,6 +4175,8 @@ int rotate_relay_stream(struct lttng_consumer_local_data *ctx, { int ret; 
struct consumer_relayd_sock_pair *relayd; + uint64_t chunk_id; + enum lttng_trace_chunk_status chunk_status; DBG("Rotate relay stream"); relayd = consumer_find_relayd(stream->net_seq_idx); @@ -4154,11 +4186,19 @@ int rotate_relay_stream(struct lttng_consumer_local_data *ctx, goto end; } + chunk_status = lttng_trace_chunk_get_id(stream->chan->trace_chunk, + &chunk_id); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ERR("Failed to retrieve the id of the current trace chunk of channel \"%s\"", + stream->chan->name); + ret = -1; + goto end; + } + pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_rotate_stream(&relayd->control_sock, stream->relayd_stream_id, - stream->channel_read_only_attributes.path, - stream->chan->current_chunk_id, + chunk_id, stream->last_sequence_number); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { @@ -4175,60 +4215,74 @@ end: /* * Performs the stream rotation for the rotate session feature if needed. - * It must be called with the stream lock held. + * It must be called with the channel and stream locks held. * * Return 0 on success, a negative number of error. */ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, bool *rotated) + struct lttng_consumer_stream *stream) { int ret; DBG("Consumer rotate stream %" PRIu64, stream->key); + /* + * Update the stream's 'current' chunk to the session's (channel) + * now-current chunk. + */ + lttng_trace_chunk_put(stream->trace_chunk); + if (stream->chan->trace_chunk == stream->trace_chunk) { + /* + * A channel can be rotated and not have a "next" chunk + * to transition to. In that case, the channel's "current chunk" + * has not been closed yet, but it has not been updated to + * a "next" trace chunk either. Hence, the stream, like its + * parent channel, becomes part of no chunk and can't output + * anything until a new trace chunk is created. 
+ */ + stream->trace_chunk = NULL; + } else if (stream->chan->trace_chunk && + !lttng_trace_chunk_get(stream->chan->trace_chunk)) { + ERR("Failed to acquire a reference to channel's trace chunk during stream rotation"); + ret = -1; + goto error; + } else { + /* + * Update the stream's trace chunk to its parent channel's + * current trace chunk. + */ + stream->trace_chunk = stream->chan->trace_chunk; + } + if (stream->net_seq_idx != (uint64_t) -1ULL) { ret = rotate_relay_stream(ctx, stream); } else { ret = rotate_local_stream(ctx, stream); } - stream->trace_archive_id++; if (ret < 0) { ERR("Failed to rotate stream, ret = %i", ret); goto error; } - if (stream->metadata_flag) { - switch (consumer_data.type) { - case LTTNG_CONSUMER_KERNEL: - /* - * Reset the position of what has been read from the metadata - * cache to 0 so we can dump it again. - */ - ret = kernctl_metadata_cache_dump(stream->wait_fd); - if (ret < 0) { - ERR("Failed to dump the kernel metadata cache after rotation"); - goto error; - } - break; - case LTTNG_CONSUMER32_UST: - case LTTNG_CONSUMER64_UST: - /* - * Reset the position pushed from the metadata cache so it - * will write from the beginning on the next push. - */ - stream->ust_metadata_pushed = 0; - break; - default: - ERR("Unknown consumer_data type"); - abort(); + if (stream->metadata_flag && stream->trace_chunk) { + /* + * If the stream has transitioned to a new trace + * chunk, the metadata should be re-dumped to the + * newest chunk. + * + * However, it is possible for a stream to transition to + * a "no-chunk" state. This can happen if a rotation + * occurs on an inactive session. In such cases, the metadata + * regeneration will happen when the next trace chunk is + * created. 
+ */ + ret = consumer_metadata_stream_dump(stream); + if (ret) { + goto error; } } lttng_consumer_reset_stream_rotate_state(stream); - if (rotated) { - *rotated = true; - } - ret = 0; error: @@ -4264,21 +4318,19 @@ int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, stream, node_channel_id.node) { health_code_update(); + pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->lock); if (!stream->rotate_ready) { pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&stream->chan->lock); continue; } DBG("Consumer rotate ready stream %" PRIu64, stream->key); - ret = lttng_consumer_rotate_stream(ctx, stream, NULL); + ret = lttng_consumer_rotate_stream(ctx, stream); pthread_mutex_unlock(&stream->lock); - if (ret) { - goto end; - } - - ret = consumer_post_rotation(stream, ctx); + pthread_mutex_unlock(&stream->chan->lock); if (ret) { goto end; } @@ -4291,218 +4343,298 @@ end: return ret; } -static -int rotate_rename_local(const char *old_path, const char *new_path, - uid_t uid, gid_t gid) +enum lttcomm_return_code lttng_consumer_init_command( + struct lttng_consumer_local_data *ctx, + const lttng_uuid sessiond_uuid) { - int ret; - - assert(old_path); - assert(new_path); - - ret = utils_mkdir_recursive(new_path, S_IRWXU | S_IRWXG, uid, gid); - if (ret < 0) { - ERR("Create directory on rotate"); - goto end; - } + enum lttcomm_return_code ret; + char uuid_str[UUID_STR_LEN]; - ret = rename(old_path, new_path); - if (ret < 0 && errno != ENOENT) { - PERROR("Rename completed rotation chunk"); + if (ctx->sessiond_uuid.is_set) { + ret = LTTCOMM_CONSUMERD_ALREADY_SET; goto end; } - ret = 0; + ctx->sessiond_uuid.is_set = true; + memcpy(ctx->sessiond_uuid.value, sessiond_uuid, sizeof(lttng_uuid)); + ret = LTTCOMM_CONSUMERD_SUCCESS; + lttng_uuid_to_str(sessiond_uuid, uuid_str); + DBG("Received session daemon UUID: %s", uuid_str); end: return ret; } -static -int rotate_rename_relay(const char *old_path, const char *new_path, - uint64_t 
relayd_id) +enum lttcomm_return_code lttng_consumer_create_trace_chunk( + const uint64_t *relayd_id, uint64_t session_id, + uint64_t chunk_id, + time_t chunk_creation_timestamp, + const char *chunk_override_name, + const struct lttng_credentials *credentials, + struct lttng_directory_handle *chunk_directory_handle) { int ret; - struct consumer_relayd_sock_pair *relayd; - - relayd = consumer_find_relayd(relayd_id); - if (!relayd) { - ERR("Failed to find relayd while running rotate_rename_relay command"); - ret = -1; - goto end; - } - - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_rotate_rename(&relayd->control_sock, old_path, new_path); - if (ret < 0) { - ERR("Relayd rotate rename failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); - lttng_consumer_cleanup_relayd(relayd); - } - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); -end: - return ret; -} - -int lttng_consumer_rotate_rename(const char *old_path, const char *new_path, - uid_t uid, gid_t gid, uint64_t relayd_id) -{ - if (relayd_id != -1ULL) { - return rotate_rename_relay(old_path, new_path, relayd_id); - } else { - return rotate_rename_local(old_path, new_path, uid, gid); - } -} - -/* Stream lock must be acquired by the caller. */ -static -bool check_stream_rotation_pending(const struct lttng_consumer_stream *stream, - uint64_t session_id, uint64_t chunk_id) -{ - bool pending = false; + enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; + struct lttng_trace_chunk *created_chunk, *published_chunk; + enum lttng_trace_chunk_status chunk_status; + char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)]; + char creation_timestamp_buffer[ISO8601_STR_LEN]; + const char *relayd_id_str = "(none)"; + const char *creation_timestamp_str; + struct lttng_ht_iter iter; + struct lttng_consumer_channel *channel; - if (stream->session_id != session_id) { - /* Skip. */ - goto end; - } + if (relayd_id) { + /* Only used for logging purposes. 
*/ + ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), + "%" PRIu64, *relayd_id); + if (ret > 0 && ret < sizeof(relayd_id_buffer)) { + relayd_id_str = relayd_id_buffer; + } else { + relayd_id_str = "(formatting error)"; + } + } + + /* Local protocol error. */ + assert(chunk_creation_timestamp); + ret = time_to_iso8601_str(chunk_creation_timestamp, + creation_timestamp_buffer, + sizeof(creation_timestamp_buffer)); + creation_timestamp_str = !ret ? creation_timestamp_buffer : + "(formatting error)"; + + DBG("Consumer create trace chunk command: relay_id = %s" + ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 + ", chunk_override_name = %s" + ", chunk_creation_timestamp = %s", + relayd_id_str, session_id, chunk_id, + chunk_override_name ? : "(none)", + creation_timestamp_str); /* - * If the stream's archive_id belongs to the chunk being rotated (or an - * even older one), it means that the consumer has not consumed all the - * buffers that belong to the chunk being rotated. Therefore, the - * rotation is considered as ongoing/pending. + * The trace chunk registry, as used by the consumer daemon, implicitly + * owns the trace chunks. This is only needed in the consumer since + * the consumer has no notion of a session beyond session IDs being + * used to identify other objects. + * + * The lttng_trace_chunk_registry_publish_chunk() call below provides a + * reference which is not released; it implicitly becomes the session + * daemon's reference to the chunk in the consumer daemon. + * + * The lifetime of trace chunks in the consumer daemon is managed by + * the session daemon through the LTTNG_CONSUMER_CREATE_TRACE_CHUNK + * and LTTNG_CONSUMER_CLOSE_TRACE_CHUNK commands. */ - pending = stream->trace_archive_id <= chunk_id; -end: - return pending; -} - -/* RCU read lock must be acquired by the caller.
*/ -int lttng_consumer_check_rotation_pending_local(uint64_t session_id, - uint64_t chunk_id) -{ - struct lttng_ht_iter iter; - struct lttng_consumer_stream *stream; - bool rotation_pending = false; + created_chunk = lttng_trace_chunk_create(chunk_id, + chunk_creation_timestamp); + if (!created_chunk) { + ERR("Failed to create trace chunk"); + ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; + goto end; + } - /* Start with the metadata streams... */ - cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) { - pthread_mutex_lock(&stream->lock); - rotation_pending = check_stream_rotation_pending(stream, - session_id, chunk_id); - pthread_mutex_unlock(&stream->lock); - if (rotation_pending) { + if (chunk_override_name) { + chunk_status = lttng_trace_chunk_override_name(created_chunk, + chunk_override_name); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; goto end; } } - /* ... followed by the data streams. */ - cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) { - pthread_mutex_lock(&stream->lock); - rotation_pending = check_stream_rotation_pending(stream, - session_id, chunk_id); - pthread_mutex_unlock(&stream->lock); - if (rotation_pending) { + if (chunk_directory_handle) { + chunk_status = lttng_trace_chunk_set_credentials(created_chunk, + credentials); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ERR("Failed to set trace chunk credentials"); + ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; + goto end; + } + /* + * The consumer daemon has no ownership of the chunk output + * directory. 
+ */ + chunk_status = lttng_trace_chunk_set_as_user(created_chunk, + chunk_directory_handle); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ERR("Failed to set trace chunk's directory handle"); + ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; goto end; } } -end: - return !!rotation_pending; -} - -int lttng_consumer_check_rotation_pending_relay(uint64_t session_id, - uint64_t relayd_id, uint64_t chunk_id) -{ - int ret; - struct consumer_relayd_sock_pair *relayd; - - relayd = consumer_find_relayd(relayd_id); - if (!relayd) { - ERR("Failed to find relayd id %" PRIu64, relayd_id); - ret = -1; + published_chunk = lttng_trace_chunk_registry_publish_chunk( + consumer_data.chunk_registry, session_id, + created_chunk); + lttng_trace_chunk_put(created_chunk); + created_chunk = NULL; + if (!published_chunk) { + ERR("Failed to publish trace chunk"); + ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; goto end; } - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_rotate_pending(&relayd->control_sock, chunk_id); - if (ret < 0) { - ERR("Relayd rotate pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); - lttng_consumer_cleanup_relayd(relayd); - } - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - -end: - return ret; -} - -static -int mkdir_local(const char *path, uid_t uid, gid_t gid) -{ - int ret; + rcu_read_lock(); + cds_lfht_for_each_entry_duplicate(consumer_data.channels_by_session_id_ht->ht, + consumer_data.channels_by_session_id_ht->hash_fct( + &session_id, lttng_ht_seed), + consumer_data.channels_by_session_id_ht->match_fct, + &session_id, &iter.iter, channel, + channels_by_session_id_ht_node.node) { + ret = lttng_consumer_channel_set_trace_chunk(channel, + published_chunk); + if (ret) { + /* + * Roll-back the creation of this chunk. 
+ * + * This is important since the session daemon will + * assume that the creation of this chunk failed and + * will never ask for it to be closed, resulting + * in a leak and an inconsistent state for some + * channels. + */ + enum lttcomm_return_code close_ret; + + DBG("Failed to set new trace chunk on existing channels, rolling back"); + close_ret = lttng_consumer_close_trace_chunk(relayd_id, + session_id, chunk_id, + chunk_creation_timestamp); + if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) { + ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64, + session_id, chunk_id); + } - ret = utils_mkdir_recursive(path, S_IRWXU | S_IRWXG, uid, gid); - if (ret < 0) { - /* utils_mkdir_recursive logs an error. */ - goto end; + ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; + break; + } } + rcu_read_unlock(); - ret = 0; + /* Release the reference returned by the "publish" operation. */ + lttng_trace_chunk_put(published_chunk); end: - return ret; + return ret_code; } -static -int mkdir_relay(const char *path, uint64_t relayd_id) +enum lttcomm_return_code lttng_consumer_close_trace_chunk( + const uint64_t *relayd_id, uint64_t session_id, + uint64_t chunk_id, time_t chunk_close_timestamp) { - int ret; - struct consumer_relayd_sock_pair *relayd; + enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; + struct lttng_trace_chunk *chunk; + char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)]; + const char *relayd_id_str = "(none)"; + struct lttng_ht_iter iter; + struct lttng_consumer_channel *channel; + enum lttng_trace_chunk_status chunk_status; - relayd = consumer_find_relayd(relayd_id); - if (!relayd) { - ERR("Failed to find relayd"); - ret = -1; + if (relayd_id) { + int ret; + + /* Only used for logging purposes. 
*/ + ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), + "%" PRIu64, *relayd_id); + if (ret > 0 && ret < sizeof(relayd_id_buffer)) { + relayd_id_str = relayd_id_buffer; + } else { + relayd_id_str = "(formatting error)"; + } + } + + DBG("Consumer close trace chunk command: relayd_id = %s" + ", session_id = %" PRIu64 + ", chunk_id = %" PRIu64, relayd_id_str, + session_id, chunk_id); + chunk = lttng_trace_chunk_registry_find_chunk( + consumer_data.chunk_registry, session_id, + chunk_id); + if (!chunk) { + ERR("Failed to find chunk: session_id = %" PRIu64 + ", chunk_id = %" PRIu64, + session_id, chunk_id); + ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK; goto end; } - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_mkdir(&relayd->control_sock, path); - if (ret < 0) { - ERR("Relayd mkdir failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); - lttng_consumer_cleanup_relayd(relayd); + chunk_status = lttng_trace_chunk_set_close_timestamp(chunk, + chunk_close_timestamp); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED; + goto end; } - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - -end: - return ret; + /* + * Release the reference returned by the "find" operation and + * the session daemon's implicit reference to the chunk. + */ + lttng_trace_chunk_put(chunk); + lttng_trace_chunk_put(chunk); -} + /* + * chunk is now invalid to access as we no longer hold a reference to + * it; it is only kept around to compare it (by address) to the + * current chunk found in the session's channels. 
+ */ + rcu_read_lock(); + cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, + channel, node.node) { + int ret; -int lttng_consumer_mkdir(const char *path, uid_t uid, gid_t gid, - uint64_t relayd_id) -{ - if (relayd_id != -1ULL) { - return mkdir_relay(path, relayd_id); - } else { - return mkdir_local(path, uid, gid); + /* + * Only change the channel's chunk to NULL if it still + * references the chunk being closed. The channel may + * reference a newer chunk in the case of a session + * rotation. When a session rotation occurs, the "next" + * chunk is created before the "current" chunk is closed. + */ + if (channel->trace_chunk != chunk) { + continue; + } + ret = lttng_consumer_channel_set_trace_chunk(channel, NULL); + if (ret) { + /* + * Attempt to close the chunk on as many channels as + * possible. + */ + ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED; + } } + rcu_read_unlock(); +end: + return ret_code; } -enum lttcomm_return_code lttng_consumer_init_command( - struct lttng_consumer_local_data *ctx, - const lttng_uuid sessiond_uuid) +enum lttcomm_return_code lttng_consumer_trace_chunk_exists( + const uint64_t *relayd_id, uint64_t session_id, + uint64_t chunk_id) { - enum lttcomm_return_code ret; - char uuid_str[UUID_STR_LEN]; - - if (ctx->sessiond_uuid.is_set) { - ret = LTTCOMM_CONSUMERD_ALREADY_SET; - goto end; - } - - ctx->sessiond_uuid.is_set = true; - memcpy(ctx->sessiond_uuid.value, sessiond_uuid, sizeof(lttng_uuid)); - ret = LTTCOMM_CONSUMERD_SUCCESS; - lttng_uuid_to_str(sessiond_uuid, uuid_str); - DBG("Received session daemon UUID: %s", uuid_str); -end: - return ret; + enum lttcomm_return_code ret_code; + struct lttng_trace_chunk *chunk; + char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)]; + const char *relayd_id_str = "(none)"; + + if (relayd_id) { + int ret; + + /* Only used for logging purposes.
*/ + ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), + "%" PRIu64, *relayd_id); + if (ret > 0 && ret < sizeof(relayd_id_buffer)) { + relayd_id_str = relayd_id_buffer; + } else { + relayd_id_str = "(formatting error)"; + } + } + + DBG("Consumer trace chunk exists command: relayd_id = %s" + ", session_id = %" PRIu64 + ", chunk_id = %" PRIu64, relayd_id_str, + session_id, chunk_id); + chunk = lttng_trace_chunk_registry_find_chunk( + consumer_data.chunk_registry, session_id, + chunk_id); + DBG("Trace chunk %s locally", chunk ? "exists" : "does not exist"); + ret_code = chunk ? LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL : + LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK; + + lttng_trace_chunk_put(chunk); + return ret_code; } diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index 83196444b..f514aba71 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -35,6 +35,7 @@ #include #include #include +#include /* Commands for consumer */ enum lttng_consumer_command { @@ -65,18 +66,10 @@ enum lttng_consumer_command { LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL, LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE, LTTNG_CONSUMER_ROTATE_CHANNEL, - LTTNG_CONSUMER_ROTATE_RENAME, - LTTNG_CONSUMER_CHECK_ROTATION_PENDING_LOCAL, - LTTNG_CONSUMER_CHECK_ROTATION_PENDING_RELAY, - LTTNG_CONSUMER_MKDIR, LTTNG_CONSUMER_INIT, -}; - -/* State of each fd in consumer */ -enum lttng_consumer_stream_state { - LTTNG_CONSUMER_ACTIVE_STREAM, - LTTNG_CONSUMER_PAUSE_STREAM, - LTTNG_CONSUMER_DELETE_STREAM, + LTTNG_CONSUMER_CREATE_TRACE_CHUNK, + LTTNG_CONSUMER_CLOSE_TRACE_CHUNK, + LTTNG_CONSUMER_TRACE_CHUNK_EXISTS, }; enum lttng_consumer_type { @@ -112,6 +105,8 @@ struct stream_list { struct consumer_metadata_cache; struct lttng_consumer_channel { + /* Is the channel published in the channel hash tables? 
*/ + bool is_published; /* HT node used for consumer_data.channel_ht */ struct lttng_ht_node_u64 node; /* HT node used for consumer_data.channels_by_session_id_ht */ @@ -122,6 +117,8 @@ struct lttng_consumer_channel { int refcount; /* Tracing session id on the session daemon side. */ uint64_t session_id; + /* Current trace chunk of the session in which this channel exists. */ + struct lttng_trace_chunk *trace_chunk; /* * Session id when requesting metadata to the session daemon for * a session with per-PID buffers. @@ -131,9 +128,6 @@ struct lttng_consumer_channel { char pathname[PATH_MAX]; /* Channel name. */ char name[LTTNG_SYMBOL_NAME_LEN]; - /* UID and GID of the session owning this channel. */ - uid_t uid; - gid_t gid; /* Relayd id of the channel. -1ULL if it does not apply. */ uint64_t relayd_id; /* @@ -230,19 +224,14 @@ struct lttng_consumer_channel { int nr_stream_fds; char root_shm_path[PATH_MAX]; char shm_path[PATH_MAX]; + /* Only set for UST channels. */ + LTTNG_OPTIONAL(struct lttng_credentials) buffer_credentials; /* Total number of discarded events for that channel. */ uint64_t discarded_events; /* Total number of missed packets due to overwriting (overwrite). */ uint64_t lost_packets; bool streams_sent_to_relayd; - - /* - * The chunk id where we currently write the data. This value is sent - * to the relay when we add a stream and when a stream rotates. This - * allows to keep track of where each stream on the relay is writing. - */ - uint64_t current_chunk_id; }; /* @@ -258,6 +247,12 @@ struct lttng_consumer_stream { struct lttng_ht_node_u64 node_session_id; /* Pointer to associated channel. */ struct lttng_consumer_channel *chan; + /* + * Current trace chunk. Holds a reference to the trace chunk. + * `chunk` can be NULL when a stream is not associated to a chunk, e.g. + * when it was created in the context of a no-output session. + */ + struct lttng_trace_chunk *trace_chunk; /* Key by which the stream is indexed for 'node'. 
*/ uint64_t key; @@ -270,7 +265,6 @@ struct lttng_consumer_stream { off_t out_fd_offset; /* Amount of bytes written to the output */ uint64_t output_written; - enum lttng_consumer_stream_state state; int shm_fd_is_copy; int data_read; int hangup_flush_done; @@ -322,9 +316,6 @@ struct lttng_consumer_stream { /* For UST */ int wait_fd; - /* UID/GID of the user owning the session to which stream belongs */ - uid_t uid; - gid_t gid; /* Network sequence number. Indicating on which relayd socket it goes. */ uint64_t net_seq_idx; /* @@ -414,12 +405,6 @@ struct lttng_consumer_stream { uint64_t last_discarded_events; /* Copy of the sequence number of the last packet extracted. */ uint64_t last_sequence_number; - /* - * A stream is created with a trace_archive_id matching the session's - * current trace archive id at the time of the creation of the stream. - * It is incremented when the rotate_position is reached. - */ - uint64_t trace_archive_id; /* * Index file object of the index file for this stream. */ @@ -451,7 +436,6 @@ struct lttng_consumer_stream { * the stream objects when we introduce refcounting. 
*/ struct { - char path[LTTNG_PATH_MAX]; uint64_t tracefile_size; } channel_read_only_attributes; @@ -745,23 +729,19 @@ void consumer_stream_update_channel_attributes( struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, uint64_t stream_key, - enum lttng_consumer_stream_state state, const char *channel_name, - uid_t uid, - gid_t gid, uint64_t relayd_id, uint64_t session_id, + struct lttng_trace_chunk *trace_chunk, int cpu, int *alloc_ret, enum consumer_channel_type type, - unsigned int monitor, - uint64_t trace_archive_id); + unsigned int monitor); struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, uint64_t session_id, + const uint64_t *chunk_id, const char *pathname, const char *name, - uid_t uid, - gid_t gid, uint64_t relayd_id, enum lttng_event_output output, uint64_t tracefile_size, @@ -847,23 +827,27 @@ void consumer_add_metadata_stream(struct lttng_consumer_stream *stream); void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream); int consumer_create_index_file(struct lttng_consumer_stream *stream); int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, - uint64_t key, const char *path, uint64_t relayd_id, - uint32_t metadata, uint64_t new_chunk_id, + uint64_t key, uint64_t relayd_id, uint32_t metadata, struct lttng_consumer_local_data *ctx); int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream); int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, bool *rotated); + struct lttng_consumer_stream *stream); int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, uint64_t key, struct lttng_consumer_local_data *ctx); -int lttng_consumer_rotate_rename(const char *current_path, const char *new_path, - uid_t uid, gid_t gid, uint64_t relayd_id); -int lttng_consumer_check_rotation_pending_local(uint64_t session_id, - uint64_t chunk_id); -int 
lttng_consumer_check_rotation_pending_relay(uint64_t session_id, - uint64_t relayd_id, uint64_t chunk_id); void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream); -int lttng_consumer_mkdir(const char *path, uid_t uid, gid_t gid, - uint64_t relayd_id); +enum lttcomm_return_code lttng_consumer_create_trace_chunk( + const uint64_t *relayd_id, uint64_t session_id, + uint64_t chunk_id, + time_t chunk_creation_timestamp, + const char *chunk_override_name, + const struct lttng_credentials *credentials, + struct lttng_directory_handle *chunk_directory_handle); +enum lttcomm_return_code lttng_consumer_close_trace_chunk( + const uint64_t *relayd_id, uint64_t session_id, + uint64_t chunk_id, time_t chunk_close_timestamp); +enum lttcomm_return_code lttng_consumer_trace_chunk_exists( + const uint64_t *relayd_id, uint64_t session_id, + uint64_t chunk_id); void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd); enum lttcomm_return_code lttng_consumer_init_command( struct lttng_consumer_local_data *ctx, diff --git a/src/common/defaults.h b/src/common/defaults.h index 5c93e498d..5df5127b3 100644 --- a/src/common/defaults.h +++ b/src/common/defaults.h @@ -55,8 +55,8 @@ #define DEFAULT_TRACE_OUTPUT DEFAULT_HOME_DIR "/lttng" /* Default directory where the trace are written in per domain */ -#define DEFAULT_KERNEL_TRACE_DIR "/kernel" -#define DEFAULT_UST_TRACE_DIR "/ust" +#define DEFAULT_KERNEL_TRACE_DIR "kernel" +#define DEFAULT_UST_TRACE_DIR "ust" /* Subpath for per PID or UID sessions. */ #define DEFAULT_UST_TRACE_PID_PATH "/pid" @@ -343,6 +343,13 @@ #define DEFAULT_LTTNG_RELAYD_TCP_KEEP_ALIVE_PROBE_INTERVAL_ENV "LTTNG_RELAYD_TCP_KEEP_ALIVE_PROBE_INTERVAL" #define DEFAULT_LTTNG_RELAYD_TCP_KEEP_ALIVE_ABORT_THRESHOLD_ENV "LTTNG_RELAYD_TCP_KEEP_ALIVE_ABORT_THRESHOLD" +/* + * Name of the intermediate directory used to rename the trace chunk of a + * session's first rotation. 
+ */ +#define DEFAULT_TEMPORARY_CHUNK_RENAME_DIRECTORY ".tmp_rename_chunk" +#define DEFAULT_ARCHIVED_TRACE_CHUNKS_DIRECTORY "archives" + /* * Default timer value in usec for the rotate pending polling check on the * relay when a rotation has completed on the consumer. diff --git a/src/common/error.c b/src/common/error.c index a7d6e1220..2ba62d7be 100644 --- a/src/common/error.c +++ b/src/common/error.c @@ -214,6 +214,9 @@ static const char *error_string_array[] = { [ ERROR_INDEX(LTTNG_ERR_CHAN_NOT_FOUND) ] = "Channel not found", [ ERROR_INDEX(LTTNG_ERR_SNAPSHOT_UNSUPPORTED) ] = "Session configuration does not allow the use of snapshots", [ ERROR_INDEX(LTTNG_ERR_SESSION_NOT_EXIST) ] = "Tracing session does not exist", + [ ERROR_INDEX(LTTNG_ERR_CREATE_TRACE_CHUNK_FAIL_CONSUMER) ] = "Trace chunk creation failed on consumer", + [ ERROR_INDEX(LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER) ] = "Trace chunk close failed on consumer", + [ ERROR_INDEX(LTTNG_ERR_TRACE_CHUNK_EXISTS_FAIL_CONSUMER) ] = "Failed to query consumer for trace chunk existence", /* Last element */ [ ERROR_INDEX(LTTNG_ERR_NR) ] = "Unknown error code" diff --git a/src/common/index/index.c b/src/common/index/index.c index 0935d702d..694e3d186 100644 --- a/src/common/index/index.c +++ b/src/common/index/index.c @@ -23,6 +23,7 @@ #include #include +#include #include #include #include @@ -115,6 +116,93 @@ error: return NULL; } +struct lttng_index_file *lttng_index_file_create_from_trace_chunk( + struct lttng_trace_chunk *chunk, + const char *channel_path, char *stream_name, + uint64_t stream_file_size, uint64_t stream_count, + uint32_t index_major, uint32_t index_minor, + bool unlink_existing_file) +{ + struct lttng_index_file *index_file; + enum lttng_trace_chunk_status chunk_status; + int ret, fd = -1; + ssize_t size_ret; + struct ctf_packet_index_file_hdr hdr; + char index_directory_path[LTTNG_PATH_MAX]; + char index_file_path[LTTNG_PATH_MAX]; + const uint32_t element_len = ctf_packet_index_len(index_major, 
+ index_minor); + const int flags = O_WRONLY | O_CREAT | O_TRUNC; + const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP; + + index_file = zmalloc(sizeof(*index_file)); + if (!index_file) { + PERROR("Failed to allocate lttng_index_file"); + goto error; + } + + ret = snprintf(index_directory_path, sizeof(index_directory_path), + "%s/" DEFAULT_INDEX_DIR, channel_path); + if (ret < 0 || ret >= sizeof(index_directory_path)) { + ERR("Failed to format index directory path"); + goto error; + } + + ret = utils_stream_file_path(index_directory_path, stream_name, + stream_file_size, stream_count, + DEFAULT_INDEX_FILE_SUFFIX, + index_file_path, sizeof(index_file_path)); + if (ret) { + goto error; + } + + if (unlink_existing_file) { + /* + * For tracefile rotation. We need to unlink the old + * file if present to synchronize with the tail of the + * live viewer which could be working on this same file. + * By doing so, any reference to the old index file + * stays valid even if we re-create a new file with the + * same name afterwards. + */ + chunk_status = lttng_trace_chunk_unlink_file(chunk, + index_file_path); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + goto error; + } + } + + chunk_status = lttng_trace_chunk_open_file(chunk, index_file_path, + flags, mode, &fd); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + goto error; + } + + ctf_packet_index_file_hdr_init(&hdr, index_major, index_minor); + size_ret = lttng_write(fd, &hdr, sizeof(hdr)); + if (size_ret < sizeof(hdr)) { + PERROR("Failed to write index header"); + goto error; + } + index_file->fd = fd; + index_file->major = index_major; + index_file->minor = index_minor; + index_file->element_len = element_len; + urcu_ref_init(&index_file->ref); + + return index_file; + +error: + if (fd >= 0) { + ret = close(fd); + if (ret < 0) { + PERROR("Failed to close file descriptor of index file"); + } + } + free(index_file); + return NULL; +} + /* * Write index values to the given index file. 
* diff --git a/src/common/index/index.h b/src/common/index/index.h index 83e61f648..469a870ab 100644 --- a/src/common/index/index.h +++ b/src/common/index/index.h @@ -23,6 +23,7 @@ #include #include +#include #include "ctf-index.h" struct lttng_index_file { @@ -40,6 +41,12 @@ struct lttng_index_file { struct lttng_index_file *lttng_index_file_create(const char *path_name, char *stream_name, int uid, int gid, uint64_t size, uint64_t count, uint32_t major, uint32_t minor); +struct lttng_index_file *lttng_index_file_create_from_trace_chunk( + struct lttng_trace_chunk *chunk, + const char *channel_path, char *stream_name, + uint64_t stream_file_size, uint64_t stream_count, + uint32_t index_major, uint32_t index_minor, + bool unlink_existing_file); struct lttng_index_file *lttng_index_file_open(const char *path_name, const char *channel_name, uint64_t tracefile_count, uint64_t tracefile_count_current); diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 74c6de399..ae909517f 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -43,6 +43,7 @@ #include #include #include +#include #include "kernel-consumer.h" @@ -125,7 +126,7 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, /* * Take a snapshot of all the stream of a channel * RCU read-side lock must be held across this function to ensure existence of - * channel. + * channel. The channel lock must be held by the caller. * * Returns 0 on success, < 0 on error */ @@ -160,6 +161,19 @@ static int lttng_kconsumer_snapshot_channel( */ pthread_mutex_lock(&stream->lock); + assert(channel->trace_chunk); + if (!lttng_trace_chunk_get(channel->trace_chunk)) { + /* + * Can't happen barring an internal error as the channel + * holds a reference to the trace chunk. 
+ */ + ERR("Failed to acquire reference to channel's trace chunk"); + ret = -1; + goto end_unlock; + } + assert(!stream->trace_chunk); + stream->trace_chunk = channel->trace_chunk; + /* * Assign the received relayd ID so we can use it for streaming. The streams * are not visible to anyone so this is OK to change it. @@ -173,20 +187,13 @@ static int lttng_kconsumer_snapshot_channel( goto end_unlock; } } else { - ret = utils_create_stream_file(path, stream->name, - stream->chan->tracefile_size, - stream->tracefile_count_current, - stream->uid, stream->gid, NULL); + ret = consumer_stream_create_output_files(stream, + false); if (ret < 0) { - ERR("utils_create_stream_file"); goto end_unlock; } - - stream->out_fd = ret; - stream->tracefile_size_current = 0; - - DBG("Kernel consumer snapshot stream %s/%s (%" PRIu64 ")", - path, stream->name, stream->key); + DBG("Kernel consumer snapshot stream (%" PRIu64 ")", + stream->key); } ret = kernctl_buffer_flush_empty(stream->wait_fd); @@ -309,6 +316,8 @@ static int lttng_kconsumer_snapshot_channel( close_relayd_stream(stream); stream->net_seq_idx = (uint64_t) -1ULL; } + lttng_trace_chunk_put(stream->trace_chunk); + stream->trace_chunk = NULL; pthread_mutex_unlock(&stream->lock); } @@ -331,11 +340,12 @@ end: /* * Read the whole metadata available for a snapshot. * RCU read-side lock must be held across this function to ensure existence of - * metadata_channel. + * metadata_channel. The channel lock must be held by the caller. 
* * Returns 0 on success, < 0 on error */ -static int lttng_kconsumer_snapshot_metadata(struct lttng_consumer_channel *metadata_channel, +static int lttng_kconsumer_snapshot_metadata( + struct lttng_consumer_channel *metadata_channel, uint64_t key, char *path, uint64_t relayd_id, struct lttng_consumer_local_data *ctx) { @@ -352,7 +362,10 @@ static int lttng_kconsumer_snapshot_metadata(struct lttng_consumer_channel *meta metadata_stream = metadata_channel->metadata_stream; assert(metadata_stream); + pthread_mutex_lock(&metadata_stream->lock); + assert(metadata_channel->trace_chunk); + assert(metadata_stream->trace_chunk); /* Flag once that we have a valid relayd for the stream. */ if (relayd_id != (uint64_t) -1ULL) { @@ -365,20 +378,17 @@ static int lttng_kconsumer_snapshot_metadata(struct lttng_consumer_channel *meta goto error_snapshot; } } else { - ret = utils_create_stream_file(path, metadata_stream->name, - metadata_stream->chan->tracefile_size, - metadata_stream->tracefile_count_current, - metadata_stream->uid, metadata_stream->gid, NULL); + ret = consumer_stream_create_output_files(metadata_stream, + false); if (ret < 0) { goto error_snapshot; } - metadata_stream->out_fd = ret; } do { health_code_update(); - ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx, NULL); + ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx); if (ret_read < 0) { if (ret_read != -EAGAIN) { ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)", @@ -405,6 +415,8 @@ static int lttng_kconsumer_snapshot_metadata(struct lttng_consumer_channel *meta */ } metadata_stream->out_fd = -1; + lttng_trace_chunk_put(metadata_stream->trace_chunk); + metadata_stream->trace_chunk = NULL; } } @@ -465,6 +477,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, { struct lttng_consumer_channel *new_channel; int ret_recv; + const uint64_t chunk_id = msg.u.channel.chunk_id.value; health_code_update(); @@ -479,8 +492,11 @@ int lttng_kconsumer_recv_cmd(struct 
lttng_consumer_local_data *ctx, DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key); new_channel = consumer_allocate_channel(msg.u.channel.channel_key, - msg.u.channel.session_id, msg.u.channel.pathname, - msg.u.channel.name, msg.u.channel.uid, msg.u.channel.gid, + msg.u.channel.session_id, + msg.u.channel.chunk_id.is_set ? + &chunk_id : NULL, + msg.u.channel.pathname, + msg.u.channel.name, msg.u.channel.relayd_id, msg.u.channel.output, msg.u.channel.tracefile_size, msg.u.channel.tracefile_count, 0, @@ -627,19 +643,17 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); + pthread_mutex_lock(&channel->lock); new_stream = consumer_allocate_stream(channel->key, fd, - LTTNG_CONSUMER_ACTIVE_STREAM, channel->name, - channel->uid, - channel->gid, channel->relayd_id, channel->session_id, + channel->trace_chunk, msg.u.stream.cpu, &alloc_ret, channel->type, - channel->monitor, - msg.u.stream.trace_archive_id); + channel->monitor); if (new_stream == NULL) { switch (alloc_ret) { case -ENOMEM: @@ -648,6 +662,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); break; } + pthread_mutex_unlock(&channel->lock); goto end_nosignal; } @@ -660,6 +675,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, new_stream->output = LTTNG_EVENT_SPLICE; ret = utils_create_pipe(new_stream->splice_pipe); if (ret < 0) { + pthread_mutex_unlock(&channel->lock); goto end_nosignal; } break; @@ -668,6 +684,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, break; default: ERR("Stream output unknown %d", channel->output); + pthread_mutex_unlock(&channel->lock); goto end_nosignal; } @@ -693,14 +710,16 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); + pthread_mutex_lock(&new_stream->lock); if (ctx->on_recv_stream) { ret = ctx->on_recv_stream(new_stream); if (ret < 0) { + 
pthread_mutex_unlock(&new_stream->lock); + pthread_mutex_unlock(&channel->lock); consumer_stream_free(new_stream); goto end_nosignal; } } - health_code_update(); if (new_stream->metadata_flag) { @@ -713,6 +732,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, "relayd id %" PRIu64, new_stream->name, new_stream->net_seq_idx); cds_list_add(&new_stream->send_node, &channel->streams.head); + pthread_mutex_unlock(&new_stream->lock); + pthread_mutex_unlock(&channel->lock); break; } @@ -721,6 +742,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = consumer_send_relayd_stream(new_stream, new_stream->chan->pathname); if (ret < 0) { + pthread_mutex_unlock(&new_stream->lock); + pthread_mutex_unlock(&channel->lock); consumer_stream_free(new_stream); goto end_nosignal; } @@ -734,10 +757,14 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = consumer_send_relayd_streams_sent( new_stream->net_seq_idx); if (ret < 0) { + pthread_mutex_unlock(&new_stream->lock); + pthread_mutex_unlock(&channel->lock); goto end_nosignal; } } } + pthread_mutex_unlock(&new_stream->lock); + pthread_mutex_unlock(&channel->lock); /* Get the right pipe where the stream will be sent. */ if (new_stream->metadata_flag) { @@ -899,6 +926,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ERR("Channel %" PRIu64 " not found", key); ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; } else { + pthread_mutex_lock(&channel->lock); if (msg.u.snapshot_channel.metadata == 1) { ret = lttng_kconsumer_snapshot_metadata(channel, key, msg.u.snapshot_channel.pathname, @@ -918,6 +946,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED; } } + pthread_mutex_unlock(&channel->lock); } health_code_update(); @@ -1097,10 +1126,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, * Sample the rotate position of all the streams in this channel. 
*/ ret = lttng_consumer_rotate_channel(channel, key, - msg.u.rotate_channel.pathname, msg.u.rotate_channel.relayd_id, msg.u.rotate_channel.metadata, - msg.u.rotate_channel.new_chunk_id, ctx); if (ret < 0) { ERR("Rotate channel failed"); @@ -1125,24 +1152,11 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, break; } - case LTTNG_CONSUMER_ROTATE_RENAME: + case LTTNG_CONSUMER_INIT: { - DBG("Consumer rename session %" PRIu64 " after rotation, old path = \"%s\", new path = \"%s\"", - msg.u.rotate_rename.session_id, - msg.u.rotate_rename.old_path, - msg.u.rotate_rename.new_path); - ret = lttng_consumer_rotate_rename(msg.u.rotate_rename.old_path, - msg.u.rotate_rename.new_path, - msg.u.rotate_rename.uid, - msg.u.rotate_rename.gid, - msg.u.rotate_rename.relayd_id); - if (ret < 0) { - ERR("Rotate rename failed"); - ret_code = LTTCOMM_CONSUMERD_ROTATE_RENAME_FAILED; - } - + ret_code = lttng_consumer_init_command(ctx, + msg.u.init.sessiond_uuid); health_code_update(); - ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ @@ -1150,128 +1164,100 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } break; } - case LTTNG_CONSUMER_CHECK_ROTATION_PENDING_LOCAL: + case LTTNG_CONSUMER_CREATE_TRACE_CHUNK: { - int pending; - uint32_t pending_reply; - - DBG("Perform local check of pending rotation for session id %" PRIu64, - msg.u.check_rotation_pending_local.session_id); - pending = lttng_consumer_check_rotation_pending_local( - msg.u.check_rotation_pending_local.session_id, - msg.u.check_rotation_pending_local.chunk_id); - if (pending < 0) { - ERR("Local rotation pending check failed with code %i", pending); - ret_code = LTTCOMM_CONSUMERD_ROTATION_PENDING_LOCAL_FAILED; - } else { - pending_reply = !!pending; - } - - health_code_update(); - - ret = consumer_send_status_msg(sock, ret_code); - if (ret < 0) { - /* Somehow, the session daemon is not responding anymore. 
*/ - goto end_nosignal; - } + const struct lttng_credentials credentials = { + .uid = msg.u.create_trace_chunk.credentials.uid, + .gid = msg.u.create_trace_chunk.credentials.gid, + }; + const bool is_local_trace = + !msg.u.create_trace_chunk.relayd_id.is_set; + const uint64_t relayd_id = + msg.u.create_trace_chunk.relayd_id.value; + const char *chunk_override_name = + *msg.u.create_trace_chunk.override_name ? + msg.u.create_trace_chunk.override_name : + NULL; + LTTNG_OPTIONAL(struct lttng_directory_handle) chunk_directory_handle = + LTTNG_OPTIONAL_INIT; - if (pending < 0) { - /* - * An error occurred while running the command; - * don't send the 'pending' flag as the sessiond - * will not read it. - */ - break; - } + /* + * The session daemon will only provide a chunk directory file + * descriptor for local traces. + */ + if (is_local_trace) { + int chunk_dirfd; - /* Send back returned value to session daemon */ - ret = lttcomm_send_unix_sock(sock, &pending_reply, - sizeof(pending_reply)); - if (ret < 0) { - PERROR("Failed to send rotation pending return code"); - goto error_fatal; - } - break; - } - case LTTNG_CONSUMER_CHECK_ROTATION_PENDING_RELAY: - { - int pending; - uint32_t pending_reply; - - DBG("Perform relayd check of pending rotation for session id %" PRIu64, - msg.u.check_rotation_pending_relay.session_id); - pending = lttng_consumer_check_rotation_pending_relay( - msg.u.check_rotation_pending_relay.session_id, - msg.u.check_rotation_pending_relay.relayd_id, - msg.u.check_rotation_pending_relay.chunk_id); - if (pending < 0) { - ERR("Relayd rotation pending check failed with code %i", pending); - ret_code = LTTCOMM_CONSUMERD_ROTATION_PENDING_RELAY_FAILED; - } else { - pending_reply = !!pending; - } + /* Acknowledge the reception of the command. */ + ret = consumer_send_status_msg(sock, + LTTCOMM_CONSUMERD_SUCCESS); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore.
*/ + goto end_nosignal; + } - health_code_update(); + ret = lttcomm_recv_fds_unix_sock(sock, &chunk_dirfd, 1); + if (ret != sizeof(chunk_dirfd)) { + ERR("Failed to receive trace chunk directory file descriptor"); + goto error_fatal; + } - ret = consumer_send_status_msg(sock, ret_code); - if (ret < 0) { - /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; + DBG("Received trace chunk directory fd (%d)", + chunk_dirfd); + ret = lttng_directory_handle_init_from_dirfd( + &chunk_directory_handle.value, + chunk_dirfd); + if (ret) { + ERR("Failed to initialize chunk directory handle from directory file descriptor"); + if (close(chunk_dirfd)) { + PERROR("Failed to close chunk directory file descriptor"); + } + goto error_fatal; + } + chunk_directory_handle.is_set = true; } - if (pending < 0) { - /* - * An error occurred while running the command; - * don't send the 'pending' flag as the sessiond - * will not read it. - */ - break; - } + ret_code = lttng_consumer_create_trace_chunk( + !is_local_trace ? &relayd_id : NULL, + msg.u.create_trace_chunk.session_id, + msg.u.create_trace_chunk.chunk_id, + (time_t) msg.u.create_trace_chunk.creation_timestamp, + chunk_override_name, + &credentials, + chunk_directory_handle.is_set ? 
+ &chunk_directory_handle.value : + NULL); - /* Send back returned value to session daemon */ - ret = lttcomm_send_unix_sock(sock, &pending_reply, - sizeof(pending_reply)); - if (ret < 0) { - PERROR("Failed to send rotation pending return code"); - goto error_fatal; + if (chunk_directory_handle.is_set) { + lttng_directory_handle_fini( + &chunk_directory_handle.value); } - break; + goto end_msg_sessiond; } - case LTTNG_CONSUMER_MKDIR: + case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK: { - DBG("Consumer mkdir %s in session %" PRIu64, - msg.u.mkdir.path, - msg.u.mkdir.session_id); - ret = lttng_consumer_mkdir(msg.u.mkdir.path, - msg.u.mkdir.uid, - msg.u.mkdir.gid, - msg.u.mkdir.relayd_id); - if (ret < 0) { - ERR("consumer mkdir failed"); - ret_code = LTTCOMM_CONSUMERD_MKDIR_FAILED; - } - - health_code_update(); - - ret = consumer_send_status_msg(sock, ret_code); - if (ret < 0) { - /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; - } - break; + const uint64_t relayd_id = + msg.u.close_trace_chunk.relayd_id.value; + + ret_code = lttng_consumer_close_trace_chunk( + msg.u.close_trace_chunk.relayd_id.is_set ? + &relayd_id : NULL, + msg.u.close_trace_chunk.session_id, + msg.u.close_trace_chunk.chunk_id, + (time_t) msg.u.close_trace_chunk.close_timestamp); + goto end_msg_sessiond; } - case LTTNG_CONSUMER_INIT: + case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS: { - ret_code = lttng_consumer_init_command(ctx, - msg.u.init.sessiond_uuid); - - health_code_update(); - ret = consumer_send_status_msg(sock, ret_code); - if (ret < 0) { - /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; - } - break; + const uint64_t relayd_id = + msg.u.trace_chunk_exists.relayd_id.value; + + ret_code = lttng_consumer_trace_chunk_exists( + msg.u.trace_chunk_exists.relayd_id.is_set ? 
+ &relayd_id : NULL, + msg.u.trace_chunk_exists.session_id, + msg.u.trace_chunk_exists.chunk_id); + goto end_msg_sessiond; } default: goto end_nosignal; @@ -1287,6 +1273,22 @@ end_nosignal: health_code_update(); return 1; +end_msg_sessiond: + /* + * The returned value here is not useful since either way we'll return 1 to + * the caller because the session daemon socket management is done + * elsewhere. Returning a negative code or 0 will shutdown the consumer. + */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + goto error_fatal; + } + rcu_read_unlock(); + + health_code_update(); + + return 1; + error_fatal: rcu_read_unlock(); /* This will issue a consumer stop. */ @@ -1510,9 +1512,10 @@ end: /* * Consume data on a file descriptor and write it on a trace file. + * The stream and channel locks must be held by the caller. */ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx, bool *rotated) + struct lttng_consumer_local_data *ctx) { unsigned long len, subbuf_size, padding; int err, write_index = 1, rotation_ret; @@ -1528,7 +1531,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, */ if (stream->rotate_ready) { DBG("Rotate stream before extracting data"); - rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated); + rotation_ret = lttng_consumer_rotate_stream(ctx, stream); if (rotation_ret < 0) { ERR("Stream rotation error"); ret = -1; @@ -1752,7 +1755,7 @@ rotate: */ rotation_ret = lttng_consumer_stream_is_rotate_ready(stream); if (rotation_ret == 1) { - rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated); + rotation_ret = lttng_consumer_rotate_stream(ctx, stream); if (rotation_ret < 0) { ERR("Stream rotation error"); ret = -1; @@ -1775,33 +1778,15 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) assert(stream); /* - * Don't create anything if this is set for streaming or should not be - * monitored. 
+ * Don't create anything if this is set for streaming or if there is + * no current trace chunk on the parent channel. */ - if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) { - ret = utils_create_stream_file(stream->chan->pathname, stream->name, - stream->chan->tracefile_size, stream->tracefile_count_current, - stream->uid, stream->gid, NULL); - if (ret < 0) { + if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor && + stream->chan->trace_chunk) { + ret = consumer_stream_create_output_files(stream, true); + if (ret) { goto error; } - stream->out_fd = ret; - stream->tracefile_size_current = 0; - - if (!stream->metadata_flag) { - struct lttng_index_file *index_file; - - index_file = lttng_index_file_create(stream->chan->pathname, - stream->name, stream->uid, stream->gid, - stream->chan->tracefile_size, - stream->tracefile_count_current, - CTF_INDEX_MAJOR, CTF_INDEX_MINOR); - if (!index_file) { - goto error; - } - assert(!stream->index_file); - stream->index_file = index_file; - } } if (stream->output == LTTNG_EVENT_MMAP) { diff --git a/src/common/kernel-consumer/kernel-consumer.h b/src/common/kernel-consumer/kernel-consumer.h index 2aee20327..3bae225b0 100644 --- a/src/common/kernel-consumer/kernel-consumer.h +++ b/src/common/kernel-consumer/kernel-consumer.h @@ -33,7 +33,7 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll); ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx, bool *rotated); + struct lttng_consumer_local_data *ctx); int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream); int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream); int lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata); diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c index 
08bbad23b..fb459699d 100644 --- a/src/common/relayd/relayd.c +++ b/src/common/relayd/relayd.c @@ -29,6 +29,7 @@ #include #include #include +#include #include "relayd.h" @@ -416,7 +417,7 @@ error: int relayd_add_stream(struct lttcomm_relayd_sock *rsock, const char *channel_name, const char *pathname, uint64_t *stream_id, uint64_t tracefile_size, uint64_t tracefile_count, - uint64_t trace_archive_id) + struct lttng_trace_chunk *trace_chunk) { int ret; struct lttcomm_relayd_status_stream reply; @@ -431,17 +432,27 @@ int relayd_add_stream(struct lttcomm_relayd_sock *rsock, const char *channel_nam /* Compat with relayd 2.1 */ if (rsock->minor == 1) { /* For 2.1 */ + assert(!trace_chunk); ret = relayd_add_stream_2_1(rsock, channel_name, pathname); } else if (rsock->minor > 1 && rsock->minor < 11) { /* From 2.2 to 2.10 */ + assert(!trace_chunk); ret = relayd_add_stream_2_2(rsock, channel_name, pathname, tracefile_size, tracefile_count); } else { + enum lttng_trace_chunk_status chunk_status; + uint64_t chunk_id; + + assert(trace_chunk); + chunk_status = lttng_trace_chunk_get_id(trace_chunk, + &chunk_id); + assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); + /* From 2.11 to ...*/ ret = relayd_add_stream_2_11(rsock, channel_name, pathname, tracefile_size, tracefile_count, - trace_archive_id); + chunk_id); } if (ret) { @@ -1099,14 +1110,15 @@ error: } int relayd_rotate_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id, - const char *new_pathname, uint64_t new_chunk_id, - uint64_t seq_num) + uint64_t new_chunk_id, uint64_t seq_num) { int ret; struct lttcomm_relayd_rotate_stream *msg = NULL; struct lttcomm_relayd_generic_reply reply; size_t len; int msg_len; + /* FIXME */ + char *new_pathname = NULL; /* Code flow error. Safety net. 
*/ assert(rsock); diff --git a/src/common/relayd/relayd.h b/src/common/relayd/relayd.h index af93188ee..d4b86f4b2 100644 --- a/src/common/relayd/relayd.h +++ b/src/common/relayd/relayd.h @@ -22,6 +22,7 @@ #include #include +#include int relayd_connect(struct lttcomm_relayd_sock *sock); int relayd_close(struct lttcomm_relayd_sock *sock); @@ -33,7 +34,7 @@ int relayd_create_session(struct lttcomm_relayd_sock *sock, int relayd_add_stream(struct lttcomm_relayd_sock *sock, const char *channel_name, const char *pathname, uint64_t *stream_id, uint64_t tracefile_size, uint64_t tracefile_count, - uint64_t trace_archive_id); + struct lttng_trace_chunk *trace_chunk); int relayd_streams_sent(struct lttcomm_relayd_sock *rsock); int relayd_send_close_stream(struct lttcomm_relayd_sock *sock, uint64_t stream_id, uint64_t last_net_seq_num); @@ -55,7 +56,7 @@ int relayd_send_index(struct lttcomm_relayd_sock *rsock, int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock, uint64_t stream_id, uint64_t version); int relayd_rotate_stream(struct lttcomm_relayd_sock *sock, uint64_t stream_id, - const char *new_pathname, uint64_t new_chunk_id, uint64_t seq_num); + uint64_t new_chunk_id, uint64_t seq_num); int relayd_rotate_rename(struct lttcomm_relayd_sock *sock, const char *current_path, const char *new_path); int relayd_rotate_pending(struct lttcomm_relayd_sock *sock, diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h index 836c3af66..d673c6e19 100644 --- a/src/common/sessiond-comm/sessiond-comm.h +++ b/src/common/sessiond-comm/sessiond-comm.h @@ -37,6 +37,7 @@ #include #include #include +#include #include #include @@ -145,7 +146,13 @@ enum lttcomm_relayd_command { */ enum lttcomm_return_code { LTTCOMM_CONSUMERD_SUCCESS = 0, /* Everything went fine. */ - LTTCOMM_CONSUMERD_COMMAND_SOCK_READY = 1, /* Command socket ready */ + /* + * Some code paths use -1 to express an error, others + * negate this consumer return code. 
Starting codes at + * 100 ensures there is no mix-up between this error value + * and legitimate status codes. + */ + LTTCOMM_CONSUMERD_COMMAND_SOCK_READY = 100, /* Command socket ready */ LTTCOMM_CONSUMERD_SUCCESS_RECV_FD, /* Success on receiving fds */ LTTCOMM_CONSUMERD_ERROR_RECV_FD, /* Error on receiving fds */ LTTCOMM_CONSUMERD_ERROR_RECV_CMD, /* Error on receiving command */ @@ -167,11 +174,13 @@ enum lttcomm_return_code { LTTCOMM_CONSUMERD_CHAN_NOT_FOUND, /* Channel not found. */ LTTCOMM_CONSUMERD_ALREADY_SET, /* Resource already set. */ LTTCOMM_CONSUMERD_ROTATION_FAIL, /* Rotation has failed. */ - LTTCOMM_CONSUMERD_ROTATE_RENAME_FAILED, /* Rotation rename has failed. */ - LTTCOMM_CONSUMERD_ROTATION_PENDING_LOCAL_FAILED, /* Rotation pending relay failed. */ - LTTCOMM_CONSUMERD_ROTATION_PENDING_RELAY_FAILED, /* Rotation pending relay failed. */ - LTTCOMM_CONSUMERD_MKDIR_FAILED, /* mkdir has failed. */ LTTCOMM_CONSUMERD_SNAPSHOT_FAILED, /* snapshot has failed. */ + LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED,/* Trace chunk creation failed. */ + LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED, /* Trace chunk close failed. */ + LTTCOMM_CONSUMERD_INVALID_PARAMETERS, /* Invalid parameters. */ + LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL, /* Trace chunk exists on consumer daemon. */ + LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE,/* Trace chunk exists on relay daemon. */ + LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK, /* Unknown trace chunk. */ /* MUST be last element */ LTTCOMM_NR, /* Last element */ @@ -462,9 +471,9 @@ struct lttcomm_consumer_msg { struct { uint64_t channel_key; uint64_t session_id; + /* ID of the session's current trace chunk. */ + LTTNG_OPTIONAL(uint64_t) LTTNG_PACKED chunk_id; char pathname[PATH_MAX]; - uint32_t uid; - uint32_t gid; uint64_t relayd_id; /* nb_init_streams is the number of streams open initially. */ uint32_t nb_init_streams; @@ -487,13 +496,6 @@ struct lttcomm_consumer_msg { int32_t cpu; /* On which CPU this stream is assigned. 
*/ /* Tells the consumer if the stream should be or not monitored. */ uint32_t no_monitor; - /* - * The archive id that was "current" at the time this - * stream was created. This is used to determine - * whether a rotation request was sent before or after - * the creation of a stream. - */ - uint64_t trace_archive_id; } LTTNG_PACKED stream; /* Only used by Kernel. */ struct { uint64_t net_index; @@ -524,10 +526,15 @@ struct lttcomm_consumer_msg { uint64_t session_id; /* Tracing session id */ char pathname[PATH_MAX]; /* Channel file path. */ char name[LTTNG_SYMBOL_NAME_LEN]; /* Channel name. */ - uint32_t uid; /* User ID of the session */ - uint32_t gid; /* Group ID ot the session */ + /* Credentials used to open the UST buffer shared mappings. */ + struct { + uint32_t uid; + uint32_t gid; + } LTTNG_PACKED buffer_credentials; uint64_t relayd_id; /* Relayd id if apply. */ uint64_t key; /* Unique channel key. */ + /* ID of the session's current trace chunk. */ + LTTNG_OPTIONAL(uint64_t) LTTNG_PACKED chunk_id; unsigned char uuid[UUID_LEN]; /* uuid for ust tracer. */ uint32_t chan_id; /* Channel ID on the tracer side. */ uint64_t tracefile_size; /* bytes */ @@ -543,13 +550,6 @@ struct lttcomm_consumer_msg { */ uint32_t ust_app_uid; int64_t blocking_timeout; - /* - * The archive id that was "current" at the time this - * channel was created. This is used to determine - * whether a rotation request was sent before or after - * the creation of a channel. - */ - uint64_t trace_archive_id; char root_shm_path[PATH_MAX]; char shm_path[PATH_MAX]; } LTTNG_PACKED ask_channel; @@ -585,12 +585,6 @@ struct lttcomm_consumer_msg { uint64_t relayd_id; /* Relayd id if apply. */ uint64_t key; uint64_t nb_packets_per_stream; - /* - * The session's current trace archive id is propagated - * since a snapshot triggers the creation of an - * ephemeral metadata stream. 
- */ - uint64_t trace_archive_id; } LTTNG_PACKED snapshot_channel; struct { uint64_t channel_key; @@ -608,20 +602,10 @@ struct lttcomm_consumer_msg { uint64_t session_id; } LTTNG_PACKED regenerate_metadata; struct { - char pathname[PATH_MAX]; uint32_t metadata; /* This is a metadata channel. */ uint64_t relayd_id; /* Relayd id if apply. */ uint64_t key; - uint64_t new_chunk_id; } LTTNG_PACKED rotate_channel; - struct { - char old_path[LTTNG_PATH_MAX]; - char new_path[LTTNG_PATH_MAX]; - uint64_t relayd_id; /* Relayd id if apply. */ - uint64_t session_id; - uint32_t uid; - uint32_t gid; - } LTTNG_PACKED rotate_rename; struct { uint64_t session_id; uint64_t chunk_id; @@ -632,12 +616,40 @@ struct lttcomm_consumer_msg { uint64_t chunk_id; } LTTNG_PACKED check_rotation_pending_relay; struct { - char path[LTTNG_PATH_MAX]; - uint64_t relayd_id; /* Relayd id if apply. */ + /* + * Relayd id, if applicable (remote). + * + * A directory file descriptor referring to the chunk's + * output folder is transmitted if the chunk is local + * (relayd_id unset). + * + * `override_name` is left NULL (all-zeroes) if the + * chunk's name is not overridden. 
+ */ + LTTNG_OPTIONAL(uint64_t) LTTNG_PACKED relayd_id; + char override_name[LTTNG_NAME_MAX]; + uint64_t session_id; + uint64_t chunk_id; + uint64_t creation_timestamp; + struct { + uint32_t uid; + uint32_t gid; + } LTTNG_PACKED credentials; + } LTTNG_PACKED create_trace_chunk; + struct { + LTTNG_OPTIONAL(uint64_t) LTTNG_PACKED relayd_id; + uint64_t session_id; + uint64_t chunk_id; + uint64_t close_timestamp; + } LTTNG_PACKED close_trace_chunk; + struct { + LTTNG_OPTIONAL(uint64_t) LTTNG_PACKED relayd_id; uint64_t session_id; - uint32_t uid; - uint32_t gid; - } LTTNG_PACKED mkdir; + uint64_t chunk_id; + } LTTNG_PACKED trace_chunk_exists; + struct { + lttng_uuid sessiond_uuid; + } LTTNG_PACKED init; } u; } LTTNG_PACKED; diff --git a/src/common/trace-chunk.c b/src/common/trace-chunk.c index a15f0ddaa..53cc08a2c 100644 --- a/src/common/trace-chunk.c +++ b/src/common/trace-chunk.c @@ -745,6 +745,8 @@ enum lttng_trace_chunk_status lttng_trace_chunk_open_file( chunk->credentials.value.use_current_user ? NULL : &chunk->credentials.value.user); if (ret < 0) { + ERR("Failed to open file relative to trace chunk file_path = \"%s\", flags = %d, mode = %d", + file_path, flags, (int) mode); status = LTTNG_TRACE_CHUNK_STATUS_ERROR; goto end; } diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 4d1737726..83b2143d7 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -73,6 +73,7 @@ static void destroy_channel(struct lttng_consumer_channel *channel) cds_list_del(&stream->send_node); ustctl_destroy_stream(stream->ustream); + lttng_trace_chunk_put(stream->trace_chunk); free(stream); } @@ -123,7 +124,7 @@ error: * Allocate and return a consumer channel object. 
*/ static struct lttng_consumer_channel *allocate_channel(uint64_t session_id, - const char *pathname, const char *name, uid_t uid, gid_t gid, + const uint64_t *chunk_id, const char *pathname, const char *name, uint64_t relayd_id, uint64_t key, enum lttng_event_output output, uint64_t tracefile_size, uint64_t tracefile_count, uint64_t session_id_per_pid, unsigned int monitor, @@ -133,8 +134,8 @@ static struct lttng_consumer_channel *allocate_channel(uint64_t session_id, assert(pathname); assert(name); - return consumer_allocate_channel(key, session_id, pathname, name, uid, - gid, relayd_id, output, tracefile_size, + return consumer_allocate_channel(key, session_id, chunk_id, pathname, + name, relayd_id, output, tracefile_size, tracefile_count, session_id_per_pid, monitor, live_timer_interval, root_shm_path, shm_path); } @@ -147,8 +148,7 @@ static struct lttng_consumer_channel *allocate_channel(uint64_t session_id, */ static struct lttng_consumer_stream *allocate_stream(int cpu, int key, struct lttng_consumer_channel *channel, - struct lttng_consumer_local_data *ctx, int *_alloc_ret, - uint64_t trace_archive_id) + struct lttng_consumer_local_data *ctx, int *_alloc_ret) { int alloc_ret; struct lttng_consumer_stream *stream = NULL; @@ -158,17 +158,14 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int key, stream = consumer_allocate_stream(channel->key, key, - LTTNG_CONSUMER_ACTIVE_STREAM, channel->name, - channel->uid, - channel->gid, channel->relayd_id, channel->session_id, + channel->trace_chunk, cpu, &alloc_ret, channel->type, - channel->monitor, - trace_archive_id); + channel->monitor); if (stream == NULL) { switch (alloc_ret) { case -ENOENT: @@ -265,16 +262,17 @@ end: /* * Create streams for the given channel using liblttng-ust-ctl. + * The channel lock must be acquired by the caller. * * Return 0 on success else a negative value. 
*/ static int create_ust_streams(struct lttng_consumer_channel *channel, - struct lttng_consumer_local_data *ctx, - uint64_t trace_archive_id) + struct lttng_consumer_local_data *ctx) { int ret, cpu = 0; struct ustctl_consumer_stream *ustream; struct lttng_consumer_stream *stream; + pthread_mutex_t *current_stream_lock = NULL; assert(channel); assert(ctx); @@ -301,8 +299,7 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, } /* Allocate consumer stream object. */ - stream = allocate_stream(cpu, wait_fd, channel, ctx, &ret, - trace_archive_id); + stream = allocate_stream(cpu, wait_fd, channel, ctx, &ret); if (!stream) { goto error_alloc; } @@ -322,6 +319,8 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, uatomic_inc(&stream->chan->refcount); } + pthread_mutex_lock(&stream->lock); + current_stream_lock = &stream->lock; /* * Order is important this is why a list is used. On error, the caller * should clean this list. @@ -360,12 +359,17 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, sizeof(ust_metadata_pipe)); } } + pthread_mutex_unlock(&stream->lock); + current_stream_lock = NULL; } return 0; error: error_alloc: + if (current_stream_lock) { + pthread_mutex_unlock(current_stream_lock); + } return ret; } @@ -411,7 +415,8 @@ error_shm_open: return -1; } -static int open_ust_stream_fd(struct lttng_consumer_channel *channel, int cpu) +static int open_ust_stream_fd(struct lttng_consumer_channel *channel, int cpu, + const struct lttng_credentials *session_credentials) { char shm_path[PATH_MAX]; int ret; @@ -425,7 +430,7 @@ static int open_ust_stream_fd(struct lttng_consumer_channel *channel, int cpu) } return run_as_open(shm_path, O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, - channel->uid, channel->gid); + session_credentials->uid, session_credentials->gid); error_shm_path: return -1; @@ -448,6 +453,7 @@ static int create_ust_channel(struct lttng_consumer_channel *channel, assert(channel); 
assert(attr); assert(ust_chanp); + assert(channel->buffer_credentials.is_set); DBG3("Creating channel to ustctl with attr: [overwrite: %d, " "subbuf_size: %" PRIu64 ", num_subbuf: %" PRIu64 ", " @@ -466,7 +472,8 @@ static int create_ust_channel(struct lttng_consumer_channel *channel, goto error_alloc; } for (i = 0; i < nr_stream_fds; i++) { - stream_fds[i] = open_ust_stream_fd(channel, i); + stream_fds[i] = open_ust_stream_fd(channel, i, + &channel->buffer_credentials.value); if (stream_fds[i] < 0) { ret = -1; goto error_open; @@ -501,7 +508,8 @@ error_open: ERR("Cannot get stream shm path"); } closeret = run_as_unlink(shm_path, - channel->uid, channel->gid); + channel->buffer_credentials.value.uid, + channel->buffer_credentials.value.gid); if (closeret) { PERROR("unlink %s", shm_path); } @@ -510,7 +518,8 @@ error_open: /* Try to rmdir all directories under shm_path root. */ if (channel->root_shm_path[0]) { (void) run_as_rmdir_recursive(channel->root_shm_path, - channel->uid, channel->gid); + channel->buffer_credentials.value.uid, + channel->buffer_credentials.value.gid); } free(stream_fds); error_alloc: @@ -645,8 +654,7 @@ error: */ static int ask_channel(struct lttng_consumer_local_data *ctx, struct lttng_consumer_channel *channel, - struct ustctl_consumer_channel_attr *attr, - uint64_t trace_archive_id) + struct ustctl_consumer_channel_attr *attr) { int ret; @@ -687,7 +695,9 @@ static int ask_channel(struct lttng_consumer_local_data *ctx, } /* Open all streams for this channel. */ - ret = create_ust_streams(channel, ctx, trace_archive_id); + pthread_mutex_lock(&channel->lock); + ret = create_ust_streams(channel, ctx); + pthread_mutex_unlock(&channel->lock); if (ret < 0) { goto end; } @@ -819,7 +829,6 @@ error: /* * Close metadata stream wakeup_fd using the given key to retrieve the channel. - * RCU read side lock MUST be acquired before calling this function. * * Return 0 on success else an LTTng error code. 
*/ @@ -987,15 +996,13 @@ end: /* * Snapshot the whole metadata. - * RCU read-side lock must be held across this function to ensure existence of - * metadata_channel. + * RCU read-side lock must be held by the caller. * * Returns 0 on success, < 0 on error */ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel, uint64_t key, char *path, uint64_t relayd_id, - struct lttng_consumer_local_data *ctx, - uint64_t trace_archive_id) + struct lttng_consumer_local_data *ctx) { int ret = 0; struct lttng_consumer_stream *metadata_stream; @@ -1027,7 +1034,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel, * The metadata stream is NOT created in no monitor mode when the channel * is created on a sessiond ask channel command. */ - ret = create_ust_streams(metadata_channel, ctx, trace_archive_id); + ret = create_ust_streams(metadata_channel, ctx); if (ret < 0) { goto error; } @@ -1035,22 +1042,17 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel, metadata_stream = metadata_channel->metadata_stream; assert(metadata_stream); + pthread_mutex_lock(&metadata_stream->lock); if (relayd_id != (uint64_t) -1ULL) { metadata_stream->net_seq_idx = relayd_id; ret = consumer_send_relayd_stream(metadata_stream, path); - if (ret < 0) { - goto error_stream; - } } else { - ret = utils_create_stream_file(path, metadata_stream->name, - metadata_stream->chan->tracefile_size, - metadata_stream->tracefile_count_current, - metadata_stream->uid, metadata_stream->gid, NULL); - if (ret < 0) { - goto error_stream; - } - metadata_stream->out_fd = ret; - metadata_stream->tracefile_size_current = 0; + ret = consumer_stream_create_output_files(metadata_stream, + false); + } + pthread_mutex_unlock(&metadata_stream->lock); + if (ret < 0) { + goto error_stream; } do { @@ -1067,6 +1069,7 @@ error_stream: * Clean up the stream completly because the next snapshot will use a new * metadata stream. 
*/ + pthread_mutex_lock(&metadata_stream->lock); consumer_stream_destroy(metadata_stream, NULL); cds_list_del(&metadata_stream->send_node); metadata_channel->metadata_stream = NULL; @@ -1078,8 +1081,7 @@ error: /* * Take a snapshot of all the stream of a channel. - * RCU read-side lock must be held across this function to ensure existence of - * channel. + * RCU read-side lock and the channel lock must be held by the caller. * * Returns 0 on success, < 0 on error */ @@ -1110,6 +1112,19 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, /* Lock stream because we are about to change its state. */ pthread_mutex_lock(&stream->lock); + assert(channel->trace_chunk); + if (!lttng_trace_chunk_get(channel->trace_chunk)) { + /* + * Can't happen barring an internal error as the channel + * holds a reference to the trace chunk. + */ + ERR("Failed to acquire reference to channel's trace chunk"); + ret = -1; + goto error_unlock; + } + assert(!stream->trace_chunk); + stream->trace_chunk = channel->trace_chunk; + stream->net_seq_idx = relayd_id; if (use_relayd) { @@ -1118,18 +1133,13 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, goto error_unlock; } } else { - ret = utils_create_stream_file(path, stream->name, - stream->chan->tracefile_size, - stream->tracefile_count_current, - stream->uid, stream->gid, NULL); + ret = consumer_stream_create_output_files(stream, + false); if (ret < 0) { goto error_unlock; } - stream->out_fd = ret; - stream->tracefile_size_current = 0; - - DBG("UST consumer snapshot stream %s/%s (%" PRIu64 ")", path, - stream->name, stream->key); + DBG("UST consumer snapshot stream (%" PRIu64 ")", + stream->key); } /* @@ -1423,12 +1433,20 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, { int ret; struct ustctl_consumer_channel_attr attr; + const uint64_t chunk_id = msg.u.ask_channel.chunk_id.value; + const struct lttng_credentials buffer_credentials = { + .uid = 
msg.u.ask_channel.buffer_credentials.uid, + .gid = msg.u.ask_channel.buffer_credentials.gid, + }; /* Create a plain object and reserve a channel key. */ channel = allocate_channel(msg.u.ask_channel.session_id, - msg.u.ask_channel.pathname, msg.u.ask_channel.name, - msg.u.ask_channel.uid, msg.u.ask_channel.gid, - msg.u.ask_channel.relayd_id, msg.u.ask_channel.key, + msg.u.ask_channel.chunk_id.is_set ? + &chunk_id : NULL, + msg.u.ask_channel.pathname, + msg.u.ask_channel.name, + msg.u.ask_channel.relayd_id, + msg.u.ask_channel.key, (enum lttng_event_output) msg.u.ask_channel.output, msg.u.ask_channel.tracefile_size, msg.u.ask_channel.tracefile_count, @@ -1441,6 +1459,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_channel_error; } + LTTNG_OPTIONAL_SET(&channel->buffer_credentials, + buffer_credentials); + /* * Assign UST application UID to the channel. This value is ignored for * per PID buffers. This is specific to UST thus setting this after the @@ -1489,8 +1510,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); - ret = ask_channel(ctx, channel, &attr, - msg.u.ask_channel.trace_archive_id); + ret = ask_channel(ctx, channel, &attr); if (ret < 0) { goto end_channel_error; } @@ -1753,8 +1773,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = snapshot_metadata(channel, key, msg.u.snapshot_channel.pathname, msg.u.snapshot_channel.relayd_id, - ctx, - msg.u.snapshot_channel.trace_archive_id); + ctx); if (ret < 0) { ERR("Snapshot metadata failed"); ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED; @@ -1942,10 +1961,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, * this channel. 
*/ ret = lttng_consumer_rotate_channel(channel, key, - msg.u.rotate_channel.pathname, msg.u.rotate_channel.relayd_id, msg.u.rotate_channel.metadata, - msg.u.rotate_channel.new_chunk_id, ctx); if (ret < 0) { ERR("Rotate channel failed"); @@ -1976,22 +1993,11 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } break; } - case LTTNG_CONSUMER_ROTATE_RENAME: + case LTTNG_CONSUMER_INIT: { - DBG("Consumer rename session %" PRIu64 " after rotation", - msg.u.rotate_rename.session_id); - ret = lttng_consumer_rotate_rename(msg.u.rotate_rename.old_path, - msg.u.rotate_rename.new_path, - msg.u.rotate_rename.uid, - msg.u.rotate_rename.gid, - msg.u.rotate_rename.relayd_id); - if (ret < 0) { - ERR("Rotate rename failed"); - ret_code = LTTCOMM_CONSUMERD_ROTATE_RENAME_FAILED; - } - + ret_code = lttng_consumer_init_command(ctx, + msg.u.init.sessiond_uuid); health_code_update(); - ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ @@ -1999,128 +2005,100 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } break; } - case LTTNG_CONSUMER_CHECK_ROTATION_PENDING_LOCAL: + case LTTNG_CONSUMER_CREATE_TRACE_CHUNK: { - int pending; - uint32_t pending_reply; - - DBG("Perform local check of pending rotation for session id %" PRIu64, - msg.u.check_rotation_pending_local.session_id); - pending = lttng_consumer_check_rotation_pending_local( - msg.u.check_rotation_pending_local.session_id, - msg.u.check_rotation_pending_local.chunk_id); - if (pending < 0) { - ERR("Local rotation pending check failed with code %i", pending); - ret_code = LTTCOMM_CONSUMERD_ROTATION_PENDING_LOCAL_FAILED; - } else { - pending_reply = !!pending; - } - - health_code_update(); - - ret = consumer_send_status_msg(sock, ret_code); - if (ret < 0) { - /* Somehow, the session daemon is not responding anymore. 
*/ - goto end_nosignal; - } + const struct lttng_credentials credentials = { + .uid = msg.u.create_trace_chunk.credentials.uid, + .gid = msg.u.create_trace_chunk.credentials.gid, + }; + const bool is_local_trace = + !msg.u.create_trace_chunk.relayd_id.is_set; + const uint64_t relayd_id = + msg.u.create_trace_chunk.relayd_id.value; + const char *chunk_override_name = + *msg.u.create_trace_chunk.override_name ? + msg.u.create_trace_chunk.override_name : + NULL; + LTTNG_OPTIONAL(struct lttng_directory_handle) chunk_directory_handle = + LTTNG_OPTIONAL_INIT; - if (pending < 0) { - /* - * An error occurred while running the command; - * don't send the 'pending' flag as the sessiond - * will not read it. - */ - break; - } + /* + * The session daemon will only provide a chunk directory file + * descriptor for local traces. + */ + if (is_local_trace) { + int chunk_dirfd; - /* Send back returned value to session daemon */ - ret = lttcomm_send_unix_sock(sock, &pending_reply, - sizeof(pending_reply)); - if (ret < 0) { - PERROR("Failed to send rotation pending return code"); - goto error_fatal; - } - break; - } - case LTTNG_CONSUMER_CHECK_ROTATION_PENDING_RELAY: - { - int pending; - uint32_t pending_reply; - - DBG("Perform relayd check of pending rotation for session id %" PRIu64, - msg.u.check_rotation_pending_relay.session_id); - pending = lttng_consumer_check_rotation_pending_relay( - msg.u.check_rotation_pending_relay.session_id, - msg.u.check_rotation_pending_relay.relayd_id, - msg.u.check_rotation_pending_relay.chunk_id); - if (pending < 0) { - ERR("Relayd rotation pending check failed with code %i", pending); - ret_code = LTTCOMM_CONSUMERD_ROTATION_PENDING_RELAY_FAILED; - } else { - pending_reply = !!pending; - } + /* Acknowledge the reception of the command. */ + ret = consumer_send_status_msg(sock, + LTTCOMM_CONSUMERD_SUCCESS); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. 
*/ + goto end_nosignal; + } - health_code_update(); + ret = lttcomm_recv_fds_unix_sock(sock, &chunk_dirfd, 1); + if (ret != sizeof(chunk_dirfd)) { + ERR("Failed to receive trace chunk directory file descriptor"); + goto error_fatal; + } - ret = consumer_send_status_msg(sock, ret_code); - if (ret < 0) { - /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; + DBG("Received trace chunk directory fd (%d)", + chunk_dirfd); + ret = lttng_directory_handle_init_from_dirfd( + &chunk_directory_handle.value, + chunk_dirfd); + if (ret) { + ERR("Failed to initialize chunk directory handle from directory file descriptor"); + if (close(chunk_dirfd)) { + PERROR("Failed to close chunk directory file descriptor"); + } + goto error_fatal; + } + chunk_directory_handle.is_set = true; } - if (pending < 0) { - /* - * An error occurred while running the command; - * don't send the 'pending' flag as the sessiond - * will not read it. - */ - break; - } + ret_code = lttng_consumer_create_trace_chunk( + !is_local_trace ? &relayd_id : NULL, + msg.u.create_trace_chunk.session_id, + msg.u.create_trace_chunk.chunk_id, + (time_t) msg.u.create_trace_chunk.creation_timestamp, + chunk_override_name, + &credentials, + chunk_directory_handle.is_set ? 
+ &chunk_directory_handle.value : + NULL); - /* Send back returned value to session daemon */ - ret = lttcomm_send_unix_sock(sock, &pending_reply, - sizeof(pending_reply)); - if (ret < 0) { - PERROR("Failed to send rotation pending return code"); - goto error_fatal; + if (chunk_directory_handle.is_set) { + lttng_directory_handle_fini( + &chunk_directory_handle.value); } - break; + goto end_msg_sessiond; } - case LTTNG_CONSUMER_MKDIR: + case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK: { - DBG("Consumer mkdir %s in session %" PRIu64, - msg.u.mkdir.path, - msg.u.mkdir.session_id); - ret = lttng_consumer_mkdir(msg.u.mkdir.path, - msg.u.mkdir.uid, - msg.u.mkdir.gid, - msg.u.mkdir.relayd_id); - if (ret < 0) { - ERR("consumer mkdir failed"); - ret_code = LTTCOMM_CONSUMERD_MKDIR_FAILED; - } - - health_code_update(); - - ret = consumer_send_status_msg(sock, ret_code); - if (ret < 0) { - /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; - } - break; + const uint64_t relayd_id = + msg.u.close_trace_chunk.relayd_id.value; + + ret_code = lttng_consumer_close_trace_chunk( + msg.u.close_trace_chunk.relayd_id.is_set ? + &relayd_id : NULL, + msg.u.close_trace_chunk.session_id, + msg.u.close_trace_chunk.chunk_id, + (time_t) msg.u.close_trace_chunk.close_timestamp); + goto end_msg_sessiond; } - case LTTNG_CONSUMER_INIT: + case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS: { - ret_code = lttng_consumer_init_command(ctx, - msg.u.init.sessiond_uuid); - - health_code_update(); - ret = consumer_send_status_msg(sock, ret_code); - if (ret < 0) { - /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; - } - break; + const uint64_t relayd_id = + msg.u.trace_chunk_exists.relayd_id.value; + + ret_code = lttng_consumer_trace_chunk_exists( + msg.u.trace_chunk_exists.relayd_id.is_set ? 
+ &relayd_id : NULL, + msg.u.trace_chunk_exists.session_id, + msg.u.trace_chunk_exists.chunk_id); + goto end_msg_sessiond; } default: break; @@ -2154,6 +2132,7 @@ end_msg_sessiond: return 1; end_channel_error: if (channel) { + pthread_mutex_unlock(&channel->lock); /* * Free channel here since no one has a reference to it. We don't * free after that because a stream can store this pointer. @@ -2320,6 +2299,7 @@ void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan) assert(chan); assert(chan->uchan); + assert(chan->buffer_credentials.is_set); if (chan->switch_timer_enabled == 1) { consumer_timer_switch_stop(chan); @@ -2338,7 +2318,9 @@ void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan) if (ret) { ERR("Cannot get stream shm path"); } - ret = run_as_unlink(shm_path, chan->uid, chan->gid); + ret = run_as_unlink(shm_path, + chan->buffer_credentials.value.uid, + chan->buffer_credentials.value.gid); if (ret) { PERROR("unlink %s", shm_path); } @@ -2350,13 +2332,15 @@ void lttng_ustconsumer_free_channel(struct lttng_consumer_channel *chan) { assert(chan); assert(chan->uchan); + assert(chan->buffer_credentials.is_set); consumer_metadata_cache_destroy(chan); ustctl_destroy_channel(chan->uchan); /* Try to rmdir all directories under shm_path root. */ if (chan->root_shm_path[0]) { (void) run_as_rmdir_recursive(chan->root_shm_path, - chan->uid, chan->gid); + chan->buffer_credentials.value.uid, + chan->buffer_credentials.value.gid); } free(chan->stream_fds); } @@ -2706,12 +2690,12 @@ end: /* * Read subbuffer from the given stream. * - * Stream lock MUST be acquired. + * Stream and channel locks MUST be acquired by the caller. * * Return 0 on success else a negative value. 
*/ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx, bool *rotated) + struct lttng_consumer_local_data *ctx) { unsigned long len, subbuf_size, padding; int err, write_index = 1, rotation_ret; @@ -2756,7 +2740,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, */ if (stream->rotate_ready) { DBG("Rotate stream before extracting data"); - rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated); + rotation_ret = lttng_consumer_rotate_stream(ctx, stream); if (rotation_ret < 0) { ERR("Stream rotation error"); ret = -1; @@ -2907,7 +2891,7 @@ rotate: */ rotation_ret = lttng_consumer_stream_is_rotate_ready(stream); if (rotation_ret == 1) { - rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated); + rotation_ret = lttng_consumer_rotate_stream(ctx, stream); if (rotation_ret < 0) { ERR("Stream rotation error"); ret = -1; @@ -2933,31 +2917,16 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream) assert(stream); - /* Don't create anything if this is set for streaming. */ - if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) { - ret = utils_create_stream_file(stream->chan->pathname, stream->name, - stream->chan->tracefile_size, stream->tracefile_count_current, - stream->uid, stream->gid, NULL); - if (ret < 0) { + /* + * Don't create anything if this is set for streaming or if there is + * no current trace chunk on the parent channel. 
+ */ + if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor && + stream->chan->trace_chunk) { + ret = consumer_stream_create_output_files(stream, true); + if (ret) { goto error; } - stream->out_fd = ret; - stream->tracefile_size_current = 0; - - if (!stream->metadata_flag) { - struct lttng_index_file *index_file; - - index_file = lttng_index_file_create(stream->chan->pathname, - stream->name, stream->uid, stream->gid, - stream->chan->tracefile_size, - stream->tracefile_count_current, - CTF_INDEX_MAJOR, CTF_INDEX_MINOR); - if (!index_file) { - goto error; - } - assert(!stream->index_file); - stream->index_file = index_file; - } } ret = 0; diff --git a/src/common/ust-consumer/ust-consumer.h b/src/common/ust-consumer/ust-consumer.h index b0e1c7d0f..d73b9852e 100644 --- a/src/common/ust-consumer/ust-consumer.h +++ b/src/common/ust-consumer/ust-consumer.h @@ -45,7 +45,7 @@ extern int lttng_ustconsumer_add_stream(struct lttng_consumer_stream *stream); extern void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream); int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx, bool *rotated); + struct lttng_consumer_local_data *ctx); int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream); void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream); @@ -158,7 +158,7 @@ void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream) static inline int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx, bool *rotated) + struct lttng_consumer_local_data *ctx) { return -ENOSYS; }
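The CREATE_TRACE_CHUNK, CLOSE_TRACE_CHUNK and TRACE_CHUNK_EXISTS handlers in ust-consumer.c all turn an optional `relayd_id` message field into a nullable pointer before calling into the consumer, where NULL stands for a local trace. The pattern can be sketched in isolation as follows. This is only an illustration: the `OPTIONAL` macro, `struct chunk_msg`, `chunk_is_local()` and `dispatch_chunk_msg()` are hypothetical names made up for this sketch and are not the lttng-tools definitions.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for an optional-value wrapper (assumption). */
#define OPTIONAL(type) struct { bool is_set; type value; }

/* Hypothetical message layout carrying an optional relay daemon id. */
struct chunk_msg {
	OPTIONAL(uint64_t) relayd_id;
	uint64_t session_id;
	uint64_t chunk_id;
};

/* Hypothetical callee: a NULL relayd_id means the trace is local. */
static bool chunk_is_local(const uint64_t *relayd_id)
{
	return relayd_id == NULL;
}

/*
 * Copy the optional's value to a local, then pass its address only
 * when the field is set. The callee never sees an unset value.
 */
static bool dispatch_chunk_msg(const struct chunk_msg *msg)
{
	assert(msg);

	const uint64_t relayd_id = msg->relayd_id.value;

	return chunk_is_local(msg->relayd_id.is_set ? &relayd_id : NULL);
}
```

Calling `dispatch_chunk_msg()` on a message whose `relayd_id.is_set` is false reports a local trace; setting the field makes the callee receive a pointer to the id instead. Keeping the "is it set" decision at the call site lets the callee use a plain nullable pointer rather than depend on the wrapper type.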