X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fsession.c;h=abac2404f04bdda9b2c3a94ec8507c950f3a427a;hp=6c10aaeba9259286cdac4a43db72619d8a4e8f51;hb=d295668767ac8234e83984e1812d342d03293d88;hpb=fb9a95c4d6242bd8336b638c90a7d8f846125659 diff --git a/src/bin/lttng-sessiond/session.c b/src/bin/lttng-sessiond/session.c index 6c10aaeba..abac2404f 100644 --- a/src/bin/lttng-sessiond/session.c +++ b/src/bin/lttng-sessiond/session.c @@ -67,13 +67,6 @@ static const char *forbidden_name_chars = "/"; /* Global hash table to keep the sessions, indexed by id. */ static struct lttng_ht *ltt_sessions_ht_by_id = NULL; -struct consumer_create_chunk_transaction { - struct consumer_socket *socket; - struct lttng_trace_chunk *new_chunk; - struct lttng_trace_chunk *previous_chunk; - bool new_chunk_created; -}; - /* * Validate the session name for forbidden characters. * @@ -254,16 +247,26 @@ void session_get_net_consumer_ports(const struct ltt_session *session, struct lttng_trace_archive_location *session_get_trace_archive_location( struct ltt_session *session) { + int ret; struct lttng_trace_archive_location *location = NULL; + char *chunk_path = NULL; + + if (session->rotation_state != LTTNG_ROTATION_STATE_COMPLETED || + !session->last_archived_chunk_name) { + goto end; + } - if (session->rotation_state != LTTNG_ROTATION_STATE_COMPLETED) { + ret = asprintf(&chunk_path, "%s/" DEFAULT_ARCHIVED_TRACE_CHUNKS_DIRECTORY "/%s", + session_get_base_path(session), + session->last_archived_chunk_name); + if (ret == -1) { goto end; } switch (session_get_consumer_destination_type(session)) { case CONSUMER_DST_LOCAL: location = lttng_trace_archive_location_local_create( - session->rotation_chunk.current_rotate_path); + chunk_path); break; case CONSUMER_DST_NET: { @@ -277,14 +280,14 @@ struct lttng_trace_archive_location *session_get_trace_archive_location( location = lttng_trace_archive_location_relay_create( hostname, LTTNG_TRACE_ARCHIVE_LOCATION_RELAY_PROTOCOL_TYPE_TCP, - control_port, data_port, - session->rotation_chunk.current_rotate_path); + control_port, data_port, chunk_path); break; } default: abort(); } end: + free(chunk_path); return location; } @@ -411,150 +414,130 @@ void session_unlock(struct ltt_session *session) static int _session_set_trace_chunk_no_lock_check(struct ltt_session *session, - struct lttng_trace_chunk *new_trace_chunk) + struct lttng_trace_chunk *new_trace_chunk, + struct lttng_trace_chunk **_current_trace_chunk) { int ret; unsigned int i, refs_to_acquire = 0, refs_acquired = 0, refs_to_release = 0; - unsigned int consumer_count = 0; - /* - * The maximum amount of consumers to reach is 3 - * (32/64 userspace + kernel). - */ - struct consumer_create_chunk_transaction transactions[3] = {}; struct cds_lfht_iter iter; struct consumer_socket *socket; - bool close_error_occured = false; - - if (new_trace_chunk) { - uint64_t chunk_id; - enum lttng_trace_chunk_status chunk_status = - lttng_trace_chunk_get_id(new_trace_chunk, - &chunk_id); - - assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); - LTTNG_OPTIONAL_SET(&session->last_trace_chunk_id, chunk_id) - } - - if (new_trace_chunk) { - refs_to_acquire = 1; - refs_to_acquire += !!session->ust_session; - refs_to_acquire += !!session->kernel_session; - } + struct lttng_trace_chunk *current_trace_chunk; + uint64_t chunk_id; + enum lttng_trace_chunk_status chunk_status; + const uint64_t relayd_id = session->consumer->net_seq_index; + const bool is_local_trace = relayd_id == -1ULL; + rcu_read_lock(); /* - * Build a list of consumers to reach to announce the new trace chunk. - * - * Rolling back the annoucement in case of an error is important since - * not doing so would result in a leak; the chunk will not be - * "reclaimed" by the consumer(s) since they have no concept of the - * lifetime of a session. + * Ownership of current trace chunk is transferred to + * `current_trace_chunk`. */ + current_trace_chunk = session->current_trace_chunk; + session->current_trace_chunk = NULL; if (session->ust_session) { - cds_lfht_for_each_entry( - session->ust_session->consumer->socks->ht, - &iter, socket, node.node) { - transactions[consumer_count].socket = socket; - transactions[consumer_count].new_chunk = new_trace_chunk; - transactions[consumer_count].previous_chunk = - session->current_trace_chunk; - consumer_count++; - assert(consumer_count <= 3); - } + lttng_trace_chunk_put( + session->ust_session->current_trace_chunk); + session->ust_session->current_trace_chunk = NULL; } if (session->kernel_session) { - cds_lfht_for_each_entry( - session->kernel_session->consumer->socks->ht, - &iter, socket, node.node) { - transactions[consumer_count].socket = socket; - transactions[consumer_count].new_chunk = new_trace_chunk; - transactions[consumer_count].previous_chunk = - session->current_trace_chunk; - consumer_count++; - assert(consumer_count <= 3); - } + lttng_trace_chunk_put( + session->kernel_session->current_trace_chunk); + session->kernel_session->current_trace_chunk = NULL; } - for (refs_acquired = 0; refs_acquired < refs_to_acquire; refs_acquired++) { - if (new_trace_chunk && !lttng_trace_chunk_get(new_trace_chunk)) { - ERR("Failed to acquire reference to new current trace chunk of session \"%s\"", - session->name); - goto error; - } + if (!new_trace_chunk) { + ret = 0; + goto end; } + chunk_status = lttng_trace_chunk_get_id(new_trace_chunk, &chunk_id); + assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); - /* - * Close the previous chunk on remote peers (consumers and relayd). - */ - for (i = 0; i < consumer_count; i++) { - if (!transactions[i].previous_chunk) { - continue; - } - pthread_mutex_lock(transactions[i].socket->lock); - ret = consumer_close_trace_chunk(transactions[i].socket, - session->consumer->net_seq_index, - session->id, - transactions[i].previous_chunk); - pthread_mutex_unlock(transactions[i].socket->lock); - if (ret) { - ERR("Failed to close trace chunk on consumer"); - close_error_occured = true; + refs_to_acquire = 1; + refs_to_acquire += !!session->ust_session; + refs_to_acquire += !!session->kernel_session; + + for (refs_acquired = 0; refs_acquired < refs_to_acquire; + refs_acquired++) { + if (!lttng_trace_chunk_get(new_trace_chunk)) { + ERR("Failed to acquire reference to new trace chunk of session \"%s\"", + session->name); + goto error; } } - if (close_error_occured) { - /* - * Skip the creation of the new trace chunk and report the - * error. - */ - goto error; - } + if (session->ust_session) { + session->ust_session->current_trace_chunk = new_trace_chunk; + if (is_local_trace) { + enum lttng_error_code ret_error_code; - /* Create the new chunk on remote peers (consumers and relayd) */ - if (new_trace_chunk) { - for (i = 0; i < consumer_count; i++) { - pthread_mutex_lock(transactions[i].socket->lock); - ret = consumer_create_trace_chunk(transactions[i].socket, - session->consumer->net_seq_index, - session->id, - transactions[i].new_chunk); - pthread_mutex_unlock(transactions[i].socket->lock); - if (ret) { - ERR("Failed to create trace chunk on consumer"); + ret_error_code = ust_app_create_channel_subdirectories( + session->ust_session); + if (ret_error_code != LTTNG_OK) { + ret = -ret_error_code; goto error; } - /* This will have to be rolled-back on error. */ - transactions[i].new_chunk_created = true; - } - } - - lttng_trace_chunk_put(session->current_trace_chunk); - session->current_trace_chunk = NULL; - if (session->ust_session) { - lttng_trace_chunk_put( - session->ust_session->current_trace_chunk); - session->ust_session->current_trace_chunk = NULL; - } + } + cds_lfht_for_each_entry( + session->ust_session->consumer->socks->ht, + &iter, socket, node.node) { + pthread_mutex_lock(socket->lock); + ret = consumer_create_trace_chunk(socket, + relayd_id, + session->id, new_trace_chunk); + pthread_mutex_unlock(socket->lock); + if (ret) { + goto error; + } + } + } if (session->kernel_session) { - lttng_trace_chunk_put( - session->kernel_session->current_trace_chunk); - session->kernel_session->current_trace_chunk = NULL; - } + session->kernel_session->current_trace_chunk = new_trace_chunk; + if (is_local_trace) { + enum lttng_error_code ret_error_code; + + ret_error_code = kernel_create_channel_subdirectories( + session->kernel_session); + if (ret_error_code != LTTNG_OK) { + ret = -ret_error_code; + goto error; + } + } + cds_lfht_for_each_entry( + session->kernel_session->consumer->socks->ht, + &iter, socket, node.node) { + pthread_mutex_lock(socket->lock); + ret = consumer_create_trace_chunk(socket, + relayd_id, + session->id, new_trace_chunk); + pthread_mutex_unlock(socket->lock); + if (ret) { + goto error; + } + } + } /* * Update local current trace chunk state last, only if all remote - * annoucements succeeded. + * creations succeeded. */ session->current_trace_chunk = new_trace_chunk; + LTTNG_OPTIONAL_SET(&session->most_recent_chunk_id, chunk_id); +end: + if (_current_trace_chunk) { + *_current_trace_chunk = current_trace_chunk; + current_trace_chunk = NULL; + } +end_no_move: + rcu_read_unlock(); + lttng_trace_chunk_put(current_trace_chunk); + return ret; +error: if (session->ust_session) { - session->ust_session->current_trace_chunk = new_trace_chunk; + session->ust_session->current_trace_chunk = NULL; } if (session->kernel_session) { - session->kernel_session->current_trace_chunk = - new_trace_chunk; + session->kernel_session->current_trace_chunk = NULL; } - - return 0; -error: - /* + /* * Release references taken in the case where all references could not * be acquired. */ @@ -562,34 +545,12 @@ error: for (i = 0; i < refs_to_release; i++) { lttng_trace_chunk_put(new_trace_chunk); } - - /* - * Close the newly-created chunk from remote peers (consumers and - * relayd). - */ - DBG("Rolling back the creation of the new trace chunk on consumers"); - for (i = 0; i < consumer_count; i++) { - if (!transactions[i].new_chunk_created) { - continue; - } - - pthread_mutex_lock(transactions[i].socket->lock); - ret = consumer_close_trace_chunk(transactions[i].socket, - session->consumer->net_seq_index, - session->id, - transactions[i].new_chunk); - pthread_mutex_unlock(transactions[i].socket->lock); - if (ret) { - ERR("Failed to close trace chunk on consumer"); - close_error_occured = true; - } - } - - return -1; + ret = -1; + goto end_no_move; } static -bool output_supports_chunks(const struct ltt_session *session) +bool output_supports_trace_chunks(const struct ltt_session *session) { if (session->consumer->type == CONSUMER_DST_LOCAL) { return true; @@ -614,15 +575,15 @@ bool output_supports_chunks(const struct ltt_session *session) return false; } -enum lttng_error_code session_switch_trace_chunk(struct ltt_session *session, +struct lttng_trace_chunk *session_create_new_trace_chunk( + struct ltt_session *session, const char *session_base_path_override, const char *chunk_name_override) { int ret; - enum lttng_error_code ret_code = LTTNG_OK; struct lttng_trace_chunk *trace_chunk = NULL; enum lttng_trace_chunk_status chunk_status; - const time_t timestamp_begin = time(NULL); + const time_t chunk_creation_ts = time(NULL); const bool is_local_trace = session->consumer->type == CONSUMER_DST_LOCAL; const char *base_path = session_base_path_override ? : @@ -634,37 +595,28 @@ enum lttng_error_code session_switch_trace_chunk(struct ltt_session *session, }; uint64_t next_chunk_id; - if (timestamp_begin == (time_t) -1) { - PERROR("Failed to sample time while changing session \"%s\" trace chunk", + if (chunk_creation_ts == (time_t) -1) { + PERROR("Failed to sample time while creation session \"%s\" trace chunk", session->name); - ret_code = LTTNG_ERR_FATAL; goto error; } - session->current_chunk_start_ts = timestamp_begin; - if (!output_supports_chunks(session)) { + if (!output_supports_trace_chunks(session)) { goto end; } - next_chunk_id = session->last_trace_chunk_id.is_set ? - session->last_trace_chunk_id.value + 1 : 0; + next_chunk_id = session->most_recent_chunk_id.is_set ? + session->most_recent_chunk_id.value + 1 : 0; - trace_chunk = lttng_trace_chunk_create(next_chunk_id, timestamp_begin); + trace_chunk = lttng_trace_chunk_create(next_chunk_id, + chunk_creation_ts); if (!trace_chunk) { - ret_code = LTTNG_ERR_FATAL; goto error; } if (chunk_name_override) { chunk_status = lttng_trace_chunk_override_name(trace_chunk, chunk_name_override); - switch (chunk_status) { - case LTTNG_TRACE_CHUNK_STATUS_OK: - break; - case LTTNG_TRACE_CHUNK_STATUS_INVALID_ARGUMENT: - ret_code = LTTNG_ERR_INVALID; - goto error; - default: - ret_code = LTTNG_ERR_NOMEM; + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { goto error; } } @@ -674,49 +626,101 @@ enum lttng_error_code session_switch_trace_chunk(struct ltt_session *session, * No need to set crendentials and output directory * for remote trace chunks. */ - goto publish; + goto end; } chunk_status = lttng_trace_chunk_set_credentials(trace_chunk, &session_credentials); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { - ret_code = LTTNG_ERR_FATAL; goto error; } - if (!session->current_trace_chunk) { - DBG("Creating base output directory of session \"%s\" at %s", - session->name, base_path); - } + DBG("Creating base output directory of session \"%s\" at %s", + session->name, base_path); ret = utils_mkdir_recursive(base_path, S_IRWXU | S_IRWXG, session->uid, session->gid); if (ret) { - ret = LTTNG_ERR_FATAL; goto error; } ret = lttng_directory_handle_init(&session_output_directory, base_path); if (ret) { - ret = LTTNG_ERR_FATAL; goto error; } chunk_status = lttng_trace_chunk_set_as_owner(trace_chunk, &session_output_directory); lttng_directory_handle_fini(&session_output_directory); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { - ret = LTTNG_ERR_CREATE_DIR_FAIL; - goto error; - } -publish: - ret = session_set_trace_chunk(session, trace_chunk); - if (ret) { - ret_code = LTTNG_ERR_FATAL; goto error; } +end: + return trace_chunk; error: lttng_trace_chunk_put(trace_chunk); + trace_chunk = NULL; + goto end; +} + +int session_close_trace_chunk(const struct ltt_session *session, + struct lttng_trace_chunk *trace_chunk) +{ + int ret = 0; + bool error_occurred = false; + struct cds_lfht_iter iter; + struct consumer_socket *socket; + enum lttng_trace_chunk_status chunk_status; + const time_t chunk_close_timestamp = time(NULL); + + if (chunk_close_timestamp == (time_t) -1) { + ERR("Failed to sample the close timestamp of the current trace chunk of session \"%s\"", + session->name); + ret = -1; + goto end; + } + chunk_status = lttng_trace_chunk_set_close_timestamp(trace_chunk, + chunk_close_timestamp); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ERR("Failed to set the close timestamp of the current trace chunk of session \"%s\"", + session->name); + ret = -1; + goto end; + } + + if (session->ust_session) { + cds_lfht_for_each_entry( + session->ust_session->consumer->socks->ht, + &iter, socket, node.node) { + pthread_mutex_lock(socket->lock); + ret = consumer_close_trace_chunk(socket, + session->consumer->net_seq_index, + session->id, + trace_chunk); + pthread_mutex_unlock(socket->lock); + if (ret) { + ERR("Failed to close trace chunk on user space consumer"); + error_occurred = true; + } + } + } + if (session->kernel_session) { + cds_lfht_for_each_entry( + session->kernel_session->consumer->socks->ht, + &iter, socket, node.node) { + pthread_mutex_lock(socket->lock); + ret = consumer_close_trace_chunk(socket, + session->consumer->net_seq_index, + session->id, + trace_chunk); + pthread_mutex_unlock(socket->lock); + if (ret) { + ERR("Failed to close trace chunk on kernel consumer"); + error_occurred = true; + } + } + } + ret = error_occurred ? -1 : 0; end: - return ret_code; + return ret; } /* @@ -725,10 +729,12 @@ end: * Must be called with the session lock held. */ int session_set_trace_chunk(struct ltt_session *session, - struct lttng_trace_chunk *new_trace_chunk) + struct lttng_trace_chunk *new_trace_chunk, + struct lttng_trace_chunk **current_trace_chunk) { ASSERT_LOCKED(session->lock); - return _session_set_trace_chunk_no_lock_check(session, new_trace_chunk); + return _session_set_trace_chunk_no_lock_check(session, new_trace_chunk, + current_trace_chunk); } static @@ -739,11 +745,24 @@ void session_release(struct urcu_ref *ref) struct ltt_kernel_session *ksess; struct ltt_session *session = container_of(ref, typeof(*session), ref); + assert(!session->chunk_being_archived); + usess = session->ust_session; ksess = session->kernel_session; - (void) _session_set_trace_chunk_no_lock_check(session, NULL); + if (session->current_trace_chunk) { + ret = session_close_trace_chunk(session, session->current_trace_chunk); + if (ret) { + ERR("Failed to close the current trace chunk of session \"%s\" during its release", + session->name); + } + ret = _session_set_trace_chunk_no_lock_check(session, NULL, NULL); + if (ret) { + ERR("Failed to release the current trace chunk of session \"%s\" during its release", + session->name); + } + } - /* Clean kernel session teardown */ + /* Clean kernel session teardown */ kernel_destroy_session(ksess); session->kernel_session = NULL; @@ -785,6 +804,7 @@ void session_release(struct urcu_ref *ref) del_session_ht(session); pthread_cond_broadcast(<t_session_list.removal_cond); } + free(session->last_archived_chunk_name); free(session); } @@ -1097,12 +1117,22 @@ int session_reset_rotation_state(struct ltt_session *session, ASSERT_LOCKED(ltt_session_list.lock); ASSERT_LOCKED(session->lock); - session->rotation_pending_local = false; - session->rotation_pending_relay = false; - session->rotated_after_last_stop = false; session->rotation_state = result; if (session->rotation_pending_check_timer_enabled) { ret = timer_session_rotation_pending_check_stop(session); } + if (session->chunk_being_archived) { + uint64_t chunk_id; + enum lttng_trace_chunk_status chunk_status; + + chunk_status = lttng_trace_chunk_get_id( + session->chunk_being_archived, + &chunk_id); + assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); + LTTNG_OPTIONAL_SET(&session->last_archived_chunk_id, + chunk_id); + lttng_trace_chunk_put(session->chunk_being_archived); + session->chunk_being_archived = NULL; + } return ret; }