X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fsession.c;h=6c10aaeba9259286cdac4a43db72619d8a4e8f51;hp=491de61759825f0485326cae5b2d7e72db72c6ad;hb=fb9a95c4d6242bd8336b638c90a7d8f846125659;hpb=b178f53e90c376dd44b020535c32649edef8f80e diff --git a/src/bin/lttng-sessiond/session.c b/src/bin/lttng-sessiond/session.c index 491de6175..6c10aaeba 100644 --- a/src/bin/lttng-sessiond/session.c +++ b/src/bin/lttng-sessiond/session.c @@ -28,6 +28,8 @@ #include #include +#include +#include #include #include #include "lttng-sessiond.h" @@ -65,6 +67,13 @@ static const char *forbidden_name_chars = "/"; /* Global hash table to keep the sessions, indexed by id. */ static struct lttng_ht *ltt_sessions_ht_by_id = NULL; +struct consumer_create_chunk_transaction { + struct consumer_socket *socket; + struct lttng_trace_chunk *new_chunk; + struct lttng_trace_chunk *previous_chunk; + bool new_chunk_created; +}; + /* * Validate the session name for forbidden characters. * @@ -400,6 +409,328 @@ void session_unlock(struct ltt_session *session) pthread_mutex_unlock(&session->lock); } +static +int _session_set_trace_chunk_no_lock_check(struct ltt_session *session, + struct lttng_trace_chunk *new_trace_chunk) +{ + int ret; + unsigned int i, refs_to_acquire = 0, refs_acquired = 0, refs_to_release = 0; + unsigned int consumer_count = 0; + /* + * The maximum amount of consumers to reach is 3 + * (32/64 userspace + kernel). + */ + struct consumer_create_chunk_transaction transactions[3] = {}; + struct cds_lfht_iter iter; + struct consumer_socket *socket; + bool close_error_occured = false; + + if (new_trace_chunk) { + uint64_t chunk_id; + enum lttng_trace_chunk_status chunk_status = + lttng_trace_chunk_get_id(new_trace_chunk, + &chunk_id); + + assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); + LTTNG_OPTIONAL_SET(&session->last_trace_chunk_id, chunk_id) + } + + if (new_trace_chunk) { + refs_to_acquire = 1; + refs_to_acquire += !!session->ust_session; + refs_to_acquire += !!session->kernel_session; + } + + /* + * Build a list of consumers to reach to announce the new trace chunk. + * + * Rolling back the annoucement in case of an error is important since + * not doing so would result in a leak; the chunk will not be + * "reclaimed" by the consumer(s) since they have no concept of the + * lifetime of a session. + */ + if (session->ust_session) { + cds_lfht_for_each_entry( + session->ust_session->consumer->socks->ht, + &iter, socket, node.node) { + transactions[consumer_count].socket = socket; + transactions[consumer_count].new_chunk = new_trace_chunk; + transactions[consumer_count].previous_chunk = + session->current_trace_chunk; + consumer_count++; + assert(consumer_count <= 3); + } + } + if (session->kernel_session) { + cds_lfht_for_each_entry( + session->kernel_session->consumer->socks->ht, + &iter, socket, node.node) { + transactions[consumer_count].socket = socket; + transactions[consumer_count].new_chunk = new_trace_chunk; + transactions[consumer_count].previous_chunk = + session->current_trace_chunk; + consumer_count++; + assert(consumer_count <= 3); + } + } + for (refs_acquired = 0; refs_acquired < refs_to_acquire; refs_acquired++) { + if (new_trace_chunk && !lttng_trace_chunk_get(new_trace_chunk)) { + ERR("Failed to acquire reference to new current trace chunk of session \"%s\"", + session->name); + goto error; + } + } + + /* + * Close the previous chunk on remote peers (consumers and relayd). + */ + for (i = 0; i < consumer_count; i++) { + if (!transactions[i].previous_chunk) { + continue; + } + pthread_mutex_lock(transactions[i].socket->lock); + ret = consumer_close_trace_chunk(transactions[i].socket, + session->consumer->net_seq_index, + session->id, + transactions[i].previous_chunk); + pthread_mutex_unlock(transactions[i].socket->lock); + if (ret) { + ERR("Failed to close trace chunk on consumer"); + close_error_occured = true; + } + } + + if (close_error_occured) { + /* + * Skip the creation of the new trace chunk and report the + * error. + */ + goto error; + } + + /* Create the new chunk on remote peers (consumers and relayd) */ + if (new_trace_chunk) { + for (i = 0; i < consumer_count; i++) { + pthread_mutex_lock(transactions[i].socket->lock); + ret = consumer_create_trace_chunk(transactions[i].socket, + session->consumer->net_seq_index, + session->id, + transactions[i].new_chunk); + pthread_mutex_unlock(transactions[i].socket->lock); + if (ret) { + ERR("Failed to create trace chunk on consumer"); + goto error; + } + /* This will have to be rolled-back on error. */ + transactions[i].new_chunk_created = true; + } + } + + lttng_trace_chunk_put(session->current_trace_chunk); + session->current_trace_chunk = NULL; + if (session->ust_session) { + lttng_trace_chunk_put( + session->ust_session->current_trace_chunk); + session->ust_session->current_trace_chunk = NULL; + } + if (session->kernel_session) { + lttng_trace_chunk_put( + session->kernel_session->current_trace_chunk); + session->kernel_session->current_trace_chunk = NULL; + } + + /* + * Update local current trace chunk state last, only if all remote + * annoucements succeeded. + */ + session->current_trace_chunk = new_trace_chunk; + if (session->ust_session) { + session->ust_session->current_trace_chunk = new_trace_chunk; + } + if (session->kernel_session) { + session->kernel_session->current_trace_chunk = + new_trace_chunk; + } + + return 0; +error: + /* + * Release references taken in the case where all references could not + * be acquired. + */ + refs_to_release = refs_to_acquire - refs_acquired; + for (i = 0; i < refs_to_release; i++) { + lttng_trace_chunk_put(new_trace_chunk); + } + + /* + * Close the newly-created chunk from remote peers (consumers and + * relayd). + */ + DBG("Rolling back the creation of the new trace chunk on consumers"); + for (i = 0; i < consumer_count; i++) { + if (!transactions[i].new_chunk_created) { + continue; + } + + pthread_mutex_lock(transactions[i].socket->lock); + ret = consumer_close_trace_chunk(transactions[i].socket, + session->consumer->net_seq_index, + session->id, + transactions[i].new_chunk); + pthread_mutex_unlock(transactions[i].socket->lock); + if (ret) { + ERR("Failed to close trace chunk on consumer"); + close_error_occured = true; + } + } + + return -1; +} + +static +bool output_supports_chunks(const struct ltt_session *session) +{ + if (session->consumer->type == CONSUMER_DST_LOCAL) { + return true; + } else { + struct consumer_output *output; + + if (session->ust_session) { + output = session->ust_session->consumer; + } else if (session->kernel_session) { + output = session->kernel_session->consumer; + } else { + abort(); + } + + if (output->relay_major_version > 2) { + return true; + } else if (output->relay_major_version == 2 && + output->relay_minor_version >= 11) { + return true; + } + } + return false; +} + +enum lttng_error_code session_switch_trace_chunk(struct ltt_session *session, + const char *session_base_path_override, + const char *chunk_name_override) +{ + int ret; + enum lttng_error_code ret_code = LTTNG_OK; + struct lttng_trace_chunk *trace_chunk = NULL; + enum lttng_trace_chunk_status chunk_status; + const time_t timestamp_begin = time(NULL); + const bool is_local_trace = + session->consumer->type == CONSUMER_DST_LOCAL; + const char *base_path = session_base_path_override ? : + session_get_base_path(session); + struct lttng_directory_handle session_output_directory; + const struct lttng_credentials session_credentials = { + .uid = session->uid, + .gid = session->gid, + }; + uint64_t next_chunk_id; + + if (timestamp_begin == (time_t) -1) { + PERROR("Failed to sample time while changing session \"%s\" trace chunk", + session->name); + ret_code = LTTNG_ERR_FATAL; + goto error; + } + session->current_chunk_start_ts = timestamp_begin; + + if (!output_supports_chunks(session)) { + goto end; + } + next_chunk_id = session->last_trace_chunk_id.is_set ? + session->last_trace_chunk_id.value + 1 : 0; + + trace_chunk = lttng_trace_chunk_create(next_chunk_id, timestamp_begin); + if (!trace_chunk) { + ret_code = LTTNG_ERR_FATAL; + goto error; + } + + if (chunk_name_override) { + chunk_status = lttng_trace_chunk_override_name(trace_chunk, + chunk_name_override); + switch (chunk_status) { + case LTTNG_TRACE_CHUNK_STATUS_OK: + break; + case LTTNG_TRACE_CHUNK_STATUS_INVALID_ARGUMENT: + ret_code = LTTNG_ERR_INVALID; + goto error; + default: + ret_code = LTTNG_ERR_NOMEM; + goto error; + } + } + + if (!is_local_trace) { + /* + * No need to set crendentials and output directory + * for remote trace chunks. + */ + goto publish; + } + + chunk_status = lttng_trace_chunk_set_credentials(trace_chunk, + &session_credentials); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret_code = LTTNG_ERR_FATAL; + goto error; + } + + if (!session->current_trace_chunk) { + DBG("Creating base output directory of session \"%s\" at %s", + session->name, base_path); + } + ret = utils_mkdir_recursive(base_path, S_IRWXU | S_IRWXG, + session->uid, session->gid); + if (ret) { + ret = LTTNG_ERR_FATAL; + goto error; + } + ret = lttng_directory_handle_init(&session_output_directory, + base_path); + if (ret) { + ret = LTTNG_ERR_FATAL; + goto error; + } + chunk_status = lttng_trace_chunk_set_as_owner(trace_chunk, + &session_output_directory); + lttng_directory_handle_fini(&session_output_directory); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret = LTTNG_ERR_CREATE_DIR_FAIL; + goto error; + } +publish: + ret = session_set_trace_chunk(session, trace_chunk); + if (ret) { + ret_code = LTTNG_ERR_FATAL; + goto error; + } +error: + lttng_trace_chunk_put(trace_chunk); +end: + return ret_code; +} + +/* + * Set a session's current trace chunk. + * + * Must be called with the session lock held. + */ +int session_set_trace_chunk(struct ltt_session *session, + struct lttng_trace_chunk *new_trace_chunk) +{ + ASSERT_LOCKED(session->lock); + return _session_set_trace_chunk_no_lock_check(session, new_trace_chunk); +} + static void session_release(struct urcu_ref *ref) { @@ -410,9 +741,11 @@ void session_release(struct urcu_ref *ref) usess = session->ust_session; ksess = session->kernel_session; + (void) _session_set_trace_chunk_no_lock_check(session, NULL); /* Clean kernel session teardown */ kernel_destroy_session(ksess); + session->kernel_session = NULL; /* UST session teardown */ if (usess) { @@ -427,6 +760,7 @@ void session_release(struct urcu_ref *ref) /* Clean up the rest. */ trace_ust_destroy_session(usess); + session->ust_session = NULL; } /* @@ -439,11 +773,12 @@ void session_release(struct urcu_ref *ref) } DBG("Destroying session %s (id %" PRIu64 ")", session->name, session->id); - pthread_mutex_destroy(&session->lock); consumer_output_put(session->consumer); snapshot_destroy(&session->snapshot); + pthread_mutex_destroy(&session->lock); + if (session->published) { ASSERT_LOCKED(ltt_session_list.lock); del_session_list(session);