X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Faction-executor.c;fp=src%2Fbin%2Flttng-sessiond%2Faction-executor.c;h=0000000000000000000000000000000000000000;hp=94b819c46e5e4baeef67caabe6f844a85d20b8c3;hb=7966af5763c4aaca39df9bbfa9277ff15715c720;hpb=3a5f70173aa04d11ccb22694d5d31a702cad33ab diff --git a/src/bin/lttng-sessiond/action-executor.c b/src/bin/lttng-sessiond/action-executor.c deleted file mode 100644 index 94b819c46..000000000 --- a/src/bin/lttng-sessiond/action-executor.c +++ /dev/null @@ -1,1113 +0,0 @@ -/* - * Copyright (C) 2020 Jérémie Galarneau - * - * SPDX-License-Identifier: GPL-2.0-only - * - */ - -#include "action-executor.h" -#include "cmd.h" -#include "health-sessiond.h" -#include "lttng-sessiond.h" -#include "notification-thread-internal.h" -#include "session.h" -#include "thread.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#define THREAD_NAME "Action Executor" -#define MAX_QUEUED_WORK_COUNT 8192 - -/* - * A work item is composed of a dynamic array of sub-items which - * represent a flattened, and augmented, version of a trigger's actions. - * - * We cannot rely solely on the trigger's actions since each action can have an - * execution context we need to comply with. - * - * The notion of execution context is required since for some actions the - * associated object are referenced by name and not by id. This can lead to - * a number of ambiguities when executing an action work item. - * - * For example, let's take a simple trigger such as: - * - condition: ust event a - * - action: start session S - * - * At time T, session S exists. - * At T + 1, the event A is hit. - * At T + 2, the tracer event notification is received and the work item is - * queued. Here session S have an id of 1. - * At T + 3, the session S is destroyed and a new session S is created, with a - * resulting id of 200. - * At T +4, the work item is popped from the queue and begin execution and will - * start session S with an id of 200 instead of the session S id 1 that was - * present at the queuing phase. - * - * The context to be respected is the one when the work item is queued. If the - * execution context is not the same at the moment of execution, we skip the - * execution of that sub-item. - * - * It is the same policy in regards to the validity of the associated - * trigger object at the moment of execution, if the trigger is found to be - * unregistered, the execution is skipped. - */ - -struct action_work_item { - uint64_t id; - - /* - * The actions to be executed with their respective execution context. - * See struct `action_work_subitem`. - */ - struct lttng_dynamic_array subitems; - - /* Execution context data */ - struct lttng_trigger *trigger; - struct lttng_evaluation *evaluation; - struct notification_client_list *client_list; - LTTNG_OPTIONAL(struct lttng_credentials) object_creds; - struct cds_list_head list_node; -}; - -struct action_work_subitem { - struct lttng_action *action; - struct { - /* Used by actions targeting a session. */ - LTTNG_OPTIONAL(uint64_t) session_id; - } context; -}; - -struct action_executor { - struct lttng_thread *thread; - struct notification_thread_handle *notification_thread_handle; - struct { - uint64_t pending_count; - struct cds_list_head list; - pthread_cond_t cond; - pthread_mutex_t lock; - } work; - bool should_quit; - uint64_t next_work_item_id; -}; - -/* - * Only return non-zero on a fatal error that should shut down the action - * executor. - */ -typedef int (*action_executor_handler)(struct action_executor *executor, - const struct action_work_item *, - struct action_work_subitem *item); - -static int action_executor_notify_handler(struct action_executor *executor, - const struct action_work_item *, - struct action_work_subitem *); -static int action_executor_start_session_handler( - struct action_executor *executor, - const struct action_work_item *, - struct action_work_subitem *); -static int action_executor_stop_session_handler( - struct action_executor *executor, - const struct action_work_item *, - struct action_work_subitem *); -static int action_executor_rotate_session_handler( - struct action_executor *executor, - const struct action_work_item *, - struct action_work_subitem *); -static int action_executor_snapshot_session_handler( - struct action_executor *executor, - const struct action_work_item *, - struct action_work_subitem *); -static int action_executor_list_handler(struct action_executor *executor, - const struct action_work_item *, - struct action_work_subitem *); -static int action_executor_generic_handler(struct action_executor *executor, - const struct action_work_item *, - struct action_work_subitem *); - -static const action_executor_handler action_executors[] = { - [LTTNG_ACTION_TYPE_NOTIFY] = action_executor_notify_handler, - [LTTNG_ACTION_TYPE_START_SESSION] = action_executor_start_session_handler, - [LTTNG_ACTION_TYPE_STOP_SESSION] = action_executor_stop_session_handler, - [LTTNG_ACTION_TYPE_ROTATE_SESSION] = action_executor_rotate_session_handler, - [LTTNG_ACTION_TYPE_SNAPSHOT_SESSION] = action_executor_snapshot_session_handler, - [LTTNG_ACTION_TYPE_LIST] = action_executor_list_handler, -}; - -/* Forward declaration */ -static int add_action_to_subitem_array(struct lttng_action *action, - struct lttng_dynamic_array *subitems); - -static int populate_subitem_array_from_trigger(struct lttng_trigger *trigger, - struct lttng_dynamic_array *subitems); - -static void action_work_subitem_destructor(void *element) -{ - struct action_work_subitem *subitem = element; - - lttng_action_put(subitem->action); -} - -static const char *get_action_name(const struct lttng_action *action) -{ - const enum lttng_action_type action_type = lttng_action_get_type(action); - - LTTNG_ASSERT(action_type != LTTNG_ACTION_TYPE_UNKNOWN); - - return lttng_action_type_string(action_type); -} - -/* Check if this trigger allowed to interect with a given session. */ -static bool is_trigger_allowed_for_session(const struct lttng_trigger *trigger, - struct ltt_session *session) -{ - bool is_allowed = false; - const struct lttng_credentials session_creds = { - .uid = LTTNG_OPTIONAL_INIT_VALUE(session->uid), - .gid = LTTNG_OPTIONAL_INIT_VALUE(session->gid), - }; - /* Can never be NULL. */ - const struct lttng_credentials *trigger_creds = - lttng_trigger_get_credentials(trigger); - - is_allowed = (lttng_credentials_is_equal_uid(trigger_creds, &session_creds)) || - (lttng_credentials_get_uid(trigger_creds) == 0); - if (!is_allowed) { - WARN("Trigger is not allowed to interact with session `%s`: session uid = %ld, session gid = %ld, trigger uid = %ld", - session->name, - (long int) session->uid, - (long int) session->gid, - (long int) lttng_credentials_get_uid(trigger_creds)); - } - - return is_allowed; -} - -static const char *get_trigger_name(const struct lttng_trigger *trigger) -{ - const char *trigger_name; - enum lttng_trigger_status trigger_status; - - trigger_status = lttng_trigger_get_name(trigger, &trigger_name); - switch (trigger_status) { - case LTTNG_TRIGGER_STATUS_OK: - break; - case LTTNG_TRIGGER_STATUS_UNSET: - trigger_name = "(anonymous)"; - break; - default: - trigger_name = "(failed to get name)"; - break; - } - - return trigger_name; -} - -static int client_handle_transmission_status( - struct notification_client *client, - enum client_transmission_status status, - void *user_data) -{ - int ret = 0; - struct action_executor *executor = user_data; - bool update_communication = true; - - switch (status) { - case CLIENT_TRANSMISSION_STATUS_COMPLETE: - DBG("Successfully sent full notification to client, client_id = %" PRIu64, - client->id); - update_communication = false; - break; - case CLIENT_TRANSMISSION_STATUS_QUEUED: - DBG("Queued notification in client outgoing buffer, client_id = %" PRIu64, - client->id); - break; - case CLIENT_TRANSMISSION_STATUS_FAIL: - DBG("Communication error occurred while sending notification to client, client_id = %" PRIu64, - client->id); - break; - default: - ERR("Fatal error encoutered while sending notification to client, client_id = %" PRIu64, - client->id); - ret = -1; - goto end; - } - - if (!update_communication) { - goto end; - } - - /* Safe to read client's id without locking as it is immutable. */ - ret = notification_thread_client_communication_update( - executor->notification_thread_handle, client->id, - status); -end: - return ret; -} - -static int action_executor_notify_handler(struct action_executor *executor, - const struct action_work_item *work_item, - struct action_work_subitem *item) -{ - return notification_client_list_send_evaluation(work_item->client_list, - work_item->trigger, - work_item->evaluation, - work_item->object_creds.is_set ? - &(work_item->object_creds.value) : - NULL, - client_handle_transmission_status, executor); -} - -static int action_executor_start_session_handler( - struct action_executor *executor, - const struct action_work_item *work_item, - struct action_work_subitem *item) -{ - int ret = 0; - const char *session_name; - enum lttng_action_status action_status; - struct ltt_session *session; - enum lttng_error_code cmd_ret; - struct lttng_action *action = item->action; - - action_status = lttng_action_start_session_get_session_name( - action, &session_name); - if (action_status != LTTNG_ACTION_STATUS_OK) { - ERR("Failed to get session name from `%s` action", - get_action_name(action)); - ret = -1; - goto end; - } - - /* - * Validate if at the moment of the action was queued the session - * existed. If not skip the action altogether. - */ - if (!item->context.session_id.is_set) { - DBG("Session `%s` was not present at the moment the work item was enqueued for %s` action of trigger `%s`", - session_name, get_action_name(action), - get_trigger_name(work_item->trigger)); - lttng_action_increase_execution_failure_count(action); - ret = 0; - goto end; - } - - session_lock_list(); - session = session_find_by_name(session_name); - if (!session) { - DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%s`", - session_name, get_action_name(action), - get_trigger_name(work_item->trigger)); - goto error_unlock_list; - } - - /* - * Check if the session id is the same as when the work item was - * enqueued. - */ - if (session->id != LTTNG_OPTIONAL_GET(item->context.session_id)) { - DBG("Session id for session `%s` (id: %" PRIu64 - " is not the same that was sampled (id: %" PRIu64 - " at the moment the work item was enqueued for %s` action of trigger `%s`", - session_name, session->id, - LTTNG_OPTIONAL_GET(item->context.session_id), - get_action_name(action), - get_trigger_name(work_item->trigger)); - ret = 0; - goto error_unlock_list; - } - - session_lock(session); - if (!is_trigger_allowed_for_session(work_item->trigger, session)) { - goto error_dispose_session; - } - - cmd_ret = cmd_start_trace(session); - switch (cmd_ret) { - case LTTNG_OK: - DBG("Successfully started session `%s` on behalf of trigger `%s`", - session_name, get_trigger_name(work_item->trigger)); - break; - case LTTNG_ERR_TRACE_ALREADY_STARTED: - DBG("Attempted to start session `%s` on behalf of trigger `%s` but it was already started", - session_name, get_trigger_name(work_item->trigger)); - break; - default: - WARN("Failed to start session `%s` on behalf of trigger `%s`: %s", - session_name, get_trigger_name(work_item->trigger), - lttng_strerror(-cmd_ret)); - lttng_action_increase_execution_failure_count(action); - break; - } - -error_dispose_session: - session_unlock(session); - session_put(session); -error_unlock_list: - session_unlock_list(); -end: - return ret; -} - -static int action_executor_stop_session_handler( - struct action_executor *executor, - const struct action_work_item *work_item, - struct action_work_subitem *item) -{ - int ret = 0; - const char *session_name; - enum lttng_action_status action_status; - struct ltt_session *session; - enum lttng_error_code cmd_ret; - struct lttng_action *action = item->action; - - action_status = lttng_action_stop_session_get_session_name( - action, &session_name); - if (action_status != LTTNG_ACTION_STATUS_OK) { - ERR("Failed to get session name from `%s` action", - get_action_name(action)); - ret = -1; - goto end; - } - - /* - * Validate if, at the moment the action was queued, the target session - * existed. If not, skip the action altogether. - */ - if (!item->context.session_id.is_set) { - DBG("Session `%s` was not present at the moment the work item was enqueued for %s` action of trigger `%s`", - session_name, get_action_name(action), - get_trigger_name(work_item->trigger)); - lttng_action_increase_execution_failure_count(action); - ret = 0; - goto end; - } - - session_lock_list(); - session = session_find_by_name(session_name); - if (!session) { - DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%s`", - session_name, get_action_name(action), - get_trigger_name(work_item->trigger)); - lttng_action_increase_execution_failure_count(action); - goto error_unlock_list; - } - - /* - * Check if the session id is the same as when the work item was - * enqueued - */ - if (session->id != LTTNG_OPTIONAL_GET(item->context.session_id)) { - DBG("Session id for session `%s` (id: %" PRIu64 - " is not the same that was sampled (id: %" PRIu64 - " at the moment the work item was enqueued for %s` action of trigger `%s`", - session_name, session->id, - LTTNG_OPTIONAL_GET(item->context.session_id), - get_action_name(action), - get_trigger_name(work_item->trigger)); - ret = 0; - goto error_unlock_list; - } - - session_lock(session); - if (!is_trigger_allowed_for_session(work_item->trigger, session)) { - goto error_dispose_session; - } - - cmd_ret = cmd_stop_trace(session); - switch (cmd_ret) { - case LTTNG_OK: - DBG("Successfully stopped session `%s` on behalf of trigger `%s`", - session_name, get_trigger_name(work_item->trigger)); - break; - case LTTNG_ERR_TRACE_ALREADY_STOPPED: - DBG("Attempted to stop session `%s` on behalf of trigger `%s` but it was already stopped", - session_name, get_trigger_name(work_item->trigger)); - break; - default: - WARN("Failed to stop session `%s` on behalf of trigger `%s`: %s", - session_name, get_trigger_name(work_item->trigger), - lttng_strerror(-cmd_ret)); - lttng_action_increase_execution_failure_count(action); - break; - } - -error_dispose_session: - session_unlock(session); - session_put(session); -error_unlock_list: - session_unlock_list(); -end: - return ret; -} - -static int action_executor_rotate_session_handler( - struct action_executor *executor, - const struct action_work_item *work_item, - struct action_work_subitem *item) -{ - int ret = 0; - const char *session_name; - enum lttng_action_status action_status; - struct ltt_session *session; - enum lttng_error_code cmd_ret; - struct lttng_action *action = item->action; - - action_status = lttng_action_rotate_session_get_session_name( - action, &session_name); - if (action_status != LTTNG_ACTION_STATUS_OK) { - ERR("Failed to get session name from `%s` action", - get_action_name(action)); - ret = -1; - goto end; - } - - /* - * Validate if, at the moment the action was queued, the target session - * existed. If not, skip the action altogether. - */ - if (!item->context.session_id.is_set) { - DBG("Session `%s` was not present at the moment the work item was enqueued for %s` action of trigger `%s`", - session_name, get_action_name(action), - get_trigger_name(work_item->trigger)); - lttng_action_increase_execution_failure_count(action); - ret = 0; - goto end; - } - - session_lock_list(); - session = session_find_by_name(session_name); - if (!session) { - DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%s`", - session_name, get_action_name(action), - get_trigger_name(work_item->trigger)); - lttng_action_increase_execution_failure_count(action); - goto error_unlock_list; - } - - /* - * Check if the session id is the same as when the work item was - * enqueued. - */ - if (session->id != LTTNG_OPTIONAL_GET(item->context.session_id)) { - DBG("Session id for session `%s` (id: %" PRIu64 - " is not the same that was sampled (id: %" PRIu64 - " at the moment the work item was enqueued for %s` action of trigger `%s`", - session_name, session->id, - LTTNG_OPTIONAL_GET(item->context.session_id), - get_action_name(action), - get_trigger_name(work_item->trigger)); - ret = 0; - goto error_unlock_list; - } - - session_lock(session); - if (!is_trigger_allowed_for_session(work_item->trigger, session)) { - goto error_dispose_session; - } - - cmd_ret = cmd_rotate_session(session, NULL, false, - LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED); - switch (cmd_ret) { - case LTTNG_OK: - DBG("Successfully started rotation of session `%s` on behalf of trigger `%s`", - session_name, get_trigger_name(work_item->trigger)); - break; - case LTTNG_ERR_ROTATION_PENDING: - DBG("Attempted to start a rotation of session `%s` on behalf of trigger `%s` but a rotation is already ongoing", - session_name, get_trigger_name(work_item->trigger)); - lttng_action_increase_execution_failure_count(action); - break; - case LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP: - case LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR: - DBG("Attempted to start a rotation of session `%s` on behalf of trigger `%s` but a rotation has already been completed since the last stop or clear", - session_name, get_trigger_name(work_item->trigger)); - break; - default: - WARN("Failed to start a rotation of session `%s` on behalf of trigger `%s`: %s", - session_name, get_trigger_name(work_item->trigger), - lttng_strerror(-cmd_ret)); - lttng_action_increase_execution_failure_count(action); - break; - } - -error_dispose_session: - session_unlock(session); - session_put(session); -error_unlock_list: - session_unlock_list(); -end: - return ret; -} - -static int action_executor_snapshot_session_handler( - struct action_executor *executor, - const struct action_work_item *work_item, - struct action_work_subitem *item) -{ - int ret = 0; - const char *session_name; - enum lttng_action_status action_status; - struct ltt_session *session; - const struct lttng_snapshot_output default_snapshot_output = { - .max_size = UINT64_MAX, - }; - const struct lttng_snapshot_output *snapshot_output = - &default_snapshot_output; - enum lttng_error_code cmd_ret; - struct lttng_action *action = item->action; - - /* - * Validate if, at the moment the action was queued, the target session - * existed. If not, skip the action altogether. - */ - if (!item->context.session_id.is_set) { - DBG("Session was not present at the moment the work item was enqueued for %s` action of trigger `%s`", - get_action_name(action), - get_trigger_name(work_item->trigger)); - lttng_action_increase_execution_failure_count(action); - ret = 0; - goto end; - } - - action_status = lttng_action_snapshot_session_get_session_name( - action, &session_name); - if (action_status != LTTNG_ACTION_STATUS_OK) { - ERR("Failed to get session name from `%s` action", - get_action_name(action)); - ret = -1; - goto end; - } - - action_status = lttng_action_snapshot_session_get_output( - action, &snapshot_output); - if (action_status != LTTNG_ACTION_STATUS_OK && - action_status != LTTNG_ACTION_STATUS_UNSET) { - ERR("Failed to get output from `%s` action", - get_action_name(action)); - ret = -1; - goto end; - } - - session_lock_list(); - session = session_find_by_name(session_name); - if (!session) { - DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%s`", - session_name, get_action_name(action), - get_trigger_name(work_item->trigger)); - lttng_action_increase_execution_failure_count(action); - goto error_unlock_list; - } - - /* - * Check if the session id is the same as when the work item was - * enqueued. - */ - if (session->id != LTTNG_OPTIONAL_GET(item->context.session_id)) { - DBG("Session id for session `%s` (id: %" PRIu64 - " is not the same that was sampled (id: %" PRIu64 - " at the moment the work item was enqueued for %s` action of trigger `%s`", - session_name, session->id, - LTTNG_OPTIONAL_GET(item->context.session_id), - get_action_name(action), - get_trigger_name(work_item->trigger)); - ret = 0; - goto error_unlock_list; - } - - session_lock(session); - if (!is_trigger_allowed_for_session(work_item->trigger, session)) { - goto error_dispose_session; - } - - cmd_ret = cmd_snapshot_record(session, snapshot_output, 0); - switch (cmd_ret) { - case LTTNG_OK: - DBG("Successfully recorded snapshot of session `%s` on behalf of trigger `%s`", - session_name, get_trigger_name(work_item->trigger)); - break; - default: - WARN("Failed to record snapshot of session `%s` on behalf of trigger `%s`: %s", - session_name, get_trigger_name(work_item->trigger), - lttng_strerror(-cmd_ret)); - lttng_action_increase_execution_failure_count(action); - break; - } - -error_dispose_session: - session_unlock(session); - session_put(session); -error_unlock_list: - session_unlock_list(); -end: - return ret; -} - -static int action_executor_list_handler(struct action_executor *executor, - const struct action_work_item *work_item, - struct action_work_subitem *item) -{ - ERR("Execution of a list action by the action executor should never occur"); - abort(); -} - -static int action_executor_generic_handler(struct action_executor *executor, - const struct action_work_item *work_item, - struct action_work_subitem *item) -{ - int ret; - struct lttng_action *action = item->action; - const enum lttng_action_type action_type = lttng_action_get_type(action); - - LTTNG_ASSERT(action_type != LTTNG_ACTION_TYPE_UNKNOWN); - - lttng_action_increase_execution_request_count(action); - if (!lttng_action_should_execute(action)) { - DBG("Policy prevented execution of action `%s` of trigger `%s` action work item %" PRIu64, - get_action_name(action), - get_trigger_name(work_item->trigger), - work_item->id); - ret = 0; - goto end; - } - - lttng_action_increase_execution_count(action); - DBG("Executing action `%s` of trigger `%s` action work item %" PRIu64, - get_action_name(action), - get_trigger_name(work_item->trigger), - work_item->id); - ret = action_executors[action_type](executor, work_item, item); -end: - return ret; -} - -static int action_work_item_execute(struct action_executor *executor, - struct action_work_item *work_item) -{ - int ret; - size_t count, i; - - DBG("Starting execution of action work item %" PRIu64 " of trigger `%s`", - work_item->id, get_trigger_name(work_item->trigger)); - - count = lttng_dynamic_array_get_count(&work_item->subitems); - for (i = 0; i < count; i++) { - struct action_work_subitem *item; - - item = lttng_dynamic_array_get_element(&work_item->subitems, i); - ret = action_executor_generic_handler( - executor, work_item, item); - if (ret) { - goto end; - } - } -end: - DBG("Completed execution of action work item %" PRIu64 " of trigger `%s`", - work_item->id, get_trigger_name(work_item->trigger)); - return ret; -} - -static void action_work_item_destroy(struct action_work_item *work_item) -{ - lttng_trigger_put(work_item->trigger); - lttng_evaluation_destroy(work_item->evaluation); - notification_client_list_put(work_item->client_list); - lttng_dynamic_array_reset(&work_item->subitems); - free(work_item); -} - -static void *action_executor_thread(void *_data) -{ - struct action_executor *executor = _data; - - LTTNG_ASSERT(executor); - - health_register(the_health_sessiond, - HEALTH_SESSIOND_TYPE_ACTION_EXECUTOR); - - rcu_register_thread(); - rcu_thread_online(); - - DBG("Entering work execution loop"); - pthread_mutex_lock(&executor->work.lock); - while (!executor->should_quit) { - int ret = 0; - struct action_work_item *work_item; - - health_code_update(); - if (executor->work.pending_count == 0) { - health_poll_entry(); - DBG("No work items enqueued, entering wait"); - pthread_cond_wait(&executor->work.cond, - &executor->work.lock); - DBG("Woke-up from wait"); - health_poll_exit(); - continue; - } - - /* Pop item from front of the list with work lock held. */ - work_item = cds_list_first_entry(&executor->work.list, - struct action_work_item, list_node); - cds_list_del(&work_item->list_node); - executor->work.pending_count--; - - /* - * Work can be performed without holding the work lock, - * allowing new items to be queued. - */ - pthread_mutex_unlock(&executor->work.lock); - - /* Execute item only if a trigger is registered. */ - lttng_trigger_lock(work_item->trigger); - if (!lttng_trigger_is_registered(work_item->trigger)) { - const char *trigger_name = NULL; - uid_t trigger_owner_uid; - enum lttng_trigger_status trigger_status; - - trigger_name = get_trigger_name(work_item->trigger); - - trigger_status = lttng_trigger_get_owner_uid( - work_item->trigger, &trigger_owner_uid); - LTTNG_ASSERT(trigger_status == LTTNG_TRIGGER_STATUS_OK); - - DBG("Work item skipped since the associated trigger is no longer registered: work item id = %" PRIu64 ", trigger name = '%s', trigger owner uid = %d", - work_item->id, trigger_name, - (int) trigger_owner_uid); - ret = 0; - goto skip_execute; - } - - ret = action_work_item_execute(executor, work_item); - - skip_execute: - lttng_trigger_unlock(work_item->trigger); - action_work_item_destroy(work_item); - if (ret) { - /* Fatal error. */ - break; - } - - health_code_update(); - pthread_mutex_lock(&executor->work.lock); - } - - if (executor->should_quit) { - pthread_mutex_unlock(&executor->work.lock); - } - DBG("Left work execution loop"); - - health_code_update(); - - rcu_thread_offline(); - rcu_unregister_thread(); - health_unregister(the_health_sessiond); - - return NULL; -} - -static bool shutdown_action_executor_thread(void *_data) -{ - struct action_executor *executor = _data; - - pthread_mutex_lock(&executor->work.lock); - executor->should_quit = true; - pthread_cond_signal(&executor->work.cond); - pthread_mutex_unlock(&executor->work.lock); - return true; -} - -static void clean_up_action_executor_thread(void *_data) -{ - struct action_executor *executor = _data; - - LTTNG_ASSERT(cds_list_empty(&executor->work.list)); - - pthread_mutex_destroy(&executor->work.lock); - pthread_cond_destroy(&executor->work.cond); - free(executor); -} - -struct action_executor *action_executor_create( - struct notification_thread_handle *handle) -{ - struct action_executor *executor = zmalloc(sizeof(*executor)); - - if (!executor) { - goto end; - } - - CDS_INIT_LIST_HEAD(&executor->work.list); - pthread_cond_init(&executor->work.cond, NULL); - pthread_mutex_init(&executor->work.lock, NULL); - executor->notification_thread_handle = handle; - - executor->thread = lttng_thread_create(THREAD_NAME, - action_executor_thread, shutdown_action_executor_thread, - clean_up_action_executor_thread, executor); -end: - return executor; -} - -void action_executor_destroy(struct action_executor *executor) -{ - struct action_work_item *work_item, *tmp; - - /* TODO Wait for work list to drain? */ - lttng_thread_shutdown(executor->thread); - pthread_mutex_lock(&executor->work.lock); - if (executor->work.pending_count != 0) { - WARN("%" PRIu64 - " trigger action%s still queued for execution and will be discarded", - executor->work.pending_count, - executor->work.pending_count == 1 ? " is" : - "s are"); - } - - cds_list_for_each_entry_safe ( - work_item, tmp, &executor->work.list, list_node) { - WARN("Discarding action work item %" PRIu64 - " associated to trigger `%s`", - work_item->id, get_trigger_name(work_item->trigger)); - cds_list_del(&work_item->list_node); - action_work_item_destroy(work_item); - } - pthread_mutex_unlock(&executor->work.lock); - lttng_thread_put(executor->thread); -} - -/* RCU read-lock must be held by the caller. */ -enum action_executor_status action_executor_enqueue_trigger( - struct action_executor *executor, - struct lttng_trigger *trigger, - struct lttng_evaluation *evaluation, - const struct lttng_credentials *object_creds, - struct notification_client_list *client_list) -{ - int ret; - enum action_executor_status executor_status = ACTION_EXECUTOR_STATUS_OK; - const uint64_t work_item_id = executor->next_work_item_id++; - struct action_work_item *work_item; - bool signal = false; - - LTTNG_ASSERT(trigger); - - pthread_mutex_lock(&executor->work.lock); - /* Check for queue overflow. */ - if (executor->work.pending_count >= MAX_QUEUED_WORK_COUNT) { - /* Most likely spammy, remove if it is the case. */ - DBG("Refusing to enqueue action for trigger (overflow): trigger name = `%s`, work item id = %" PRIu64, - get_trigger_name(trigger), work_item_id); - executor_status = ACTION_EXECUTOR_STATUS_OVERFLOW; - goto error_unlock; - } - - work_item = zmalloc(sizeof(*work_item)); - if (!work_item) { - PERROR("Failed to allocate action executor work item: trigger name = '%s'", - get_trigger_name(trigger)); - executor_status = ACTION_EXECUTOR_STATUS_ERROR; - goto error_unlock; - } - - lttng_trigger_get(trigger); - if (client_list) { - const bool reference_acquired = - notification_client_list_get(client_list); - - LTTNG_ASSERT(reference_acquired); - } - - *work_item = (typeof(*work_item)){ - .id = work_item_id, - .trigger = trigger, - /* Ownership transferred to the work item. */ - .evaluation = evaluation, - .object_creds = { - .is_set = !!object_creds, - .value = object_creds ? *object_creds : - (typeof(work_item->object_creds.value)) {}, - }, - .client_list = client_list, - .list_node = CDS_LIST_HEAD_INIT(work_item->list_node), - }; - - evaluation = NULL; - - /* Build the array of action work subitems for the passed trigger. */ - lttng_dynamic_array_init(&work_item->subitems, - sizeof(struct action_work_subitem), - action_work_subitem_destructor); - - ret = populate_subitem_array_from_trigger( - trigger, &work_item->subitems); - if (ret) { - ERR("Failed to populate work item sub items on behalf of trigger: trigger name = `%s`", - get_trigger_name(trigger)); - executor_status = ACTION_EXECUTOR_STATUS_ERROR; - goto error_unlock; - } - - cds_list_add_tail(&work_item->list_node, &executor->work.list); - executor->work.pending_count++; - DBG("Enqueued action for trigger: trigger name = `%s`, work item id = %" PRIu64, - get_trigger_name(trigger), work_item_id); - signal = true; - -error_unlock: - if (signal) { - pthread_cond_signal(&executor->work.cond); - } - - pthread_mutex_unlock(&executor->work.lock); - lttng_evaluation_destroy(evaluation); - return executor_status; -} - -static int add_action_to_subitem_array(struct lttng_action *action, - struct lttng_dynamic_array *subitems) -{ - int ret = 0; - enum lttng_action_type type = lttng_action_get_type(action); - const char *session_name = NULL; - enum lttng_action_status status; - struct action_work_subitem subitem = { - .action = NULL, - .context = { - .session_id = LTTNG_OPTIONAL_INIT_UNSET, - }, - }; - - LTTNG_ASSERT(action); - LTTNG_ASSERT(subitems); - - if (type == LTTNG_ACTION_TYPE_LIST) { - unsigned int count, i; - - status = lttng_action_list_get_count(action, &count); - LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK); - - for (i = 0; i < count; i++) { - struct lttng_action *inner_action = NULL; - - inner_action = lttng_action_list_borrow_mutable_at_index( - action, i); - LTTNG_ASSERT(inner_action); - ret = add_action_to_subitem_array( - inner_action, subitems); - if (ret) { - goto end; - } - } - - /* - * Go directly to the end since there is no need to add the - * list action by itself to the subitems array. - */ - goto end; - } - - /* Gather execution context. */ - switch (type) { - case LTTNG_ACTION_TYPE_NOTIFY: - break; - case LTTNG_ACTION_TYPE_START_SESSION: - status = lttng_action_start_session_get_session_name( - action, &session_name); - LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK); - break; - case LTTNG_ACTION_TYPE_STOP_SESSION: - status = lttng_action_stop_session_get_session_name( - action, &session_name); - LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK); - break; - case LTTNG_ACTION_TYPE_ROTATE_SESSION: - status = lttng_action_rotate_session_get_session_name( - action, &session_name); - LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK); - break; - case LTTNG_ACTION_TYPE_SNAPSHOT_SESSION: - status = lttng_action_snapshot_session_get_session_name( - action, &session_name); - LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK); - break; - case LTTNG_ACTION_TYPE_LIST: - case LTTNG_ACTION_TYPE_UNKNOWN: - /* Fallthrough */ - default: - abort(); - break; - } - - /* - * Fetch the session execution context info as needed. - * Note that we could decide to not add an action for which we know the - * execution will not happen (i.e no session exists for that name). For - * now we leave the decision to skip to the action executor for sake of - * simplicity and consistency. - */ - if (session_name != NULL) { - uint64_t session_id; - - /* - * Instantaneous sampling of the session id if present. - * - * This method is preferred over `sessiond_find_by_name` then - * fetching the session'd id since `sessiond_find_by_name` - * requires the session list lock to be taken. - * - * Taking the session list lock can lead to a deadlock - * between the action executor and the notification thread - * (caller of add_action_to_subitem_array). It is okay if the - * session state changes between the enqueuing time and the - * execution time. The execution context is validated at - * execution time. - */ - if (sample_session_id_by_name(session_name, &session_id)) { - LTTNG_OPTIONAL_SET(&subitem.context.session_id, - session_id); - } - } - - /* Get a reference to the action. */ - lttng_action_get(action); - subitem.action = action; - - ret = lttng_dynamic_array_add_element(subitems, &subitem); - if (ret) { - ERR("Failed to add work subitem to the subitem array"); - lttng_action_put(action); - ret = -1; - goto end; - } - -end: - return ret; -} - -static int populate_subitem_array_from_trigger(struct lttng_trigger *trigger, - struct lttng_dynamic_array *subitems) -{ - struct lttng_action *action; - - action = lttng_trigger_get_action(trigger); - LTTNG_ASSERT(action); - - return add_action_to_subitem_array(action, subitems); -}