/*
 * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com>
 *
 * SPDX-License-Identifier: GPL-2.0-only
 *
 */
#include "action-executor.h"
#include "cmd.h"
#include "health-sessiond.h"
#include "lttng-sessiond.h"
#include "notification-thread-internal.h"
#include "session.h"
#include "thread.h"
#include <common/macros.h>
#include <common/optional.h>
#include <lttng/action/action-internal.h>
#include <lttng/action/group-internal.h>
#include <lttng/action/group.h>
#include <lttng/action/notify-internal.h>
#include <lttng/action/notify.h>
#include <lttng/action/rotate-session.h>
#include <lttng/action/snapshot-session.h>
#include <lttng/action/start-session.h>
#include <lttng/action/stop-session.h>
#include <lttng/condition/evaluation.h>
#include <lttng/condition/on-event-internal.h>
#include <lttng/lttng-error.h>
#include <lttng/trigger/trigger-internal.h>
#include <assert.h>
#include <inttypes.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <urcu/list.h>
#define THREAD_NAME "Action Executor"
/*
 * Upper bound on pending work items; action_executor_enqueue() reports an
 * overflow once this many items are queued.
 */
#define MAX_QUEUED_WORK_COUNT 8192
38 struct action_work_item
{
40 struct lttng_trigger
*trigger
;
41 struct lttng_evaluation
*evaluation
;
42 struct notification_client_list
*client_list
;
43 LTTNG_OPTIONAL(struct lttng_credentials
) object_creds
;
44 struct cds_list_head list_node
;
47 struct action_executor
{
48 struct lttng_thread
*thread
;
49 struct notification_thread_handle
*notification_thread_handle
;
51 uint64_t pending_count
;
52 struct cds_list_head list
;
57 uint64_t next_work_item_id
;
/*
 * Handler executing one action of a work item.
 *
 * Only return non-zero on a fatal error that should shut down the action
 * executor.
 */
typedef int (*action_executor_handler)(struct action_executor *executor,
		const struct action_work_item *,
		struct lttng_action *action);
/* Per-action-type handlers; see action_executor_handler for the contract. */
static int action_executor_notify_handler(struct action_executor *executor,
		const struct action_work_item *,
		struct lttng_action *);
static int action_executor_start_session_handler(
		struct action_executor *executor,
		const struct action_work_item *,
		struct lttng_action *);
static int action_executor_stop_session_handler(
		struct action_executor *executor,
		const struct action_work_item *,
		struct lttng_action *);
static int action_executor_rotate_session_handler(
		struct action_executor *executor,
		const struct action_work_item *,
		struct lttng_action *);
static int action_executor_snapshot_session_handler(
		struct action_executor *executor,
		const struct action_work_item *,
		struct lttng_action *);
static int action_executor_group_handler(struct action_executor *executor,
		const struct action_work_item *,
		struct lttng_action *);
static int action_executor_generic_handler(struct action_executor *executor,
		const struct action_work_item *,
		struct lttng_action *);
94 static const action_executor_handler action_executors
[] = {
95 [LTTNG_ACTION_TYPE_NOTIFY
] = action_executor_notify_handler
,
96 [LTTNG_ACTION_TYPE_START_SESSION
] = action_executor_start_session_handler
,
97 [LTTNG_ACTION_TYPE_STOP_SESSION
] = action_executor_stop_session_handler
,
98 [LTTNG_ACTION_TYPE_ROTATE_SESSION
] = action_executor_rotate_session_handler
,
99 [LTTNG_ACTION_TYPE_SNAPSHOT_SESSION
] = action_executor_snapshot_session_handler
,
100 [LTTNG_ACTION_TYPE_GROUP
] = action_executor_group_handler
,
103 static const char *get_action_name(const struct lttng_action
*action
)
105 const enum lttng_action_type action_type
= lttng_action_get_type(action
);
107 assert(action_type
!= LTTNG_ACTION_TYPE_UNKNOWN
);
109 return lttng_action_type_string(action_type
);
112 /* Check if this trigger allowed to interect with a given session. */
113 static bool is_trigger_allowed_for_session(const struct lttng_trigger
*trigger
,
114 struct ltt_session
*session
)
116 bool is_allowed
= false;
117 const struct lttng_credentials session_creds
= {
118 .uid
= LTTNG_OPTIONAL_INIT_VALUE(session
->uid
),
119 .gid
= LTTNG_OPTIONAL_INIT_VALUE(session
->gid
),
121 /* Can never be NULL. */
122 const struct lttng_credentials
*trigger_creds
=
123 lttng_trigger_get_credentials(trigger
);
125 is_allowed
= (lttng_credentials_is_equal_uid(trigger_creds
, &session_creds
)) ||
126 (lttng_credentials_get_uid(trigger_creds
) == 0);
128 WARN("Trigger is not allowed to interact with session `%s`: session uid = %ld, session gid = %ld, trigger uid = %ld",
130 (long int) session
->uid
,
131 (long int) session
->gid
,
132 (long int) lttng_credentials_get_uid(trigger_creds
));
138 static const char *get_trigger_name(const struct lttng_trigger
*trigger
)
140 const char *trigger_name
;
141 enum lttng_trigger_status trigger_status
;
143 trigger_status
= lttng_trigger_get_name(trigger
, &trigger_name
);
144 assert(trigger_status
== LTTNG_TRIGGER_STATUS_OK
);
149 static int client_handle_transmission_status(
150 struct notification_client
*client
,
151 enum client_transmission_status status
,
155 struct action_executor
*executor
= user_data
;
156 bool update_communication
= true;
159 case CLIENT_TRANSMISSION_STATUS_COMPLETE
:
160 DBG("Successfully sent full notification to client, client_id = %" PRIu64
,
162 update_communication
= false;
164 case CLIENT_TRANSMISSION_STATUS_QUEUED
:
165 DBG("Queued notification in client outgoing buffer, client_id = %" PRIu64
,
168 case CLIENT_TRANSMISSION_STATUS_FAIL
:
169 DBG("Communication error occurred while sending notification to client, client_id = %" PRIu64
,
173 ERR("Fatal error encoutered while sending notification to client, client_id = %" PRIu64
,
179 if (!update_communication
) {
183 /* Safe to read client's id without locking as it is immutable. */
184 ret
= notification_thread_client_communication_update(
185 executor
->notification_thread_handle
, client
->id
,
191 static int action_executor_notify_handler(struct action_executor
*executor
,
192 const struct action_work_item
*work_item
,
193 struct lttng_action
*action
)
195 return notification_client_list_send_evaluation(work_item
->client_list
,
197 work_item
->evaluation
,
198 work_item
->object_creds
.is_set
?
199 &(work_item
->object_creds
.value
) :
201 client_handle_transmission_status
, executor
);
204 static int action_executor_start_session_handler(
205 struct action_executor
*executor
,
206 const struct action_work_item
*work_item
,
207 struct lttng_action
*action
)
210 const char *session_name
;
211 enum lttng_action_status action_status
;
212 struct ltt_session
*session
;
213 enum lttng_error_code cmd_ret
;
215 action_status
= lttng_action_start_session_get_session_name(
216 action
, &session_name
);
217 if (action_status
!= LTTNG_ACTION_STATUS_OK
) {
218 ERR("Failed to get session name from `%s` action",
219 get_action_name(action
));
225 session
= session_find_by_name(session_name
);
227 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%s`",
228 session_name
, get_action_name(action
),
229 get_trigger_name(work_item
->trigger
));
230 goto error_unlock_list
;
233 session_lock(session
);
234 if (!is_trigger_allowed_for_session(work_item
->trigger
, session
)) {
235 goto error_dispose_session
;
238 cmd_ret
= cmd_start_trace(session
);
241 DBG("Successfully started session `%s` on behalf of trigger `%s`",
242 session_name
, get_trigger_name(work_item
->trigger
));
244 case LTTNG_ERR_TRACE_ALREADY_STARTED
:
245 DBG("Attempted to start session `%s` on behalf of trigger `%s` but it was already started",
246 session_name
, get_trigger_name(work_item
->trigger
));
249 WARN("Failed to start session `%s` on behalf of trigger `%s`: %s",
250 session_name
, get_trigger_name(work_item
->trigger
),
251 lttng_strerror(-cmd_ret
));
252 lttng_action_increase_execution_failure_count(action
);
256 error_dispose_session
:
257 session_unlock(session
);
258 session_put(session
);
260 session_unlock_list();
265 static int action_executor_stop_session_handler(
266 struct action_executor
*executor
,
267 const struct action_work_item
*work_item
,
268 struct lttng_action
*action
)
271 const char *session_name
;
272 enum lttng_action_status action_status
;
273 struct ltt_session
*session
;
274 enum lttng_error_code cmd_ret
;
276 action_status
= lttng_action_stop_session_get_session_name(
277 action
, &session_name
);
278 if (action_status
!= LTTNG_ACTION_STATUS_OK
) {
279 ERR("Failed to get session name from `%s` action",
280 get_action_name(action
));
286 session
= session_find_by_name(session_name
);
288 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%s`",
289 session_name
, get_action_name(action
),
290 get_trigger_name(work_item
->trigger
));
291 lttng_action_increase_execution_failure_count(action
);
292 goto error_unlock_list
;
295 session_lock(session
);
296 if (!is_trigger_allowed_for_session(work_item
->trigger
, session
)) {
297 goto error_dispose_session
;
300 cmd_ret
= cmd_stop_trace(session
);
303 DBG("Successfully stopped session `%s` on behalf of trigger `%s`",
304 session_name
, get_trigger_name(work_item
->trigger
));
306 case LTTNG_ERR_TRACE_ALREADY_STOPPED
:
307 DBG("Attempted to stop session `%s` on behalf of trigger `%s` but it was already stopped",
308 session_name
, get_trigger_name(work_item
->trigger
));
311 WARN("Failed to stop session `%s` on behalf of trigger `%s`: %s",
312 session_name
, get_trigger_name(work_item
->trigger
),
313 lttng_strerror(-cmd_ret
));
314 lttng_action_increase_execution_failure_count(action
);
318 error_dispose_session
:
319 session_unlock(session
);
320 session_put(session
);
322 session_unlock_list();
327 static int action_executor_rotate_session_handler(
328 struct action_executor
*executor
,
329 const struct action_work_item
*work_item
,
330 struct lttng_action
*action
)
333 const char *session_name
;
334 enum lttng_action_status action_status
;
335 struct ltt_session
*session
;
336 enum lttng_error_code cmd_ret
;
338 action_status
= lttng_action_rotate_session_get_session_name(
339 action
, &session_name
);
340 if (action_status
!= LTTNG_ACTION_STATUS_OK
) {
341 ERR("Failed to get session name from `%s` action",
342 get_action_name(action
));
348 session
= session_find_by_name(session_name
);
350 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%s`",
351 session_name
, get_action_name(action
),
352 get_trigger_name(work_item
->trigger
));
353 lttng_action_increase_execution_failure_count(action
);
354 goto error_unlock_list
;
357 session_lock(session
);
358 if (!is_trigger_allowed_for_session(work_item
->trigger
, session
)) {
359 goto error_dispose_session
;
362 cmd_ret
= cmd_rotate_session(session
, NULL
, false,
363 LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED
);
366 DBG("Successfully started rotation of session `%s` on behalf of trigger `%s`",
367 session_name
, get_trigger_name(work_item
->trigger
));
369 case LTTNG_ERR_ROTATION_PENDING
:
370 DBG("Attempted to start a rotation of session `%s` on behalf of trigger `%s` but a rotation is already ongoing",
371 session_name
, get_trigger_name(work_item
->trigger
));
372 lttng_action_increase_execution_failure_count(action
);
374 case LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP
:
375 case LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR
:
376 DBG("Attempted to start a rotation of session `%s` on behalf of trigger `%s` but a rotation has already been completed since the last stop or clear",
377 session_name
, get_trigger_name(work_item
->trigger
));
380 WARN("Failed to start a rotation of session `%s` on behalf of trigger `%s`: %s",
381 session_name
, get_trigger_name(work_item
->trigger
),
382 lttng_strerror(-cmd_ret
));
383 lttng_action_increase_execution_failure_count(action
);
387 error_dispose_session
:
388 session_unlock(session
);
389 session_put(session
);
391 session_unlock_list();
396 static int action_executor_snapshot_session_handler(
397 struct action_executor
*executor
,
398 const struct action_work_item
*work_item
,
399 struct lttng_action
*action
)
402 const char *session_name
;
403 enum lttng_action_status action_status
;
404 struct ltt_session
*session
;
405 const struct lttng_snapshot_output default_snapshot_output
= {
406 .max_size
= UINT64_MAX
,
408 const struct lttng_snapshot_output
*snapshot_output
=
409 &default_snapshot_output
;
410 enum lttng_error_code cmd_ret
;
412 action_status
= lttng_action_snapshot_session_get_session_name(
413 action
, &session_name
);
414 if (action_status
!= LTTNG_ACTION_STATUS_OK
) {
415 ERR("Failed to get session name from `%s` action",
416 get_action_name(action
));
421 action_status
= lttng_action_snapshot_session_get_output(
422 action
, &snapshot_output
);
423 if (action_status
!= LTTNG_ACTION_STATUS_OK
&&
424 action_status
!= LTTNG_ACTION_STATUS_UNSET
) {
425 ERR("Failed to get output from `%s` action",
426 get_action_name(action
));
432 session
= session_find_by_name(session_name
);
434 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%s`",
435 session_name
, get_action_name(action
),
436 get_trigger_name(work_item
->trigger
));
437 lttng_action_increase_execution_failure_count(action
);
438 goto error_unlock_list
;
442 session_lock(session
);
443 if (!is_trigger_allowed_for_session(work_item
->trigger
, session
)) {
444 goto error_dispose_session
;
447 cmd_ret
= cmd_snapshot_record(session
, snapshot_output
, 0);
450 DBG("Successfully recorded snapshot of session `%s` on behalf of trigger `%s`",
451 session_name
, get_trigger_name(work_item
->trigger
));
454 WARN("Failed to record snapshot of session `%s` on behalf of trigger `%s`: %s",
455 session_name
, get_trigger_name(work_item
->trigger
),
456 lttng_strerror(-cmd_ret
));
457 lttng_action_increase_execution_failure_count(action
);
461 error_dispose_session
:
462 session_unlock(session
);
463 session_put(session
);
465 session_unlock_list();
470 static int action_executor_group_handler(struct action_executor
*executor
,
471 const struct action_work_item
*work_item
,
472 struct lttng_action
*action_group
)
475 unsigned int i
, count
;
476 enum lttng_action_status action_status
;
478 action_status
= lttng_action_group_get_count(action_group
, &count
);
479 if (action_status
!= LTTNG_ACTION_STATUS_OK
) {
481 ERR("Failed to get count of action in action group");
486 DBG("Action group has %u action%s", count
, count
!= 1 ? "s" : "");
487 for (i
= 0; i
< count
; i
++) {
488 struct lttng_action
*action
=
489 lttng_action_group_borrow_mutable_at_index(
492 ret
= action_executor_generic_handler(
493 executor
, work_item
, action
);
495 ERR("Stopping the execution of the action group of trigger `%s` following a fatal error",
496 get_trigger_name(work_item
->trigger
));
504 static int action_executor_generic_handler(struct action_executor
*executor
,
505 const struct action_work_item
*work_item
,
506 struct lttng_action
*action
)
509 const enum lttng_action_type action_type
= lttng_action_get_type(action
);
511 assert(action_type
!= LTTNG_ACTION_TYPE_UNKNOWN
);
513 lttng_action_increase_execution_request_count(action
);
514 if (!lttng_action_should_execute(action
)) {
515 DBG("Policy prevented execution of action `%s` of trigger `%s` action work item %" PRIu64
,
516 get_action_name(action
),
517 get_trigger_name(work_item
->trigger
),
523 lttng_action_increase_execution_count(action
);
524 DBG("Executing action `%s` of trigger `%s` action work item %" PRIu64
,
525 get_action_name(action
),
526 get_trigger_name(work_item
->trigger
),
528 ret
= action_executors
[action_type
](executor
, work_item
, action
);
533 static int action_work_item_execute(struct action_executor
*executor
,
534 struct action_work_item
*work_item
)
537 struct lttng_action
*action
=
538 lttng_trigger_get_action(work_item
->trigger
);
540 DBG("Starting execution of action work item %" PRIu64
" of trigger `%s`",
541 work_item
->id
, get_trigger_name(work_item
->trigger
));
542 ret
= action_executor_generic_handler(executor
, work_item
, action
);
543 DBG("Completed execution of action work item %" PRIu64
" of trigger `%s`",
544 work_item
->id
, get_trigger_name(work_item
->trigger
));
548 static void action_work_item_destroy(struct action_work_item
*work_item
)
550 lttng_trigger_put(work_item
->trigger
);
551 lttng_evaluation_destroy(work_item
->evaluation
);
552 notification_client_list_put(work_item
->client_list
);
556 static void *action_executor_thread(void *_data
)
558 struct action_executor
*executor
= _data
;
562 health_register(the_health_sessiond
,
563 HEALTH_SESSIOND_TYPE_ACTION_EXECUTOR
);
565 rcu_register_thread();
568 DBG("Entering work execution loop");
569 pthread_mutex_lock(&executor
->work
.lock
);
570 while (!executor
->should_quit
) {
572 struct action_work_item
*work_item
;
574 health_code_update();
575 if (executor
->work
.pending_count
== 0) {
577 DBG("No work items enqueued, entering wait");
578 pthread_cond_wait(&executor
->work
.cond
,
579 &executor
->work
.lock
);
580 DBG("Woke-up from wait");
585 /* Pop item from front of the list with work lock held. */
586 work_item
= cds_list_first_entry(&executor
->work
.list
,
587 struct action_work_item
, list_node
);
588 cds_list_del(&work_item
->list_node
);
589 executor
->work
.pending_count
--;
592 * Work can be performed without holding the work lock,
593 * allowing new items to be queued.
595 pthread_mutex_unlock(&executor
->work
.lock
);
596 ret
= action_work_item_execute(executor
, work_item
);
597 action_work_item_destroy(work_item
);
603 health_code_update();
604 pthread_mutex_lock(&executor
->work
.lock
);
607 if (executor
->should_quit
) {
608 pthread_mutex_unlock(&executor
->work
.lock
);
610 DBG("Left work execution loop");
612 health_code_update();
614 rcu_thread_offline();
615 rcu_unregister_thread();
616 health_unregister(the_health_sessiond
);
621 static bool shutdown_action_executor_thread(void *_data
)
623 struct action_executor
*executor
= _data
;
625 pthread_mutex_lock(&executor
->work
.lock
);
626 executor
->should_quit
= true;
627 pthread_cond_signal(&executor
->work
.cond
);
628 pthread_mutex_unlock(&executor
->work
.lock
);
632 static void clean_up_action_executor_thread(void *_data
)
634 struct action_executor
*executor
= _data
;
636 assert(cds_list_empty(&executor
->work
.list
));
638 pthread_mutex_destroy(&executor
->work
.lock
);
639 pthread_cond_destroy(&executor
->work
.cond
);
643 struct action_executor
*action_executor_create(
644 struct notification_thread_handle
*handle
)
646 struct action_executor
*executor
= zmalloc(sizeof(*executor
));
652 CDS_INIT_LIST_HEAD(&executor
->work
.list
);
653 pthread_cond_init(&executor
->work
.cond
, NULL
);
654 pthread_mutex_init(&executor
->work
.lock
, NULL
);
655 executor
->notification_thread_handle
= handle
;
657 executor
->thread
= lttng_thread_create(THREAD_NAME
,
658 action_executor_thread
, shutdown_action_executor_thread
,
659 clean_up_action_executor_thread
, executor
);
664 void action_executor_destroy(struct action_executor
*executor
)
666 struct action_work_item
*work_item
, *tmp
;
668 /* TODO Wait for work list to drain? */
669 lttng_thread_shutdown(executor
->thread
);
670 pthread_mutex_lock(&executor
->work
.lock
);
671 if (executor
->work
.pending_count
!= 0) {
673 " trigger action%s still queued for execution and will be discarded",
674 executor
->work
.pending_count
,
675 executor
->work
.pending_count
== 1 ? " is" :
679 cds_list_for_each_entry_safe (
680 work_item
, tmp
, &executor
->work
.list
, list_node
) {
681 WARN("Discarding action work item %" PRIu64
682 " associated to trigger `%s`",
683 work_item
->id
, get_trigger_name(work_item
->trigger
));
684 cds_list_del(&work_item
->list_node
);
685 action_work_item_destroy(work_item
);
687 pthread_mutex_unlock(&executor
->work
.lock
);
688 lttng_thread_put(executor
->thread
);
691 /* RCU read-lock must be held by the caller. */
692 enum action_executor_status
action_executor_enqueue(
693 struct action_executor
*executor
,
694 struct lttng_trigger
*trigger
,
695 struct lttng_evaluation
*evaluation
,
696 const struct lttng_credentials
*object_creds
,
697 struct notification_client_list
*client_list
)
699 enum action_executor_status executor_status
= ACTION_EXECUTOR_STATUS_OK
;
700 const uint64_t work_item_id
= executor
->next_work_item_id
++;
701 struct action_work_item
*work_item
;
704 pthread_mutex_lock(&executor
->work
.lock
);
705 /* Check for queue overflow. */
706 if (executor
->work
.pending_count
>= MAX_QUEUED_WORK_COUNT
) {
707 /* Most likely spammy, remove if it is the case. */
708 DBG("Refusing to enqueue action for trigger `%s` as work item %" PRIu64
709 " (overflow)", get_trigger_name(trigger
), work_item_id
);
710 executor_status
= ACTION_EXECUTOR_STATUS_OVERFLOW
;
714 work_item
= zmalloc(sizeof(*work_item
));
716 PERROR("Failed to allocate action executor work item on behalf of trigger `%s`",
717 get_trigger_name(trigger
));
718 executor_status
= ACTION_EXECUTOR_STATUS_ERROR
;
722 lttng_trigger_get(trigger
);
724 const bool reference_acquired
=
725 notification_client_list_get(client_list
);
727 assert(reference_acquired
);
730 *work_item
= (typeof(*work_item
)){
733 /* Ownership transferred to the work item. */
734 .evaluation
= evaluation
,
736 .is_set
= !!object_creds
,
737 .value
= object_creds
? *object_creds
:
738 (typeof(work_item
->object_creds
.value
)) {},
740 .client_list
= client_list
,
741 .list_node
= CDS_LIST_HEAD_INIT(work_item
->list_node
),
745 cds_list_add_tail(&work_item
->list_node
, &executor
->work
.list
);
746 executor
->work
.pending_count
++;
747 DBG("Enqueued action for trigger `%s` as work item #%" PRIu64
,
748 get_trigger_name(trigger
), work_item_id
);
753 pthread_cond_signal(&executor
->work
.cond
);
755 pthread_mutex_unlock(&executor
->work
.lock
);
757 lttng_evaluation_destroy(evaluation
);
758 return executor_status
;