X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Faction-executor.c;fp=src%2Fbin%2Flttng-sessiond%2Faction-executor.c;h=8e3f79813205ce3b2c4f5c60ec4015cc1e0b75a8;hp=f150dbb44af438b37aac1734ebffd85d116e1beb;hb=72365501d3148ca977a09bad8de0ec51b427bdd8;hpb=5c5373c3cbfebddb0068fe13600766bb381048da diff --git a/src/bin/lttng-sessiond/action-executor.c b/src/bin/lttng-sessiond/action-executor.c index f150dbb44..8e3f79813 100644 --- a/src/bin/lttng-sessiond/action-executor.c +++ b/src/bin/lttng-sessiond/action-executor.c @@ -12,6 +12,7 @@ #include "notification-thread-internal.h" #include "session.h" #include "thread.h" +#include #include #include #include @@ -35,8 +36,50 @@ #define THREAD_NAME "Action Executor" #define MAX_QUEUED_WORK_COUNT 8192 +/* + * A work item is composed of a dynamic array of sub-items which + * represent a flattened, and augmented, version of a trigger's actions. + * + * We cannot rely solely on the trigger's actions since each action can have an + * execution context we need to comply with. + * + * The notion of execution context is required since for some actions the + * associated objects are referenced by name and not by id. This can lead to + * a number of ambiguities when executing an action work item. + * + * For example, let's take a simple trigger such as: + * - condition: ust event A + * - action: start session S + * + * At time T, session S exists. + * At T + 1, the event A is hit. + * At T + 2, the tracer event notification is received and the work item is + * queued. Here session S has an id of 1. + * At T + 3, the session S is destroyed and a new session S is created, with a + * resulting id of 200. + * At T + 4, the work item is popped from the queue and begins execution and will + * start session S with an id of 200 instead of the session S id 1 that was + * present at the queuing phase. + * + * The context to be respected is the one in effect when the work item is queued. 
If the + * execution context is not the same at the moment of execution, we skip the + * execution of that sub-item. + * + * It is the same policy in regards to the validity of the associated + * trigger object at the moment of execution, if the trigger is found to be + * unregistered, the execution is skipped. + */ + struct action_work_item { uint64_t id; + + /* + * The actions to be executed with their respective execution context. + * See struct `action_work_subitem`. + */ + struct lttng_dynamic_array *subitems; + + /* Execution context data */ struct lttng_trigger *trigger; struct lttng_evaluation *evaluation; struct notification_client_list *client_list; @@ -44,6 +87,14 @@ struct action_work_item { struct cds_list_head list_node; }; +struct action_work_subitem { + struct lttng_action *action; + struct { + /* Used by actions targeting a session. */ + LTTNG_OPTIONAL(uint64_t) session_id; + } context; +}; + struct action_executor { struct lttng_thread *thread; struct notification_thread_handle *notification_thread_handle; @@ -63,33 +114,33 @@ struct action_executor { */ typedef int (*action_executor_handler)(struct action_executor *executor, const struct action_work_item *, - struct lttng_action *action); + struct action_work_subitem *item); static int action_executor_notify_handler(struct action_executor *executor, const struct action_work_item *, - struct lttng_action *); + struct action_work_subitem *); static int action_executor_start_session_handler( struct action_executor *executor, const struct action_work_item *, - struct lttng_action *); + struct action_work_subitem *); static int action_executor_stop_session_handler( struct action_executor *executor, const struct action_work_item *, - struct lttng_action *); + struct action_work_subitem *); static int action_executor_rotate_session_handler( struct action_executor *executor, const struct action_work_item *, - struct lttng_action *); + struct action_work_subitem *); static int 
action_executor_snapshot_session_handler( struct action_executor *executor, const struct action_work_item *, - struct lttng_action *); + struct action_work_subitem *); static int action_executor_group_handler(struct action_executor *executor, const struct action_work_item *, - struct lttng_action *); + struct action_work_subitem *); static int action_executor_generic_handler(struct action_executor *executor, const struct action_work_item *, - struct lttng_action *); + struct action_work_subitem *); static const action_executor_handler action_executors[] = { [LTTNG_ACTION_TYPE_NOTIFY] = action_executor_notify_handler, @@ -100,6 +151,20 @@ static const action_executor_handler action_executors[] = { [LTTNG_ACTION_TYPE_GROUP] = action_executor_group_handler, }; +/* Forward declaration */ +static int add_action_to_subitem_array(struct lttng_action *action, + struct lttng_dynamic_array *subitems); + +static int populate_subitem_array_from_trigger(struct lttng_trigger *trigger, + struct lttng_dynamic_array *subitems); + +static void action_work_subitem_destructor(void *element) +{ + struct action_work_subitem *subitem = element; + + lttng_action_put(subitem->action); +} + static const char *get_action_name(const struct lttng_action *action) { const enum lttng_action_type action_type = lttng_action_get_type(action); @@ -190,7 +255,7 @@ end: static int action_executor_notify_handler(struct action_executor *executor, const struct action_work_item *work_item, - struct lttng_action *action) + struct action_work_subitem *item) { return notification_client_list_send_evaluation(work_item->client_list, work_item->trigger, @@ -204,13 +269,14 @@ static int action_executor_notify_handler(struct action_executor *executor, static int action_executor_start_session_handler( struct action_executor *executor, const struct action_work_item *work_item, - struct lttng_action *action) + struct action_work_subitem *item) { int ret = 0; const char *session_name; enum lttng_action_status 
action_status; struct ltt_session *session; enum lttng_error_code cmd_ret; + struct lttng_action *action = item->action; action_status = lttng_action_start_session_get_session_name( action, &session_name); @@ -221,6 +287,19 @@ static int action_executor_start_session_handler( goto end; } + /* + * Validate if, at the moment the action was queued, the target session + * existed. If not, skip the action altogether. + */ + if (!item->context.session_id.is_set) { + DBG("Session `%s` was not present at the moment the work item was enqueued for %s` action of trigger `%s`", + session_name, get_action_name(action), + get_trigger_name(work_item->trigger)); + lttng_action_increase_execution_failure_count(action); + ret = 0; + goto end; + } + session_lock_list(); session = session_find_by_name(session_name); if (!session) { @@ -230,6 +309,22 @@ static int action_executor_start_session_handler( goto error_unlock_list; } + /* + * Check if the session id is the same as when the work item was + * enqueued. + */ + if (session->id != LTTNG_OPTIONAL_GET(item->context.session_id)) { + DBG("Session id for session `%s` (id: %" PRIu64 + " is not the same that was sampled (id: %" PRIu64 + " at the moment the work item was enqueued for %s` action of trigger `%s`", + session_name, session->id, + LTTNG_OPTIONAL_GET(item->context.session_id), + get_action_name(action), + get_trigger_name(work_item->trigger)); + ret = 0; + goto error_unlock_list; + } + session_lock(session); if (!is_trigger_allowed_for_session(work_item->trigger, session)) { goto error_dispose_session; @@ -265,13 +360,14 @@ end: static int action_executor_stop_session_handler( struct action_executor *executor, const struct action_work_item *work_item, - struct lttng_action *action) + struct action_work_subitem *item) { int ret = 0; const char *session_name; enum lttng_action_status action_status; struct ltt_session *session; enum lttng_error_code cmd_ret; + struct lttng_action *action = item->action; action_status = 
lttng_action_stop_session_get_session_name( action, &session_name); @@ -282,6 +378,19 @@ static int action_executor_stop_session_handler( goto end; } + /* + * Validate if, at the moment the action was queued, the target session + * existed. If not, skip the action altogether. + */ + if (!item->context.session_id.is_set) { + DBG("Session `%s` was not present at the moment the work item was enqueued for %s` action of trigger `%s`", + session_name, get_action_name(action), + get_trigger_name(work_item->trigger)); + lttng_action_increase_execution_failure_count(action); + ret = 0; + goto end; + } + session_lock_list(); session = session_find_by_name(session_name); if (!session) { @@ -292,6 +401,22 @@ static int action_executor_stop_session_handler( goto error_unlock_list; } + /* + * Check if the session id is the same as when the work item was + * enqueued + */ + if (session->id != LTTNG_OPTIONAL_GET(item->context.session_id)) { + DBG("Session id for session `%s` (id: %" PRIu64 + " is not the same that was sampled (id: %" PRIu64 + " at the moment the work item was enqueued for %s` action of trigger `%s`", + session_name, session->id, + LTTNG_OPTIONAL_GET(item->context.session_id), + get_action_name(action), + get_trigger_name(work_item->trigger)); + ret = 0; + goto error_unlock_list; + } + session_lock(session); if (!is_trigger_allowed_for_session(work_item->trigger, session)) { goto error_dispose_session; @@ -327,13 +452,14 @@ end: static int action_executor_rotate_session_handler( struct action_executor *executor, const struct action_work_item *work_item, - struct lttng_action *action) + struct action_work_subitem *item) { int ret = 0; const char *session_name; enum lttng_action_status action_status; struct ltt_session *session; enum lttng_error_code cmd_ret; + struct lttng_action *action = item->action; action_status = lttng_action_rotate_session_get_session_name( action, &session_name); @@ -344,6 +470,19 @@ static int action_executor_rotate_session_handler( goto 
end; } + /* + * Validate if, at the moment the action was queued, the target session + * existed. If not, skip the action altogether. + */ + if (!item->context.session_id.is_set) { + DBG("Session `%s` was not present at the moment the work item was enqueued for %s` action of trigger `%s`", + session_name, get_action_name(action), + get_trigger_name(work_item->trigger)); + lttng_action_increase_execution_failure_count(action); + ret = 0; + goto end; + } + session_lock_list(); session = session_find_by_name(session_name); if (!session) { @@ -354,6 +493,22 @@ static int action_executor_rotate_session_handler( goto error_unlock_list; } + /* + * Check if the session id is the same as when the work item was + * enqueued. + */ + if (session->id != LTTNG_OPTIONAL_GET(item->context.session_id)) { + DBG("Session id for session `%s` (id: %" PRIu64 + " is not the same that was sampled (id: %" PRIu64 + " at the moment the work item was enqueued for %s` action of trigger `%s`", + session_name, session->id, + LTTNG_OPTIONAL_GET(item->context.session_id), + get_action_name(action), + get_trigger_name(work_item->trigger)); + ret = 0; + goto error_unlock_list; + } + session_lock(session); if (!is_trigger_allowed_for_session(work_item->trigger, session)) { goto error_dispose_session; @@ -396,7 +551,7 @@ end: static int action_executor_snapshot_session_handler( struct action_executor *executor, const struct action_work_item *work_item, - struct lttng_action *action) + struct action_work_subitem *item) { int ret = 0; const char *session_name; @@ -408,6 +563,20 @@ static int action_executor_snapshot_session_handler( const struct lttng_snapshot_output *snapshot_output = &default_snapshot_output; enum lttng_error_code cmd_ret; + struct lttng_action *action = item->action; + + /* + * Validate if, at the moment the action was queued, the target session + * existed. If not, skip the action altogether. 
+ */ + if (!item->context.session_id.is_set) { + DBG("Session `%s` was not present at the moment the work item was enqueued for %s` action of trigger `%s`", + session_name, get_action_name(action), + get_trigger_name(work_item->trigger)); + lttng_action_increase_execution_failure_count(action); + ret = 0; + goto end; + } action_status = lttng_action_snapshot_session_get_session_name( action, &session_name); @@ -438,6 +607,21 @@ static int action_executor_snapshot_session_handler( goto error_unlock_list; } + /* + * Check if the session id is the same as when the work item was + * enqueued. + */ + if (session->id != LTTNG_OPTIONAL_GET(item->context.session_id)) { + DBG("Session id for session `%s` (id: %" PRIu64 + " is not the same that was sampled (id: %" PRIu64 + " at the moment the work item was enqueued for %s` action of trigger `%s`", + session_name, session->id, + LTTNG_OPTIONAL_GET(item->context.session_id), + get_action_name(action), + get_trigger_name(work_item->trigger)); + ret = 0; + goto error_unlock_list; + } session_lock(session); if (!is_trigger_allowed_for_session(work_item->trigger, session)) { @@ -469,43 +653,18 @@ end: static int action_executor_group_handler(struct action_executor *executor, const struct action_work_item *work_item, - struct lttng_action *action_group) + struct action_work_subitem *item) { - int ret = 0; - unsigned int i, count; - enum lttng_action_status action_status; - - action_status = lttng_action_group_get_count(action_group, &count); - if (action_status != LTTNG_ACTION_STATUS_OK) { - /* Fatal error. */ - ERR("Failed to get count of action in action group"); - ret = -1; - goto end; - } - - DBG("Action group has %u action%s", count, count != 1 ? 
"s" : ""); - for (i = 0; i < count; i++) { - struct lttng_action *action = - lttng_action_group_borrow_mutable_at_index( - action_group, i); - - ret = action_executor_generic_handler( - executor, work_item, action); - if (ret) { - ERR("Stopping the execution of the action group of trigger `%s` following a fatal error", - get_trigger_name(work_item->trigger)); - goto end; - } - } -end: - return ret; + ERR("Execution of a group action by the action executor should never occur"); + abort(); } static int action_executor_generic_handler(struct action_executor *executor, const struct action_work_item *work_item, - struct lttng_action *action) + struct action_work_subitem *item) { int ret; + struct lttng_action *action = item->action; const enum lttng_action_type action_type = lttng_action_get_type(action); assert(action_type != LTTNG_ACTION_TYPE_UNKNOWN); @@ -525,7 +684,7 @@ static int action_executor_generic_handler(struct action_executor *executor, get_action_name(action), get_trigger_name(work_item->trigger), work_item->id); - ret = action_executors[action_type](executor, work_item, action); + ret = action_executors[action_type](executor, work_item, item); end: return ret; } @@ -534,12 +693,23 @@ static int action_work_item_execute(struct action_executor *executor, struct action_work_item *work_item) { int ret; - struct lttng_action *action = - lttng_trigger_get_action(work_item->trigger); + size_t count, i; DBG("Starting execution of action work item %" PRIu64 " of trigger `%s`", work_item->id, get_trigger_name(work_item->trigger)); - ret = action_executor_generic_handler(executor, work_item, action); + + count = lttng_dynamic_array_get_count(work_item->subitems); + for (i = 0; i < count; i++) { + struct action_work_subitem *item; + + item = lttng_dynamic_array_get_element(work_item->subitems, i); + ret = action_executor_generic_handler( + executor, work_item, item); + if (ret) { + goto end; + } + } +end: DBG("Completed execution of action work item %" PRIu64 " of 
trigger `%s`", work_item->id, get_trigger_name(work_item->trigger)); return ret; @@ -550,6 +720,7 @@ static void action_work_item_destroy(struct action_work_item *work_item) lttng_trigger_put(work_item->trigger); lttng_evaluation_destroy(work_item->evaluation); notification_client_list_put(work_item->client_list); + lttng_dynamic_array_reset(work_item->subitems); free(work_item); } @@ -723,31 +894,55 @@ void action_executor_destroy(struct action_executor *executor) } /* RCU read-lock must be held by the caller. */ -enum action_executor_status action_executor_enqueue( +enum action_executor_status action_executor_enqueue_trigger( struct action_executor *executor, struct lttng_trigger *trigger, struct lttng_evaluation *evaluation, const struct lttng_credentials *object_creds, struct notification_client_list *client_list) { + int ret; enum action_executor_status executor_status = ACTION_EXECUTOR_STATUS_OK; const uint64_t work_item_id = executor->next_work_item_id++; struct action_work_item *work_item; bool signal = false; + struct lttng_dynamic_array *subitems = NULL; + + assert(trigger); + + /* Build the array of action work subitems for the passed trigger. */ + subitems = zmalloc(sizeof(*subitems)); + if (!subitems) { + PERROR("Failed to allocate action executor subitems array: trigger name = `%s`", + get_trigger_name(trigger)); + executor_status = ACTION_EXECUTOR_STATUS_ERROR; + goto error_unlock; + } + + lttng_dynamic_array_init(subitems, sizeof(struct action_work_subitem), + action_work_subitem_destructor); + + ret = populate_subitem_array_from_trigger(trigger, subitems); + if (ret) { + ERR("Failed to populate work item sub items on behalf of trigger: trigger name = `%s`", + get_trigger_name(trigger)); + executor_status = ACTION_EXECUTOR_STATUS_ERROR; + goto error_unlock; + } pthread_mutex_lock(&executor->work.lock); /* Check for queue overflow. */ if (executor->work.pending_count >= MAX_QUEUED_WORK_COUNT) { /* Most likely spammy, remove if it is the case. 
*/ - DBG("Refusing to enqueue action for trigger `%s` as work item %" PRIu64 - " (overflow)", get_trigger_name(trigger), work_item_id); + DBG("Refusing to enqueue action for trigger (overflow): trigger name = `%s`, work item id = %" PRIu64, + get_trigger_name(trigger), work_item_id); executor_status = ACTION_EXECUTOR_STATUS_OVERFLOW; goto error_unlock; } work_item = zmalloc(sizeof(*work_item)); if (!work_item) { - PERROR("Failed to allocate action executor work item on behalf of trigger `%s`", + PERROR("Failed to allocate action executor work item: trigger name = '%s'", get_trigger_name(trigger)); executor_status = ACTION_EXECUTOR_STATUS_ERROR; goto error_unlock; @@ -763,6 +958,8 @@ enum action_executor_status action_executor_enqueue( *work_item = (typeof(*work_item)){ .id = work_item_id, + /* Ownership transferred to the work item. */ + .subitems = subitems, .trigger = trigger, /* Ownership transferred to the work item. */ .evaluation = evaluation, @@ -776,9 +973,10 @@ enum action_executor_status action_executor_enqueue( }; evaluation = NULL; + subitems = NULL; cds_list_add_tail(&work_item->list_node, &executor->work.list); executor->work.pending_count++; - DBG("Enqueued action for trigger `%s` as work item #%" PRIu64, + DBG("Enqueued action for trigger: trigger name = `%s`, work item id = %" PRIu64, get_trigger_name(trigger), work_item_id); signal = true; @@ -789,5 +987,132 @@ error_unlock: pthread_mutex_unlock(&executor->work.lock); lttng_evaluation_destroy(evaluation); + if (subitems) { + lttng_dynamic_array_reset(subitems); + free(subitems); + } return executor_status; } + +static int add_action_to_subitem_array(struct lttng_action *action, + struct lttng_dynamic_array *subitems) +{ + int ret; + enum lttng_action_type type = lttng_action_get_type(action); + const char *session_name = NULL; + enum lttng_action_status status; + struct action_work_subitem subitem = { + .action = NULL, + .context = { + .session_id = LTTNG_OPTIONAL_INIT_UNSET, + }, + }; + + 
assert(action); + assert(subitems); + + if (type == LTTNG_ACTION_TYPE_GROUP) { + unsigned int count, i; + + status = lttng_action_group_get_count(action, &count); + assert(status == LTTNG_ACTION_STATUS_OK); + + for (i = 0; i < count; i++) { + struct lttng_action *inner_action = NULL; + + inner_action = lttng_action_group_borrow_mutable_at_index( + action, i); + assert(inner_action); + ret = add_action_to_subitem_array( + inner_action, subitems); + if (ret) { + goto end; + } + } + + /* + * Go directly to the end since there is no need to add the + * group action by itself to the subitems array. + */ + goto end; + } + + /* Gather execution context. */ + switch (type) { + case LTTNG_ACTION_TYPE_NOTIFY: + break; + case LTTNG_ACTION_TYPE_START_SESSION: + status = lttng_action_start_session_get_session_name( + action, &session_name); + assert(status == LTTNG_ACTION_STATUS_OK); + break; + case LTTNG_ACTION_TYPE_STOP_SESSION: + status = lttng_action_stop_session_get_session_name( + action, &session_name); + assert(status == LTTNG_ACTION_STATUS_OK); + break; + case LTTNG_ACTION_TYPE_ROTATE_SESSION: + status = lttng_action_rotate_session_get_session_name( + action, &session_name); + assert(status == LTTNG_ACTION_STATUS_OK); + break; + case LTTNG_ACTION_TYPE_SNAPSHOT_SESSION: + status = lttng_action_snapshot_session_get_session_name( + action, &session_name); + assert(status == LTTNG_ACTION_STATUS_OK); + break; + case LTTNG_ACTION_TYPE_GROUP: + case LTTNG_ACTION_TYPE_UNKNOWN: + /* Fallthrough */ + default: + abort(); + break; + } + + /* + * Fetch the session execution context info as needed. + * Note that we could decide to not add an action for which we know the + * execution will not happen (i.e no session exists for that name). For + * now we leave the decision to skip to the action executor for sake of + * simplicity and consistency. 
+ */ + if (session_name != NULL) { + struct ltt_session *session = NULL; + + session_lock_list(); + session = session_find_by_name(session_name); + if (session) { + LTTNG_OPTIONAL_SET(&subitem.context.session_id, + session->id); + session_put(session); + } + + session_unlock_list(); + } + + /* Get a reference to the action. */ + lttng_action_get(action); + subitem.action = action; + + ret = lttng_dynamic_array_add_element(subitems, &subitem); + if (ret) { + ERR("Failed to add work subitem to the subitem array"); + lttng_action_put(action); + ret = -1; + goto end; + } + +end: + return ret; +} + +static int populate_subitem_array_from_trigger(struct lttng_trigger *trigger, + struct lttng_dynamic_array *subitems) +{ + struct lttng_action *action; + + action = lttng_trigger_get_action(trigger); + assert(action); + + return add_action_to_subitem_array(action, subitems); +}