X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fnotification-thread-events.cpp;h=43879c99fe8957d021e2dacbb2218997cd3c73d8;hb=a3c9aa3ccf6bf710701074ffa97f2b7a59b0fc16;hp=d6c0d813550a21398a8e6e1987e4f2d437697c83;hpb=f149493493fbd8a3efa4748832c03278c96c38ca;p=lttng-tools.git diff --git a/src/bin/lttng-sessiond/notification-thread-events.cpp b/src/bin/lttng-sessiond/notification-thread-events.cpp index d6c0d8135..43879c99f 100644 --- a/src/bin/lttng-sessiond/notification-thread-events.cpp +++ b/src/bin/lttng-sessiond/notification-thread-events.cpp @@ -33,6 +33,7 @@ #include #include #include +#include #include #include @@ -80,11 +81,7 @@ struct lttng_channel_trigger_list { * - lttng_session_trigger_list_add() */ struct lttng_session_trigger_list { - /* - * Not owned by this; points to the session_info structure's - * session name. - */ - const char *session_name; + char *session_name; /* List of struct lttng_trigger_list_element. */ struct cds_list_head list; /* Node in the session_triggers_ht */ @@ -137,7 +134,6 @@ struct channel_state_sample { struct cds_lfht_node channel_state_ht_node; uint64_t highest_usage; uint64_t lowest_usage; - uint64_t channel_total_consumed; /* call_rcu delayed reclaim. */ struct rcu_head rcu_node; }; @@ -149,8 +145,6 @@ static int evaluate_buffer_condition(const struct lttng_condition *condition, const struct notification_thread_state *state, const struct channel_state_sample *previous_sample, const struct channel_state_sample *latest_sample, - uint64_t previous_session_consumed_total, - uint64_t latest_session_consumed_total, struct channel_info *channel_info); static int send_evaluation_to_clients(const struct lttng_trigger *trigger, @@ -168,13 +162,14 @@ void session_info_get(struct session_info *session_info); static void session_info_put(struct session_info *session_info); static -struct session_info *session_info_create(const char *name, - uid_t uid, gid_t gid, +struct session_info *session_info_create(uint64_t id, + const char *name, + uid_t uid, + gid_t gid, struct lttng_session_trigger_list *trigger_list, struct cds_lfht *sessions_ht); -static -void session_info_add_channel(struct session_info *session_info, - struct channel_info *channel_info); +static void session_info_add_channel( + struct session_info *session_info, struct channel_info *channel_info); static void session_info_remove_channel(struct session_info *session_info, struct channel_info *channel_info); @@ -225,8 +220,8 @@ int match_client_id(struct cds_lfht_node *node, const void *key) { /* This double-cast is intended to supress pointer-to-cast warning. */ const notification_client_id id = *((notification_client_id *) key); - const struct notification_client *client = caa_container_of( - node, struct notification_client, client_id_ht_node); + const struct notification_client *client = lttng::utils::container_of( + node, ¬ification_client::client_id_ht_node); return client->id == id; } @@ -322,13 +317,60 @@ int match_client_list_condition(struct cds_lfht_node *node, const void *key) } static -int match_session(struct cds_lfht_node *node, const void *key) +int match_session_info(struct cds_lfht_node *node, const void *key) { - const char *name = (const char *) key; - struct session_info *session_info = caa_container_of( - node, struct session_info, sessions_ht_node); + const auto session_id = *((uint64_t *) key); + const auto *session_info = lttng::utils::container_of( + node, &session_info::sessions_ht_node); + + return session_id == session_info->id; +} - return !strcmp(session_info->name, name); +static +unsigned long hash_session_info_id(uint64_t id) +{ + return hash_key_u64(&id, lttng_ht_seed); +} + +static +unsigned long hash_session_info(const struct session_info *session_info) +{ + return hash_session_info_id(session_info->id); +} + +static +struct session_info *get_session_info_by_id( + const struct notification_thread_state *state, uint64_t id) +{ + struct cds_lfht_iter iter; + struct cds_lfht_node *node; + lttng::urcu::read_lock_guard read_lock_guard; + + cds_lfht_lookup(state->sessions_ht, + hash_session_info_id(id), + match_session_info, + &id, + &iter); + node = cds_lfht_iter_get_node(&iter); + + if (node) { + auto session_info = lttng::utils::container_of(node, &session_info::sessions_ht_node); + + session_info_get(session_info); + return session_info; + } + + return NULL; +} + +static +struct session_info *get_session_info_by_name( + const struct notification_thread_state *state, const char *name) +{ + uint64_t session_id; + const auto found = sample_session_id_by_name(name, &session_id); + + return found ? get_session_info_by_id(state, session_id) : NULL; } static @@ -344,6 +386,10 @@ const char *notification_command_type_str( return "ADD_CHANNEL"; case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL: return "REMOVE_CHANNEL"; + case NOTIFICATION_COMMAND_TYPE_ADD_SESSION: + return "ADD_SESSION"; + case NOTIFICATION_COMMAND_TYPE_REMOVE_SESSION: + return "REMOVE_SESSION"; case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING: return "SESSION_ROTATION_ONGOING"; case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED: @@ -478,10 +524,10 @@ enum lttng_object_type get_condition_binding_object( switch (lttng_condition_get_type(condition)) { case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW: case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH: - case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE: return LTTNG_OBJECT_TYPE_CHANNEL; case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING: case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED: + case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE: return LTTNG_OBJECT_TYPE_SESSION; case LTTNG_CONDITION_TYPE_EVENT_RULE_MATCHES: return LTTNG_OBJECT_TYPE_NONE; @@ -493,7 +539,7 @@ enum lttng_object_type get_condition_binding_object( static void free_channel_info_rcu(struct rcu_head *node) { - free(caa_container_of(node, struct channel_info, rcu_node)); + free(lttng::utils::container_of(node, &channel_info::rcu_node)); } static @@ -517,7 +563,7 @@ void channel_info_destroy(struct channel_info *channel_info) static void free_session_info_rcu(struct rcu_head *node) { - free(caa_container_of(node, struct session_info, rcu_node)); + free(lttng::utils::container_of(node, &session_info::rcu_node)); } /* Don't call directly, use the ref-counting mechanism. */ @@ -541,6 +587,7 @@ void session_info_destroy(void *_data) &session_info->sessions_ht_node); rcu_read_unlock(); free(session_info->name); + lttng_trace_archive_location_put(session_info->last_state_sample.rotation.location); call_rcu(&session_info->rcu_node, free_session_info_rcu); } @@ -563,7 +610,10 @@ void session_info_put(struct session_info *session_info) } static -struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid, +struct session_info *session_info_create(uint64_t id, + const char *name, + uid_t uid, + gid_t gid, struct lttng_session_trigger_list *trigger_list, struct cds_lfht *sessions_ht) { @@ -575,6 +625,7 @@ struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid, if (!session_info) { goto end; } + lttng_ref_init(&session_info->ref, session_info_destroy); session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE, @@ -584,10 +635,12 @@ struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid, } cds_lfht_node_init(&session_info->sessions_ht_node); + session_info->id = id; session_info->name = strdup(name); if (!session_info->name) { goto error; } + session_info->uid = uid; session_info->gid = gid; session_info->trigger_list = trigger_list; @@ -672,7 +725,7 @@ static void notification_client_list_release(struct urcu_ref *list_ref) { struct notification_client_list *list = - container_of(list_ref, typeof(*list), ref); + lttng::utils::container_of(list_ref, ¬ification_client_list::ref); struct notification_client_list_element *client_list_element, *tmp; lttng_condition_put(list->condition); @@ -816,8 +869,8 @@ struct notification_client_list *get_client_list_from_condition( &iter); node = cds_lfht_iter_get_node(&iter); if (node) { - list = container_of(node, struct notification_client_list, - notification_trigger_clients_ht_node); + list = lttng::utils::container_of(node, + ¬ification_client_list::notification_trigger_clients_ht_node); list = notification_client_list_get(list) ? list : NULL; } @@ -906,7 +959,6 @@ int evaluate_channel_condition_for_client( ret = evaluate_buffer_condition(condition, evaluation, state, NULL, last_sample, - 0, channel_info->session_info->consumed_data_size, channel_info); if (ret) { WARN("Fatal error occurred while evaluating a newly subscribed-to condition"); @@ -953,73 +1005,141 @@ end: } static -int evaluate_session_condition_for_client( +bool evaluate_session_rotation_ongoing_condition(const struct lttng_condition *condition + __attribute__((unused)), + const struct session_state_sample *sample) +{ + return sample->rotation.ongoing; +} + +static +bool evaluate_session_consumed_size_condition( const struct lttng_condition *condition, - struct notification_thread_state *state, - struct lttng_evaluation **evaluation, - uid_t *session_uid, gid_t *session_gid) + const struct session_state_sample *sample) +{ + uint64_t threshold; + const struct lttng_condition_session_consumed_size *size_condition = + lttng::utils::container_of(condition, + <tng_condition_session_consumed_size::parent); + + threshold = size_condition->consumed_threshold_bytes.value; + DBG("Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64, + threshold, sample->consumed_data_size); + return sample->consumed_data_size >= threshold; +} + +/* + * `new_state` can be NULL to indicate that we are not evaluating a + * state transition. A client subscribed or a trigger was registered and + * we wish to perform an initial evaluation. + */ +static +int evaluate_session_condition( + const struct lttng_condition *condition, + const struct session_info *session_info, + const struct session_state_sample *new_state, + struct lttng_evaluation **evaluation) { int ret; - struct cds_lfht_iter iter; - struct cds_lfht_node *node; - const char *session_name; - struct session_info *session_info = NULL; + bool previous_result, newest_result; - rcu_read_lock(); - session_name = get_condition_session_name(condition); + switch (lttng_condition_get_type(condition)) { + case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING: + if (new_state) { + previous_result = evaluate_session_rotation_ongoing_condition( + condition, &session_info->last_state_sample); + newest_result = evaluate_session_rotation_ongoing_condition( + condition, new_state); + } else { + previous_result = false; + newest_result = evaluate_session_rotation_ongoing_condition( + condition, &session_info->last_state_sample); + } + break; + case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE: + if (new_state) { + previous_result = evaluate_session_consumed_size_condition( + condition, &session_info->last_state_sample); + newest_result = evaluate_session_consumed_size_condition( + condition, new_state); + } else { + previous_result = false; + newest_result = evaluate_session_consumed_size_condition( + condition, &session_info->last_state_sample); + } + break; + case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED: + /* + * Note that LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED is + * evaluated differently to only consider state transitions without regard for the + * initial state. This is a deliberate choice as it is unlikely that a user would + * expect an action to occur for a rotation that occurred long before the trigger or + * subscription occurred. + */ + if (!new_state) { + ret = 0; + goto end; + } - /* Find the session associated with the trigger. */ - cds_lfht_lookup(state->sessions_ht, - hash_key_str(session_name, lttng_ht_seed), - match_session, - session_name, - &iter); - node = cds_lfht_iter_get_node(&iter); - if (!node) { - DBG("No known session matching name \"%s\"", - session_name); + previous_result = !session_info->last_state_sample.rotation.ongoing; + newest_result = !new_state->rotation.ongoing; + break; + default: ret = 0; goto end; } - session_info = caa_container_of(node, struct session_info, - sessions_ht_node); - session_info_get(session_info); + if (!newest_result || (previous_result == newest_result)) { + /* Not a state transition, evaluate to false. */ + ret = 0; + goto end; + } - /* - * Evaluation is performed in-line here since only one type of - * session-bound condition is handled for the moment. - */ switch (lttng_condition_get_type(condition)) { case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING: - if (!session_info->rotation.ongoing) { - ret = 0; - goto end_session_put; - } + { + const auto rotation_id = new_state ? + new_state->rotation.id : + session_info->last_state_sample.rotation.id; - *evaluation = lttng_evaluation_session_rotation_ongoing_create( - session_info->rotation.id); - if (!*evaluation) { - /* Fatal error. */ - ERR("Failed to create session rotation ongoing evaluation for session \"%s\"", - session_info->name); - ret = -1; - goto end_session_put; - } - ret = 0; + *evaluation = lttng_evaluation_session_rotation_ongoing_create(rotation_id); + break; + } + case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED: + { + const auto rotation_id = new_state ? + new_state->rotation.id : + session_info->last_state_sample.rotation.id; + + /* Callee acquires a reference to location. */ + *evaluation = lttng_evaluation_session_rotation_completed_create( + rotation_id, new_state->rotation.location); break; + } + case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE: + { + const auto latest_session_consumed_total = new_state ? + new_state->consumed_data_size : + session_info->last_state_sample.consumed_data_size; + + *evaluation = lttng_evaluation_session_consumed_size_create( + latest_session_consumed_total); + break; + } default: - ret = 0; - goto end_session_put; + abort(); } - *session_uid = session_info->uid; - *session_gid = session_info->gid; + if (!*evaluation) { + /* Fatal error. */ + ERR("Failed to create session condition evaluation: session name = `%s`", + session_info->name); + ret = -1; + goto end; + } -end_session_put: - session_info_put(session_info); + ret = 0; end: - rcu_read_unlock(); return ret; } @@ -1052,9 +1172,25 @@ int evaluate_condition_for_client(const struct lttng_trigger *trigger, switch (get_condition_binding_object(condition)) { case LTTNG_OBJECT_TYPE_SESSION: - ret = evaluate_session_condition_for_client(condition, state, - &evaluation, &object_uid, &object_gid); + { + /* Find the session associated with the condition. */ + const auto *session_name = get_condition_session_name(condition); + auto session_info = get_session_info_by_name(state, session_name); + if (!session_info) { + /* Not an error, the session doesn't exist yet. */ + DBG("Session not found while evaluating session condition for client: session name = `%s`", + session_name); + ret = 0; + goto end; + } + + object_uid = session_info->uid; + object_gid = session_info->gid; + + ret = evaluate_session_condition(condition, session_info, NULL, &evaluation); + session_info_put(session_info); break; + } case LTTNG_OBJECT_TYPE_CHANNEL: ret = evaluate_channel_condition_for_client(condition, state, &evaluation, &object_uid, &object_gid); @@ -1284,7 +1420,7 @@ end: static void free_notification_client_rcu(struct rcu_head *node) { - free(caa_container_of(node, struct notification_client, rcu_node)); + free(lttng::utils::container_of(node, ¬ification_client::rcu_node)); } static @@ -1406,27 +1542,6 @@ fail: return false; } -static -bool session_consumed_size_condition_applies_to_channel( - const struct lttng_condition *condition, - const struct channel_info *channel_info) -{ - enum lttng_condition_status status; - const char *condition_session_name = NULL; - - status = lttng_condition_session_consumed_size_get_session_name( - condition, &condition_session_name); - LTTNG_ASSERT((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name); - - if (strcmp(channel_info->session_info->name, condition_session_name)) { - goto fail; - } - - return true; -fail: - return false; -} - static bool trigger_applies_to_channel(const struct lttng_trigger *trigger, const struct channel_info *channel_info) @@ -1445,10 +1560,6 @@ bool trigger_applies_to_channel(const struct lttng_trigger *trigger, trigger_applies = buffer_usage_condition_applies_to_channel( condition, channel_info); break; - case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE: - trigger_applies = session_consumed_size_condition_applies_to_channel( - condition, channel_info); - break; default: goto fail; } @@ -1496,23 +1607,27 @@ end: /* * Allocate an empty lttng_session_trigger_list for the session named * 'session_name'. - * - * No ownership of 'session_name' is assumed by the session trigger list. - * It is the caller's responsability to ensure the session name is alive - * for as long as this list is. */ static struct lttng_session_trigger_list *lttng_session_trigger_list_create( const char *session_name, struct cds_lfht *session_triggers_ht) { - struct lttng_session_trigger_list *list; + struct lttng_session_trigger_list *list = NULL; + char *session_name_copy = strdup(session_name); + + if (!session_name_copy) { + PERROR("Failed to allocate session name while building trigger list"); + goto end; + } list = zmalloc(); if (!list) { + PERROR("Failed to allocate session trigger list while building trigger list"); goto end; } - list->session_name = session_name; + + list->session_name = session_name_copy; CDS_INIT_LIST_HEAD(&list->list); cds_lfht_node_init(&list->session_triggers_ht_node); list->session_triggers_ht = session_triggers_ht; @@ -1530,8 +1645,11 @@ end: static void free_session_trigger_list_rcu(struct rcu_head *node) { - free(caa_container_of(node, struct lttng_session_trigger_list, - rcu_node)); + struct lttng_session_trigger_list *list = + caa_container_of(node, struct lttng_session_trigger_list, rcu_node); + + free(list->session_name); + free(list); } static @@ -1583,17 +1701,11 @@ bool trigger_applies_to_session(const struct lttng_trigger *trigger, switch (lttng_condition_get_type(condition)) { case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING: case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED: + case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE: { - enum lttng_condition_status condition_status; const char *condition_session_name; - condition_status = lttng_condition_session_rotation_get_session_name( - condition, &condition_session_name); - if (condition_status != LTTNG_CONDITION_STATUS_OK) { - ERR("Failed to retrieve session rotation condition's session name"); - goto end; - } - + condition_session_name = get_condition_session_name(condition); LTTNG_ASSERT(condition_session_name); applies = !strcmp(condition_session_name, session_name); break; @@ -1608,10 +1720,6 @@ end: /* * Allocate and initialize an lttng_session_trigger_list which contains * all triggers that apply to the session named 'session_name'. - * - * No ownership of 'session_name' is assumed by the session trigger list. - * It is the caller's responsability to ensure the session name is alive - * for as long as this list is. */ static struct lttng_session_trigger_list *lttng_session_trigger_list_build( @@ -1654,39 +1762,22 @@ error: } static -struct session_info *find_or_create_session_info( - struct notification_thread_state *state, - const char *name, uid_t uid, gid_t gid) +struct session_info *create_and_publish_session_info(struct notification_thread_state *state, + uint64_t id, + const char *name, + uid_t uid, + gid_t gid) { struct session_info *session = NULL; - struct cds_lfht_node *node; - struct cds_lfht_iter iter; struct lttng_session_trigger_list *trigger_list; rcu_read_lock(); - cds_lfht_lookup(state->sessions_ht, - hash_key_str(name, lttng_ht_seed), - match_session, - name, - &iter); - node = cds_lfht_iter_get_node(&iter); - if (node) { - DBG("Found session info of session \"%s\" (uid = %i, gid = %i)", - name, uid, gid); - session = caa_container_of(node, struct session_info, - sessions_ht_node); - LTTNG_ASSERT(session->uid == uid); - LTTNG_ASSERT(session->gid == gid); - session_info_get(session); - goto end; - } - trigger_list = lttng_session_trigger_list_build(state, name); if (!trigger_list) { goto error; } - session = session_info_create(name, uid, gid, trigger_list, + session = session_info_create(id, name, uid, gid, trigger_list, state->sessions_ht); if (!session) { ERR("Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)", @@ -1694,11 +1785,16 @@ struct session_info *find_or_create_session_info( lttng_session_trigger_list_destroy(trigger_list); goto error; } + + /* Transferred ownership to the new session. */ trigger_list = NULL; - cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed), - &session->sessions_ht_node); -end: + if (cds_lfht_add_unique(state->sessions_ht, hash_session_info(session), match_session_info, + &id, &session->sessions_ht_node) != &session->sessions_ht_node) { + ERR("Duplicate session found: name = `%s`, id = %" PRIu64, name, id); + goto error; + } + rcu_read_unlock(); return session; error: @@ -1708,11 +1804,12 @@ error: } static -int handle_notification_thread_command_add_channel( - struct notification_thread_state *state, - const char *session_name, uid_t session_uid, gid_t session_gid, - const char *channel_name, enum lttng_domain_type channel_domain, - uint64_t channel_key_int, uint64_t channel_capacity, +int handle_notification_thread_command_add_channel(struct notification_thread_state *state, + uint64_t session_id, + const char *channel_name, + enum lttng_domain_type channel_domain, + uint64_t channel_key_int, + uint64_t channel_capacity, enum lttng_error_code *cmd_result) { struct cds_list_head trigger_list; @@ -1727,16 +1824,17 @@ int handle_notification_thread_command_add_channel( struct cds_lfht_iter iter; struct session_info *session_info = NULL; - DBG("Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain", - channel_name, session_name, channel_key_int, + DBG("Adding channel: channel name = `%s`, session id = %" PRIu64 ", channel key = %" PRIu64 ", domain = %s", + channel_name, session_id, channel_key_int, lttng_domain_type_str(channel_domain)); CDS_INIT_LIST_HEAD(&trigger_list); - session_info = find_or_create_session_info(state, session_name, - session_uid, session_gid); + session_info = get_session_info_by_id(state, session_id); if (!session_info) { - /* Allocation error or an internal error occurred. */ + /* Fatal logic error. */ + ERR("Failed to find session while adding channel: session id = %" PRIu64, + session_id); goto error; } @@ -1802,6 +1900,67 @@ error: return 1; } +static +int handle_notification_thread_command_add_session(struct notification_thread_state *state, + uint64_t session_id, + const char *session_name, + uid_t session_uid, + gid_t session_gid, + enum lttng_error_code *cmd_result) +{ + int ret; + + DBG("Adding session: session name = `%s`, session id = %" PRIu64 ", session uid = %d, session gid = %d", + session_name, session_id, session_uid, session_gid); + + auto session = create_and_publish_session_info(state, session_id, session_name, session_uid, session_gid); + if (!session) { + PERROR("Failed to add session: session name = `%s`, session id = %" PRIu64 ", session uid = %d, session gid = %d", + session_name, session_id, session_uid, session_gid); + ret = -1; + *cmd_result = LTTNG_ERR_NOMEM; + goto end; + } + + /* + * Note that the reference to `session` is not released; this reference is + * the "global" reference that is used to allow look-ups. This reference will + * only be released when the session is removed. See + * handle_notification_thread_command_remove_session. + */ + ret = 0; + *cmd_result = LTTNG_OK; +end: + return ret; +} + +static +int handle_notification_thread_command_remove_session( + struct notification_thread_state *state, + uint64_t session_id, + enum lttng_error_code *cmd_result) +{ + int ret; + + DBG("Removing session: session id = %" PRIu64, session_id); + + auto session = get_session_info_by_id(state, session_id); + if (!session) { + ERR("Failed to remove session: session id = %" PRIu64, session_id); + ret = -1; + *cmd_result = LTTNG_ERR_NO_SESSION; + goto end; + } + + /* Release the reference returned by the look-up, and then release the global reference. */ + session_info_put(session); + session_info_put(session); + ret = 0; + *cmd_result = LTTNG_OK; +end: + return ret; +} + static void free_channel_trigger_list_rcu(struct rcu_head *node) { @@ -1902,7 +2061,7 @@ static int handle_notification_thread_command_session_rotation( struct notification_thread_state *state, enum notification_thread_command_type cmd_type, - const char *session_name, uid_t session_uid, gid_t session_gid, + uint64_t session_id, uint64_t trace_archive_chunk_id, struct lttng_trace_archive_location *location, enum lttng_error_code *_cmd_result) @@ -1912,77 +2071,75 @@ int handle_notification_thread_command_session_rotation( struct lttng_session_trigger_list *trigger_list; struct lttng_trigger_list_element *trigger_list_element; struct session_info *session_info; - const struct lttng_credentials session_creds = { - .uid = LTTNG_OPTIONAL_INIT_VALUE(session_uid), - .gid = LTTNG_OPTIONAL_INIT_VALUE(session_gid), - }; + struct lttng_credentials session_creds; + struct session_state_sample new_session_state; rcu_read_lock(); - session_info = find_or_create_session_info(state, session_name, - session_uid, session_gid); + session_info = get_session_info_by_id(state, session_id); if (!session_info) { - /* Allocation error or an internal error occurred. */ + /* Fatal logic error. */ + ERR("Failed to find session while handling rotation state change: session id = %" PRIu64, + session_id); ret = -1; - cmd_result = LTTNG_ERR_NOMEM; + cmd_result = LTTNG_ERR_FATAL; goto end; } - session_info->rotation.ongoing = - cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING; - session_info->rotation.id = trace_archive_chunk_id; - trigger_list = get_session_trigger_list(state, session_name); - if (!trigger_list) { - DBG("No triggers applying to session \"%s\" found", - session_name); - goto end; + new_session_state = session_info->last_state_sample; + if (location) { + lttng_trace_archive_location_get(location); + new_session_state.rotation.location = location; + } else { + new_session_state.rotation.location = NULL; } + session_creds = { + .uid = LTTNG_OPTIONAL_INIT_VALUE(session_info->uid), + .gid = LTTNG_OPTIONAL_INIT_VALUE(session_info->gid), + }; + + new_session_state.rotation.ongoing = cmd_type == + NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING; + new_session_state.rotation.id = trace_archive_chunk_id; + + trigger_list = get_session_trigger_list(state, session_info->name); + LTTNG_ASSERT(trigger_list); + cds_list_for_each_entry(trigger_list_element, &trigger_list->list, node) { const struct lttng_condition *condition; struct lttng_trigger *trigger; struct notification_client_list *client_list; struct lttng_evaluation *evaluation = NULL; - enum lttng_condition_type condition_type; enum action_executor_status executor_status; trigger = trigger_list_element->trigger; condition = lttng_trigger_get_const_condition(trigger); LTTNG_ASSERT(condition); - condition_type = lttng_condition_get_type(condition); - if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING && - cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) { - continue; - } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED && - cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) { - continue; - } - - client_list = get_client_list_from_condition(state, condition); - if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) { - evaluation = lttng_evaluation_session_rotation_ongoing_create( - trace_archive_chunk_id); - } else { - evaluation = lttng_evaluation_session_rotation_completed_create( - trace_archive_chunk_id, location); + ret = evaluate_session_condition( + condition, session_info, &new_session_state, &evaluation); + if (ret) { + ret = -1; + cmd_result = LTTNG_ERR_NOMEM; + goto end; } if (!evaluation) { - /* Internal error */ - ret = -1; - cmd_result = LTTNG_ERR_UNK; - goto put_list; + continue; } /* * Ownership of `evaluation` transferred to the action executor - * no matter the result. + * no matter the result. The callee acquires a reference to the + * client list: we can release our own. */ + client_list = get_client_list_from_condition(state, condition); executor_status = action_executor_enqueue_trigger( state->executor, trigger, evaluation, &session_creds, client_list); + notification_client_list_put(client_list); evaluation = NULL; switch (executor_status) { case ACTION_EXECUTOR_STATUS_OK: @@ -1995,7 +2152,7 @@ int handle_notification_thread_command_session_rotation( */ ERR("Fatal error occurred while enqueuing action associated with session rotation trigger"); ret = -1; - goto put_list; + goto end; case ACTION_EXECUTOR_STATUS_OVERFLOW: /* * TODO Add trigger identification (name/id) when @@ -2005,18 +2162,19 @@ int handle_notification_thread_command_session_rotation( */ WARN("No space left when enqueuing action associated with session rotation trigger"); ret = 0; - goto put_list; + goto end; default: abort(); } - -put_list: - notification_client_list_put(client_list); - if (caa_unlikely(ret)) { - break; - } } + end: + if (session_info) { + /* Ownership of new_session_state::location is transferred. */ + lttng_trace_archive_location_put(session_info->last_state_sample.rotation.location); + session_info->last_state_sample = new_session_state; + } + session_info_put(session_info); *_cmd_result = cmd_result; rcu_read_unlock(); @@ -2435,25 +2593,7 @@ int bind_trigger_to_matching_session(struct lttng_trigger *trigger, ASSERT_RCU_READ_LOCKED(); condition = lttng_trigger_get_const_condition(trigger); - switch (lttng_condition_get_type(condition)) { - case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING: - case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED: - { - enum lttng_condition_status status; - - status = lttng_condition_session_rotation_get_session_name( - condition, &session_name); - if (status != LTTNG_CONDITION_STATUS_OK) { - ERR("Failed to bind trigger to session: unable to get 'session_rotation' condition's session name"); - ret = -1; - goto end; - } - break; - } - default: - ret = -1; - goto end; - } + session_name = get_condition_session_name(condition); trigger_list = get_session_trigger_list(state, session_name); if (!trigger_list) { @@ -2825,6 +2965,7 @@ int handle_notification_thread_command_register_trigger( client_list = notification_client_list_create(state, condition); if (!client_list) { ERR("Error creating notification client list for trigger %s", trigger->name); + ret = -1; goto error_free_ht_element; } } @@ -2900,12 +3041,25 @@ int handle_notification_thread_command_register_trigger( */ switch (get_condition_binding_object(condition)) { case LTTNG_OBJECT_TYPE_SESSION: - ret = evaluate_session_condition_for_client(condition, state, - &evaluation, &object_uid, - &object_gid); - LTTNG_OPTIONAL_SET(&object_creds.uid, object_uid); - LTTNG_OPTIONAL_SET(&object_creds.gid, object_gid); + { + /* Find the session associated with the condition. */ + const auto *session_name = get_condition_session_name(condition); + auto session_info = get_session_info_by_name(state, session_name); + if (!session_info) { + /* Not an error, the session doesn't exist yet. */ + DBG("Session not found while evaluating session condition during registration of trigger: session name = `%s`", + session_name); + ret = 0; + goto success; + } + + LTTNG_OPTIONAL_SET(&object_creds.uid, session_info->uid); + LTTNG_OPTIONAL_SET(&object_creds.gid, session_info->gid); + + ret = evaluate_session_condition(condition, session_info, NULL, &evaluation); + session_info_put(session_info); break; + } case LTTNG_OBJECT_TYPE_CHANNEL: ret = evaluate_channel_condition_for_client(condition, state, &evaluation, &object_uid, @@ -3038,6 +3192,35 @@ void teardown_tracer_notifier(struct notification_thread_state *state, } } +static +void remove_trigger_from_session_trigger_list( + struct lttng_session_trigger_list *trigger_list, + const struct lttng_trigger *trigger) +{ + bool found = false; + struct lttng_trigger_list_element *trigger_element, *tmp; + + cds_list_for_each_entry_safe (trigger_element, tmp, &trigger_list->list, node) { + if (!lttng_trigger_is_equal(trigger, trigger_element->trigger)) { + continue; + } + + DBG("Removed trigger from session_triggers_ht"); + cds_list_del(&trigger_element->node); + free(trigger_element); + /* A trigger can only appear once per session. */ + found = true; + break; + } + + if (!found) { + ERR("Failed to find trigger associated with session: session name = `%s`", + trigger_list->session_name); + } + + LTTNG_ASSERT(found); +} + static int handle_notification_thread_command_unregister_trigger( struct notification_thread_state *state, @@ -3046,7 +3229,6 @@ int handle_notification_thread_command_unregister_trigger( { struct cds_lfht_iter iter; struct cds_lfht_node *triggers_ht_node; - struct lttng_channel_trigger_list *trigger_list; struct notification_client_list *client_list; struct lttng_trigger_ht_element *trigger_ht_element = NULL; const struct lttng_condition *condition = lttng_trigger_get_const_condition( @@ -3071,22 +3253,57 @@ int handle_notification_thread_command_unregister_trigger( trigger_ht_element = caa_container_of(triggers_ht_node, struct lttng_trigger_ht_element, node); - /* Remove trigger from channel_triggers_ht. */ - cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list, - channel_triggers_ht_node) { - struct lttng_trigger_list_element *trigger_element, *tmp; + switch (get_condition_binding_object(condition)) { + case LTTNG_OBJECT_TYPE_CHANNEL: + { + struct lttng_channel_trigger_list *trigger_list; - cds_list_for_each_entry_safe(trigger_element, tmp, - &trigger_list->list, node) { - if (!lttng_trigger_is_equal(trigger, trigger_element->trigger)) { - continue; + /* + * Remove trigger from channel_triggers_ht. + * + * Note that multiple channels may have matched the trigger's + * condition (e.g. all instances of a given channel in per-pid buffering + * mode). + * + * Iterate on all lists since we don't know the target channels' keys. + */ + cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list, + channel_triggers_ht_node) { + struct lttng_trigger_list_element *trigger_element, *tmp; + + cds_list_for_each_entry_safe( + trigger_element, tmp, &trigger_list->list, node) { + if (!lttng_trigger_is_equal(trigger, trigger_element->trigger)) { + continue; + } + + DBG("Removed trigger from channel_triggers_ht"); + cds_list_del(&trigger_element->node); + free(trigger_element); + /* A trigger can only appear once per channel */ + break; } + } + break; + } + case LTTNG_OBJECT_TYPE_SESSION: + { + auto session = get_session_info_by_name( + state, get_condition_session_name(condition)); - DBG("Removed trigger from channel_triggers_ht"); - cds_list_del(&trigger_element->node); - /* A trigger can only appear once per channel */ + /* Session doesn't exist, no trigger to remove. */ + if (!session) { break; } + + auto session_trigger_list = get_session_trigger_list(state, session->name); + remove_trigger_from_session_trigger_list(session_trigger_list, trigger); + session_info_put(session); + } + case LTTNG_OBJECT_TYPE_NONE: + break; + default: + abort(); } if (lttng_trigger_needs_tracer_notifier(trigger)) { @@ -3180,9 +3397,7 @@ int handle_notification_thread_command( case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL: ret = handle_notification_thread_command_add_channel( state, - cmd->parameters.add_channel.session.name, - cmd->parameters.add_channel.session.uid, - cmd->parameters.add_channel.session.gid, + cmd->parameters.add_channel.session.id, cmd->parameters.add_channel.channel.name, cmd->parameters.add_channel.channel.domain, cmd->parameters.add_channel.channel.key, @@ -3195,14 +3410,23 @@ int handle_notification_thread_command( cmd->parameters.remove_channel.domain, &cmd->reply_code); break; + case NOTIFICATION_COMMAND_TYPE_ADD_SESSION: + ret = handle_notification_thread_command_add_session(state, + cmd->parameters.add_session.session_id, + cmd->parameters.add_session.session_name, + cmd->parameters.add_session.session_uid, + cmd->parameters.add_session.session_gid, &cmd->reply_code); + break; + case NOTIFICATION_COMMAND_TYPE_REMOVE_SESSION: + ret = handle_notification_thread_command_remove_session( + state, cmd->parameters.remove_session.session_id, &cmd->reply_code); + break; case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING: case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED: ret = handle_notification_thread_command_session_rotation( state, cmd->type, - cmd->parameters.session_rotation.session_name, - cmd->parameters.session_rotation.uid, - cmd->parameters.session_rotation.gid, + cmd->parameters.session_rotation.session_id, cmd->parameters.session_rotation.trace_archive_chunk_id, cmd->parameters.session_rotation.location, &cmd->reply_code); @@ -3748,14 +3972,14 @@ int client_send_command_reply(struct notification_client *client, struct lttng_notification_channel_command_reply reply = { .status = (int8_t) status, }; - struct lttng_notification_channel_message msg = { - .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY, - .size = sizeof(reply), - .fds = 0, - }; + struct lttng_notification_channel_message msg; char buffer[sizeof(msg) + sizeof(reply)]; enum client_transmission_status transmission_status; + msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY; + msg.size = sizeof(reply); + msg.fds = 0; + memcpy(buffer, &msg, sizeof(msg)); memcpy(buffer + sizeof(msg), &reply, sizeof(reply)); DBG("Send command reply (%i)", (int) status); @@ -3853,15 +4077,15 @@ int client_handle_message_handshake(struct notification_client *client, .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR, .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR, }; - const struct lttng_notification_channel_message msg_header = { - .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE, - .size = sizeof(handshake_reply), - .fds = 0, - }; + struct lttng_notification_channel_message msg_header; enum lttng_notification_channel_status status = LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)]; + msg_header.type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE; + msg_header.size = sizeof(handshake_reply); + msg_header.fds = 0; + memcpy(send_buffer, &msg_header, sizeof(msg_header)); memcpy(send_buffer + sizeof(msg_header), &handshake_reply, sizeof(handshake_reply)); @@ -4207,9 +4431,8 @@ bool evaluate_buffer_usage_condition(const struct lttng_condition *condition, bool result = false; uint64_t threshold; enum lttng_condition_type condition_type; - const struct lttng_condition_buffer_usage *use_condition = container_of( - condition, struct lttng_condition_buffer_usage, - parent); + const struct lttng_condition_buffer_usage *use_condition = lttng::utils::container_of( + condition, <tng_condition_buffer_usage::parent); if (use_condition->threshold_bytes.set) { threshold = use_condition->threshold_bytes.value; @@ -4258,31 +4481,12 @@ bool evaluate_buffer_usage_condition(const struct lttng_condition *condition, return result; } -static -bool evaluate_session_consumed_size_condition( - const struct lttng_condition *condition, - uint64_t session_consumed_size) -{ - uint64_t threshold; - const struct lttng_condition_session_consumed_size *size_condition = - container_of(condition, - struct lttng_condition_session_consumed_size, - parent); - - threshold = size_condition->consumed_threshold_bytes.value; - DBG("Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64, - threshold, session_consumed_size); - return session_consumed_size >= threshold; -} - static int evaluate_buffer_condition(const struct lttng_condition *condition, struct lttng_evaluation **evaluation, const struct notification_thread_state *state __attribute__((unused)), const struct channel_state_sample *previous_sample, const struct channel_state_sample *latest_sample, - uint64_t previous_session_consumed_total, - uint64_t latest_session_consumed_total, struct channel_info *channel_info) { int ret = 0; @@ -4305,18 +4509,6 @@ int evaluate_buffer_condition(const struct lttng_condition *condition, condition, latest_sample, channel_info->capacity); break; - case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE: - if (caa_likely(previous_sample_available)) { - previous_sample_result = - evaluate_session_consumed_size_condition( - condition, - previous_session_consumed_total); - } - latest_sample_result = - evaluate_session_consumed_size_condition( - condition, - latest_session_consumed_total); - break; default: /* Unknown condition type; internal error. */ abort(); @@ -4345,10 +4537,6 @@ int evaluate_buffer_condition(const struct lttng_condition *condition, latest_sample->highest_usage, channel_info->capacity); break; - case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE: - *evaluation = lttng_evaluation_session_consumed_size_create( - latest_session_consumed_total); - break; default: abort(); } @@ -4365,11 +4553,11 @@ static int client_notification_overflow(struct notification_client *client) { int ret = 0; - const struct lttng_notification_channel_message msg = { - .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED, - .size = 0, - .fds = 0, - }; + struct lttng_notification_channel_message msg; + + msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED; + msg.size = 0; + msg.fds = 0; ASSERT_LOCKED(client->lock); @@ -4471,16 +4659,16 @@ int notification_client_list_send_evaluation( .trigger = (struct lttng_trigger *) trigger, .evaluation = (struct lttng_evaluation *) evaluation, }; - struct lttng_notification_channel_message msg_header = { - .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION, - .size = 0, - .fds = 0, - }; + struct lttng_notification_channel_message msg_header; const struct lttng_credentials *trigger_creds = lttng_trigger_get_credentials(trigger); lttng_payload_init(&msg_payload); + msg_header.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION; + msg_header.size = 0; + msg_header.fds = 0; + ret = lttng_dynamic_buffer_append(&msg_payload.buffer, &msg_header, sizeof(msg_header)); if (ret) { @@ -4750,10 +4938,9 @@ int dispatch_one_event_notifier_notification(struct notification_thread_state *s } evaluation = lttng_evaluation_event_rule_matches_create( - container_of(lttng_trigger_get_const_condition( + lttng::utils::container_of(lttng_trigger_get_const_condition( element->trigger), - struct lttng_condition_event_rule_matches, - parent), + <tng_condition_event_rule_matches::parent), notification->capture_buffer, notification->capture_buf_size, false); @@ -4878,15 +5065,18 @@ int handle_notification_thread_channel_sample( { int ret = 0; struct lttcomm_consumer_channel_monitor_msg sample_msg; - struct channel_info *channel_info; + struct channel_info *channel_info = NULL; struct cds_lfht_node *node; struct cds_lfht_iter iter; - struct lttng_channel_trigger_list *trigger_list; + struct lttng_channel_trigger_list *channel_trigger_list; + struct lttng_session_trigger_list *session_trigger_list; struct lttng_trigger_list_element *trigger_list_element; bool previous_sample_available = false; - struct channel_state_sample previous_sample, latest_sample; - uint64_t previous_session_consumed_total, latest_session_consumed_total; - struct lttng_credentials channel_creds; + struct channel_state_sample channel_previous_sample, channel_new_sample; + struct session_state_sample session_new_sample; + struct lttng_credentials channel_creds = {}; + struct lttng_credentials session_creds = {}; + struct session_info *session; /* * The monitoring pipe only holds messages smaller than PIPE_BUF, @@ -4901,19 +5091,95 @@ int handle_notification_thread_channel_sample( } ret = 0; - latest_sample.key.key = sample_msg.key; - latest_sample.key.domain = domain; - latest_sample.highest_usage = sample_msg.highest; - latest_sample.lowest_usage = sample_msg.lowest; - latest_sample.channel_total_consumed = sample_msg.total_consumed; + channel_new_sample.key.key = sample_msg.key; + channel_new_sample.key.domain = domain; + channel_new_sample.highest_usage = sample_msg.highest; + channel_new_sample.lowest_usage = sample_msg.lowest; rcu_read_lock(); + session = get_session_info_by_id(state, sample_msg.session_id); + if (!session) { + DBG("Received a sample for an unknown session from consumerd: session id = %" PRIu64, + sample_msg.session_id); + goto end_unlock; + } + + session_new_sample = session->last_state_sample; + session_new_sample.consumed_data_size += sample_msg.consumed_since_last_sample; + session_creds = { + .uid = LTTNG_OPTIONAL_INIT_VALUE(session->uid), + .gid = LTTNG_OPTIONAL_INIT_VALUE(session->gid), + }; + + session_trigger_list = get_session_trigger_list(state, session->name); + LTTNG_ASSERT(session_trigger_list); + cds_list_for_each_entry(trigger_list_element, &session_trigger_list->list, + node) { + const struct lttng_condition *condition; + struct lttng_trigger *trigger; + struct notification_client_list *client_list = NULL; + struct lttng_evaluation *evaluation = NULL; + enum action_executor_status executor_status; + + ret = 0; + trigger = trigger_list_element->trigger; + condition = lttng_trigger_get_const_condition(trigger); + LTTNG_ASSERT(condition); + + ret = evaluate_session_condition( + condition, session, &session_new_sample, &evaluation); + if (caa_unlikely(ret)) { + break; + } + + if (caa_likely(!evaluation)) { + continue; + } + + /* + * Ownership of `evaluation` transferred to the action executor + * no matter the result. The callee acquires a reference to the + * client list: we can release our own. + */ + client_list = get_client_list_from_condition(state, condition); + executor_status = action_executor_enqueue_trigger( + state->executor, trigger, evaluation, + &session_creds, client_list); + notification_client_list_put(client_list); + evaluation = NULL; + switch (executor_status) { + case ACTION_EXECUTOR_STATUS_OK: + break; + case ACTION_EXECUTOR_STATUS_ERROR: + case ACTION_EXECUTOR_STATUS_INVALID: + /* + * TODO Add trigger identification (name/id) when + * it is added to the API. + */ + ERR("Fatal error occurred while enqueuing action associated with buffer-condition trigger"); + ret = -1; + goto end_unlock; + case ACTION_EXECUTOR_STATUS_OVERFLOW: + /* + * TODO Add trigger identification (name/id) when + * it is added to the API. + * + * Not a fatal error. + */ + WARN("No space left when enqueuing action associated with buffer-condition trigger"); + ret = 0; + goto end_unlock; + default: + abort(); + } + } + /* Retrieve the channel's informations */ cds_lfht_lookup(state->channels_ht, - hash_channel_key(&latest_sample.key), + hash_channel_key(&channel_new_sample.key), match_channel_info, - &latest_sample.key, + &channel_new_sample.key, &iter); node = cds_lfht_iter_get_node(&iter); if (caa_unlikely(!node)) { @@ -4924,28 +5190,26 @@ int handle_notification_thread_channel_sample( * sample. */ DBG("Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain", - latest_sample.key.key, + channel_new_sample.key.key, lttng_domain_type_str(domain)); goto end_unlock; } + channel_info = caa_container_of(node, struct channel_info, channels_ht_node); - DBG("Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")", + DBG("Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", consumed since last sample = %" PRIu64")", channel_info->name, - latest_sample.key.key, + channel_new_sample.key.key, channel_info->session_info->name, - latest_sample.highest_usage, - latest_sample.lowest_usage, - latest_sample.channel_total_consumed); - - previous_session_consumed_total = - channel_info->session_info->consumed_data_size; + channel_new_sample.highest_usage, + channel_new_sample.lowest_usage, + sample_msg.consumed_since_last_sample); /* Retrieve the channel's last sample, if it exists, and update it. */ cds_lfht_lookup(state->channel_state_ht, - hash_channel_key(&latest_sample.key), + hash_channel_key(&channel_new_sample.key), match_channel_state_sample, - &latest_sample.key, + &channel_new_sample.key, &iter); node = cds_lfht_iter_get_node(&iter); if (caa_likely(node)) { @@ -4956,16 +5220,11 @@ int handle_notification_thread_channel_sample( struct channel_state_sample, channel_state_ht_node); - memcpy(&previous_sample, stored_sample, - sizeof(previous_sample)); - stored_sample->highest_usage = latest_sample.highest_usage; - stored_sample->lowest_usage = latest_sample.lowest_usage; - stored_sample->channel_total_consumed = latest_sample.channel_total_consumed; + memcpy(&channel_previous_sample, stored_sample, + sizeof(channel_previous_sample)); + stored_sample->highest_usage = channel_new_sample.highest_usage; + stored_sample->lowest_usage = channel_new_sample.lowest_usage; previous_sample_available = true; - - latest_session_consumed_total = - previous_session_consumed_total + - (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed); } else { /* * This is the channel's first sample, allocate space for and @@ -4979,39 +5238,30 @@ int handle_notification_thread_channel_sample( goto end_unlock; } - memcpy(stored_sample, &latest_sample, sizeof(*stored_sample)); + memcpy(stored_sample, &channel_new_sample, sizeof(*stored_sample)); cds_lfht_node_init(&stored_sample->channel_state_ht_node); cds_lfht_add(state->channel_state_ht, hash_channel_key(&stored_sample->key), &stored_sample->channel_state_ht_node); - - latest_session_consumed_total = - previous_session_consumed_total + - latest_sample.channel_total_consumed; } - channel_info->session_info->consumed_data_size = - latest_session_consumed_total; - /* Find triggers associated with this channel. */ cds_lfht_lookup(state->channel_triggers_ht, - hash_channel_key(&latest_sample.key), + hash_channel_key(&channel_new_sample.key), match_channel_trigger_list, - &latest_sample.key, + &channel_new_sample.key, &iter); node = cds_lfht_iter_get_node(&iter); - if (caa_likely(!node)) { - goto end_unlock; - } + LTTNG_ASSERT(node); channel_creds = (typeof(channel_creds)) { .uid = LTTNG_OPTIONAL_INIT_VALUE(channel_info->session_info->uid), .gid = LTTNG_OPTIONAL_INIT_VALUE(channel_info->session_info->gid), }; - trigger_list = caa_container_of(node, struct lttng_channel_trigger_list, + channel_trigger_list = caa_container_of(node, struct lttng_channel_trigger_list, channel_triggers_ht_node); - cds_list_for_each_entry(trigger_list_element, &trigger_list->list, + cds_list_for_each_entry(trigger_list_element, &channel_trigger_list->list, node) { const struct lttng_condition *condition; struct lttng_trigger *trigger; @@ -5024,33 +5274,28 @@ int handle_notification_thread_channel_sample( condition = lttng_trigger_get_const_condition(trigger); LTTNG_ASSERT(condition); - /* - * Check if any client is subscribed to the result of this - * evaluation. - */ - client_list = get_client_list_from_condition(state, condition); - ret = evaluate_buffer_condition(condition, &evaluation, state, - previous_sample_available ? &previous_sample : NULL, - &latest_sample, - previous_session_consumed_total, - latest_session_consumed_total, + previous_sample_available ? &channel_previous_sample : NULL, + &channel_new_sample, channel_info); if (caa_unlikely(ret)) { - goto put_list; + break; } if (caa_likely(!evaluation)) { - goto put_list; + continue; } /* * Ownership of `evaluation` transferred to the action executor - * no matter the result. + * no matter the result. The callee acquires a reference to the + * client list: we can release our own. */ + client_list = get_client_list_from_condition(state, condition); executor_status = action_executor_enqueue_trigger( state->executor, trigger, evaluation, &channel_creds, client_list); + notification_client_list_put(client_list); evaluation = NULL; switch (executor_status) { case ACTION_EXECUTOR_STATUS_OK: @@ -5063,7 +5308,7 @@ int handle_notification_thread_channel_sample( */ ERR("Fatal error occurred while enqueuing action associated with buffer-condition trigger"); ret = -1; - goto put_list; + goto end_unlock; case ACTION_EXECUTOR_STATUS_OVERFLOW: /* * TODO Add trigger identification (name/id) when @@ -5073,18 +5318,16 @@ int handle_notification_thread_channel_sample( */ WARN("No space left when enqueuing action associated with buffer-condition trigger"); ret = 0; - goto put_list; + goto end_unlock; default: abort(); } - -put_list: - notification_client_list_put(client_list); - if (caa_unlikely(ret)) { - break; - } } end_unlock: + if (session) { + session->last_state_sample = session_new_sample; + } + session_info_put(session); rcu_read_unlock(); end: return ret;