action: Mark parameter of lttng_action_get_type as const
[lttng-tools.git] / src / bin / lttng-sessiond / action-executor.c
CommitLineData
f2b3ef9f
JG
1/*
2 * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8#include "action-executor.h"
9#include "cmd.h"
10#include "health-sessiond.h"
11#include "lttng-sessiond.h"
12#include "notification-thread-internal.h"
13#include "session.h"
14#include "thread.h"
15#include <common/macros.h>
16#include <common/optional.h>
17#include <lttng/action/action-internal.h>
18#include <lttng/action/group.h>
19#include <lttng/action/notify.h>
20#include <lttng/action/rotate-session.h>
21#include <lttng/action/snapshot-session.h>
22#include <lttng/action/start-session.h>
23#include <lttng/action/stop-session.h>
24#include <lttng/condition/evaluation.h>
25#include <lttng/lttng-error.h>
26#include <lttng/trigger/trigger-internal.h>
27#include <pthread.h>
28#include <stdbool.h>
29#include <stddef.h>
30#include <urcu/list.h>
31
/* Name given to the executor's worker thread when it is created. */
#define THREAD_NAME "Action Executor"
/* Number of pending work items at which new enqueue requests are refused. */
#define MAX_QUEUED_WORK_COUNT 8192
34
35struct action_work_item {
36 uint64_t id;
37 struct lttng_trigger *trigger;
38 struct lttng_evaluation *evaluation;
39 struct notification_client_list *client_list;
40 LTTNG_OPTIONAL(struct lttng_credentials) object_creds;
41 struct cds_list_head list_node;
42};
43
44struct action_executor {
45 struct lttng_thread *thread;
46 struct notification_thread_handle *notification_thread_handle;
47 struct {
48 uint64_t pending_count;
49 struct cds_list_head list;
50 pthread_cond_t cond;
51 pthread_mutex_t lock;
52 } work;
53 bool should_quit;
54 uint64_t next_work_item_id;
55};
56
/*
 * Signature shared by every per-action-type handler.
 *
 * Only return non-zero on a fatal error that should shut down the action
 * executor.
 */
typedef int (*action_executor_handler)(struct action_executor *executor,
		const struct action_work_item *work_item,
		const struct lttng_action *action);
64
/*
 * Forward declarations of the handlers: they are referenced by the
 * `action_executors` dispatch table below, before their definitions.
 */
static int action_executor_notify_handler(struct action_executor *executor,
		const struct action_work_item *work_item,
		const struct lttng_action *action);
static int action_executor_start_session_handler(struct action_executor *executor,
		const struct action_work_item *work_item,
		const struct lttng_action *action);
static int action_executor_stop_session_handler(struct action_executor *executor,
		const struct action_work_item *work_item,
		const struct lttng_action *action);
static int action_executor_rotate_session_handler(struct action_executor *executor,
		const struct action_work_item *work_item,
		const struct lttng_action *action);
static int action_executor_snapshot_session_handler(struct action_executor *executor,
		const struct action_work_item *work_item,
		const struct lttng_action *action);
static int action_executor_group_handler(struct action_executor *executor,
		const struct action_work_item *work_item,
		const struct lttng_action *action_group);
static int action_executor_generic_handler(struct action_executor *executor,
		const struct action_work_item *work_item,
		const struct lttng_action *action);
86
87static const action_executor_handler action_executors[] = {
88 [LTTNG_ACTION_TYPE_NOTIFY] = action_executor_notify_handler,
89 [LTTNG_ACTION_TYPE_START_SESSION] = action_executor_start_session_handler,
90 [LTTNG_ACTION_TYPE_STOP_SESSION] = action_executor_stop_session_handler,
91 [LTTNG_ACTION_TYPE_ROTATE_SESSION] = action_executor_rotate_session_handler,
92 [LTTNG_ACTION_TYPE_SNAPSHOT_SESSION] = action_executor_snapshot_session_handler,
93 [LTTNG_ACTION_TYPE_GROUP] = action_executor_group_handler,
94};
95
96static const char *action_type_names[] = {
97 [LTTNG_ACTION_TYPE_NOTIFY] = "Notify",
98 [LTTNG_ACTION_TYPE_START_SESSION] = "Start session",
99 [LTTNG_ACTION_TYPE_STOP_SESSION] = "Stop session",
100 [LTTNG_ACTION_TYPE_ROTATE_SESSION] = "Rotate session",
101 [LTTNG_ACTION_TYPE_SNAPSHOT_SESSION] = "Snapshot session",
102 [LTTNG_ACTION_TYPE_GROUP] = "Group",
103};
104
105static const char *get_action_name(const struct lttng_action *action)
106{
17182cfd 107 return action_type_names[lttng_action_get_type(action)];
f2b3ef9f
JG
108}
109
110/* Check if this trigger allowed to interect with a given session. */
111static bool is_trigger_allowed_for_session(const struct lttng_trigger *trigger,
112 struct ltt_session *session)
113{
114 bool is_allowed = false;
115 const struct lttng_credentials session_creds = {
116 .uid = session->uid,
117 .gid = session->gid,
118 };
119 /* Can never be NULL. */
120 const struct lttng_credentials *trigger_creds =
121 lttng_trigger_get_credentials(trigger);
122
123 is_allowed = (trigger_creds->uid == session_creds.uid) ||
124 (trigger_creds->uid == 0);
125 if (!is_allowed) {
126 WARN("Trigger is not allowed to interact with session `%s`: session uid = %ld, session gid = %ld, trigger uid = %ld, trigger gid = %ld",
127 session->name,
128 (long int) session->uid,
129 (long int) session->gid,
130 (long int) trigger_creds->uid,
131 (long int) trigger_creds->gid);
132 }
133
134 return is_allowed;
135}
136
137static int client_handle_transmission_status(
138 struct notification_client *client,
139 enum client_transmission_status status,
140 void *user_data)
141{
142 int ret = 0;
143 struct action_executor *executor = user_data;
144 bool update_communication = true;
145
f2b3ef9f
JG
146 switch (status) {
147 case CLIENT_TRANSMISSION_STATUS_COMPLETE:
148 DBG("Successfully sent full notification to client, client_id = %" PRIu64,
149 client->id);
150 update_communication = false;
151 break;
152 case CLIENT_TRANSMISSION_STATUS_QUEUED:
153 DBG("Queued notification in client outgoing buffer, client_id = %" PRIu64,
154 client->id);
155 break;
156 case CLIENT_TRANSMISSION_STATUS_FAIL:
157 DBG("Communication error occurred while sending notification to client, client_id = %" PRIu64,
158 client->id);
f2b3ef9f
JG
159 break;
160 default:
161 ERR("Fatal error encoutered while sending notification to client, client_id = %" PRIu64,
162 client->id);
f2b3ef9f
JG
163 ret = -1;
164 goto end;
165 }
166
167 if (!update_communication) {
168 goto end;
169 }
170
6c24d3fd 171 /* Safe to read client's id without locking as it is immutable. */
f2b3ef9f
JG
172 ret = notification_thread_client_communication_update(
173 executor->notification_thread_handle, client->id,
174 status);
175end:
176 return ret;
177}
178
179static int action_executor_notify_handler(struct action_executor *executor,
180 const struct action_work_item *work_item,
181 const struct lttng_action *action)
182{
183 return notification_client_list_send_evaluation(work_item->client_list,
184 lttng_trigger_get_const_condition(work_item->trigger),
185 work_item->evaluation,
186 lttng_trigger_get_credentials(work_item->trigger),
187 LTTNG_OPTIONAL_GET_PTR(work_item->object_creds),
188 client_handle_transmission_status,
189 executor);
190}
191
192static int action_executor_start_session_handler(struct action_executor *executor,
193 const struct action_work_item *work_item,
194 const struct lttng_action *action)
195{
196 int ret = 0;
197 const char *session_name;
198 enum lttng_action_status action_status;
199 struct ltt_session *session;
200 enum lttng_error_code cmd_ret;
201
202 action_status = lttng_action_start_session_get_session_name(
203 action, &session_name);
204 if (action_status != LTTNG_ACTION_STATUS_OK) {
205 ERR("Failed to get session name from `%s` action",
206 get_action_name(action));
207 ret = -1;
208 goto end;
209 }
210
211 session_lock_list();
212 session = session_find_by_name(session_name);
213 if (!session) {
214 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%p`",
215 session_name, get_action_name(action),
216 work_item->trigger);
217 goto error_unlock_list;
218 }
219
220 session_lock(session);
221 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
222 goto error_dispose_session;
223 }
224
225 cmd_ret = cmd_start_trace(session);
226 switch (cmd_ret) {
227 case LTTNG_OK:
228 DBG("Successfully started session `%s` on behalf of trigger `%p`",
229 session_name, work_item->trigger);
230 break;
231 case LTTNG_ERR_TRACE_ALREADY_STARTED:
232 DBG("Attempted to start session `%s` on behalf of trigger `%p` but it was already started",
233 session_name, work_item->trigger);
234 break;
235 default:
236 WARN("Failed to start session `%s` on behalf of trigger `%p`: %s",
237 session_name, work_item->trigger,
238 lttng_strerror(-cmd_ret));
239 break;
240 }
241
242error_dispose_session:
243 session_unlock(session);
244 session_put(session);
245error_unlock_list:
246 session_unlock_list();
247end:
248 return ret;
249}
250
251static int action_executor_stop_session_handler(struct action_executor *executor,
252 const struct action_work_item *work_item,
253 const struct lttng_action *action)
254{
255 int ret = 0;
256 const char *session_name;
257 enum lttng_action_status action_status;
258 struct ltt_session *session;
259 enum lttng_error_code cmd_ret;
260
261 action_status = lttng_action_stop_session_get_session_name(
262 action, &session_name);
263 if (action_status != LTTNG_ACTION_STATUS_OK) {
264 ERR("Failed to get session name from `%s` action",
265 get_action_name(action));
266 ret = -1;
267 goto end;
268 }
269
270 session_lock_list();
271 session = session_find_by_name(session_name);
272 if (!session) {
273 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%p`",
274 session_name, get_action_name(action),
275 work_item->trigger);
276 goto error_unlock_list;
277 }
278
279 session_lock(session);
280 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
281 goto error_dispose_session;
282 }
283
284 cmd_ret = cmd_stop_trace(session);
285 switch (cmd_ret) {
286 case LTTNG_OK:
287 DBG("Successfully stopped session `%s` on behalf of trigger `%p`",
288 session_name, work_item->trigger);
289 break;
290 case LTTNG_ERR_TRACE_ALREADY_STOPPED:
291 DBG("Attempted to stop session `%s` on behalf of trigger `%p` but it was already stopped",
292 session_name, work_item->trigger);
293 break;
294 default:
295 WARN("Failed to stop session `%s` on behalf of trigger `%p`: %s",
296 session_name, work_item->trigger,
297 lttng_strerror(-cmd_ret));
298 break;
299 }
300
301error_dispose_session:
302 session_unlock(session);
303 session_put(session);
304error_unlock_list:
305 session_unlock_list();
306end:
307 return ret;
308}
309
310static int action_executor_rotate_session_handler(struct action_executor *executor,
311 const struct action_work_item *work_item,
312 const struct lttng_action *action)
313{
314 int ret = 0;
315 const char *session_name;
316 enum lttng_action_status action_status;
317 struct ltt_session *session;
318 enum lttng_error_code cmd_ret;
319
320 action_status = lttng_action_rotate_session_get_session_name(
321 action, &session_name);
322 if (action_status != LTTNG_ACTION_STATUS_OK) {
323 ERR("Failed to get session name from `%s` action",
324 get_action_name(action));
325 ret = -1;
326 goto end;
327 }
328
329 session_lock_list();
330 session = session_find_by_name(session_name);
331 if (!session) {
332 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%p`",
333 session_name, get_action_name(action),
334 work_item->trigger);
335 goto error_unlock_list;
336 }
337
338 session_lock(session);
339 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
340 goto error_dispose_session;
341 }
342
343 cmd_ret = cmd_rotate_session(session, NULL, false,
344 LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
345 switch (cmd_ret) {
346 case LTTNG_OK:
347 DBG("Successfully started rotation of session `%s` on behalf of trigger `%p`",
348 session_name, work_item->trigger);
349 break;
350 case LTTNG_ERR_ROTATION_PENDING:
351 DBG("Attempted to start a rotation of session `%s` on behalf of trigger `%p` but a rotation is already ongoing",
352 session_name, work_item->trigger);
353 break;
354 case LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
355 case LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
356 DBG("Attempted to start a rotation of session `%s` on behalf of trigger `%p` but a rotation has already been completed since the last stop or clear",
357 session_name, work_item->trigger);
358 break;
359 default:
360 WARN("Failed to start a rotation of session `%s` on behalf of trigger `%p`: %s",
361 session_name, work_item->trigger,
362 lttng_strerror(-cmd_ret));
363 break;
364 }
365
366error_dispose_session:
367 session_unlock(session);
368 session_put(session);
369error_unlock_list:
370 session_unlock_list();
371end:
372 return ret;
373}
374
375static int action_executor_snapshot_session_handler(struct action_executor *executor,
376 const struct action_work_item *work_item,
377 const struct lttng_action *action)
378{
379 int ret = 0;
380 const char *session_name;
381 enum lttng_action_status action_status;
382 struct ltt_session *session;
383 const struct lttng_snapshot_output default_snapshot_output = {
384 .max_size = UINT64_MAX,
385 };
386 const struct lttng_snapshot_output *snapshot_output =
387 &default_snapshot_output;
388 enum lttng_error_code cmd_ret;
389
390 action_status = lttng_action_snapshot_session_get_session_name(
391 action, &session_name);
392 if (action_status != LTTNG_ACTION_STATUS_OK) {
393 ERR("Failed to get session name from `%s` action",
394 get_action_name(action));
395 ret = -1;
396 goto end;
397 }
398
399 action_status = lttng_action_snapshot_session_get_output(
400 action, &snapshot_output);
401 if (action_status != LTTNG_ACTION_STATUS_OK &&
402 action_status != LTTNG_ACTION_STATUS_UNSET) {
403 ERR("Failed to get output from `%s` action",
404 get_action_name(action));
405 ret = -1;
406 goto end;
407 }
408
409 session_lock_list();
410 session = session_find_by_name(session_name);
411 if (!session) {
412 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%p`",
413 session_name, get_action_name(action),
414 work_item->trigger);
415 goto error_unlock_list;
416 }
417
418
419 session_lock(session);
420 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
421 goto error_dispose_session;
422 }
423
424 cmd_ret = cmd_snapshot_record(session, snapshot_output, 0);
425 switch (cmd_ret) {
426 case LTTNG_OK:
427 DBG("Successfully recorded snapshot of session `%s` on behalf of trigger `%p`",
428 session_name, work_item->trigger);
429 break;
430 default:
431 WARN("Failed to record snapshot of session `%s` on behalf of trigger `%p`: %s",
432 session_name, work_item->trigger,
433 lttng_strerror(-cmd_ret));
434 break;
435 }
436
437error_dispose_session:
438 session_unlock(session);
439 session_put(session);
440error_unlock_list:
441 session_unlock_list();
442end:
443 return ret;
444}
445
446static int action_executor_group_handler(struct action_executor *executor,
447 const struct action_work_item *work_item,
448 const struct lttng_action *action_group)
449{
450 int ret = 0;
451 unsigned int i, count;
452 enum lttng_action_status action_status;
453
454 action_status = lttng_action_group_get_count(action_group, &count);
455 if (action_status != LTTNG_ACTION_STATUS_OK) {
456 /* Fatal error. */
457 ERR("Failed to get count of action in action group");
458 ret = -1;
459 goto end;
460 }
461
462 DBG("Action group has %u action%s", count, count != 1 ? "s" : "");
463 for (i = 0; i < count; i++) {
464 const struct lttng_action *action =
465 lttng_action_group_get_at_index(
466 action_group, i);
467
468 ret = action_executor_generic_handler(
469 executor, work_item, action);
470 if (ret) {
471 ERR("Stopping the execution of the action group of trigger `%p` following a fatal error",
472 work_item->trigger);
473 goto end;
474 }
475 }
476end:
477 return ret;
478}
479
480static int action_executor_generic_handler(struct action_executor *executor,
481 const struct action_work_item *work_item,
482 const struct lttng_action *action)
483{
484 DBG("Executing action `%s` of trigger `%p` action work item %" PRIu64,
485 get_action_name(action),
486 work_item->trigger,
487 work_item->id);
488
17182cfd 489 return action_executors[lttng_action_get_type(action)](
f2b3ef9f
JG
490 executor, work_item, action);
491}
492
493static int action_work_item_execute(struct action_executor *executor,
494 struct action_work_item *work_item)
495{
496 int ret;
497 const struct lttng_action *action =
498 lttng_trigger_get_const_action(work_item->trigger);
499
500 DBG("Starting execution of action work item %" PRIu64 " of trigger `%p`",
501 work_item->id, work_item->trigger);
502 ret = action_executor_generic_handler(executor, work_item, action);
503 DBG("Completed execution of action work item %" PRIu64 " of trigger `%p`",
504 work_item->id, work_item->trigger);
505 return ret;
506}
507
508static void action_work_item_destroy(struct action_work_item *work_item)
509{
510 lttng_trigger_put(work_item->trigger);
511 lttng_evaluation_destroy(work_item->evaluation);
512 notification_client_list_put(work_item->client_list);
513 free(work_item);
514}
515
516static void *action_executor_thread(void *_data)
517{
518 struct action_executor *executor = _data;
519
520 assert(executor);
521
522 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_ACTION_EXECUTOR);
523
524 rcu_register_thread();
525 rcu_thread_online();
526
527 DBG("Entering work execution loop");
528 pthread_mutex_lock(&executor->work.lock);
529 while (!executor->should_quit) {
530 int ret;
531 struct action_work_item *work_item;
532
533 health_code_update();
534 if (executor->work.pending_count == 0) {
535 health_poll_entry();
536 DBG("No work items enqueued, entering wait");
537 pthread_cond_wait(&executor->work.cond,
538 &executor->work.lock);
539 DBG("Woke-up from wait");
540 health_poll_exit();
541 continue;
542 }
543
544 /* Pop item from front of the listwith work lock held. */
545 work_item = cds_list_first_entry(&executor->work.list,
546 struct action_work_item, list_node);
547 cds_list_del(&work_item->list_node);
548 executor->work.pending_count--;
549
550 /*
551 * Work can be performed without holding the work lock,
552 * allowing new items to be queued.
553 */
554 pthread_mutex_unlock(&executor->work.lock);
555 ret = action_work_item_execute(executor, work_item);
556 action_work_item_destroy(work_item);
557 if (ret) {
558 /* Fatal error. */
559 break;
560 }
561
562 health_code_update();
563 pthread_mutex_lock(&executor->work.lock);
564 }
565
f5f5c54d
JG
566 if (executor->should_quit) {
567 pthread_mutex_unlock(&executor->work.lock);
568 }
f2b3ef9f
JG
569 DBG("Left work execution loop");
570
571 health_code_update();
572
573 rcu_thread_offline();
574 rcu_unregister_thread();
575 health_unregister(health_sessiond);
576
577 return NULL;
578}
579
580static bool shutdown_action_executor_thread(void *_data)
581{
582 struct action_executor *executor = _data;
583
584 executor->should_quit = true;
585 pthread_cond_signal(&executor->work.cond);
586 return true;
587}
588
589static void clean_up_action_executor_thread(void *_data)
590{
591 struct action_executor *executor = _data;
592
593 assert(cds_list_empty(&executor->work.list));
594
595 pthread_mutex_destroy(&executor->work.lock);
596 pthread_cond_destroy(&executor->work.cond);
597 free(executor);
598}
599
600struct action_executor *action_executor_create(
601 struct notification_thread_handle *handle)
602{
603 struct action_executor *executor = zmalloc(sizeof(*executor));
604
605 if (!executor) {
606 goto end;
607 }
608
609 CDS_INIT_LIST_HEAD(&executor->work.list);
610 pthread_cond_init(&executor->work.cond, NULL);
611 pthread_mutex_init(&executor->work.lock, NULL);
612 executor->notification_thread_handle = handle;
613
614 executor->thread = lttng_thread_create(THREAD_NAME,
615 action_executor_thread, shutdown_action_executor_thread,
616 clean_up_action_executor_thread, executor);
617end:
618 return executor;
619}
620
621void action_executor_destroy(struct action_executor *executor)
622{
623 struct action_work_item *work_item, *tmp;
624
625 /* TODO Wait for work list to drain? */
626 lttng_thread_shutdown(executor->thread);
627 pthread_mutex_lock(&executor->work.lock);
628 if (executor->work.pending_count != 0) {
629 WARN("%" PRIu64
630 " trigger action%s still queued for execution and will be discarded",
631 executor->work.pending_count,
632 executor->work.pending_count == 1 ? " is" :
633 "s are");
634 }
635
636 cds_list_for_each_entry_safe (
637 work_item, tmp, &executor->work.list, list_node) {
638 WARN("Discarding action work item %" PRIu64
639 " associated to trigger `%p`",
640 work_item->id, work_item->trigger);
641 cds_list_del(&work_item->list_node);
642 action_work_item_destroy(work_item);
643 }
644 pthread_mutex_unlock(&executor->work.lock);
645 lttng_thread_put(executor->thread);
646}
647
648/* RCU read-lock must be held by the caller. */
649enum action_executor_status action_executor_enqueue(
650 struct action_executor *executor,
651 struct lttng_trigger *trigger,
652 struct lttng_evaluation *evaluation,
653 const struct lttng_credentials *object_creds,
654 struct notification_client_list *client_list)
655{
656 enum action_executor_status executor_status = ACTION_EXECUTOR_STATUS_OK;
657 const uint64_t work_item_id = executor->next_work_item_id++;
658 struct action_work_item *work_item;
659 bool signal = false;
660
661 pthread_mutex_lock(&executor->work.lock);
662 /* Check for queue overflow. */
663 if (executor->work.pending_count >= MAX_QUEUED_WORK_COUNT) {
664 /* Most likely spammy, remove if it is the case. */
665 DBG("Refusing to enqueue action for trigger `%p` as work item %" PRIu64
666 " (overflow)",
667 trigger, work_item_id);
668 executor_status = ACTION_EXECUTOR_STATUS_OVERFLOW;
669 goto error_unlock;
670 }
671
672 work_item = zmalloc(sizeof(*work_item));
673 if (!work_item) {
674 PERROR("Failed to allocate action executor work item on behalf of trigger `%p`",
675 trigger);
676 executor_status = ACTION_EXECUTOR_STATUS_ERROR;
677 goto error_unlock;
678 }
679
680 lttng_trigger_get(trigger);
681 if (client_list) {
682 const bool reference_acquired =
683 notification_client_list_get(client_list);
684
685 assert(reference_acquired);
686 }
687
688 *work_item = (typeof(*work_item)){
689 .id = work_item_id,
690 .trigger = trigger,
691 /* Ownership transferred to the work item. */
692 .evaluation = evaluation,
693 .object_creds = {
694 .is_set = !!object_creds,
695 .value = object_creds ? *object_creds :
696 (typeof(work_item->object_creds.value)) {},
697 },
698 .client_list = client_list,
699 .list_node = CDS_LIST_HEAD_INIT(work_item->list_node),
700 };
701
702 evaluation = NULL;
703 cds_list_add_tail(&work_item->list_node, &executor->work.list);
704 executor->work.pending_count++;
705 DBG("Enqueued action for trigger `%p` as work item %" PRIu64,
706 trigger, work_item_id);
707 signal = true;
708
709error_unlock:
710 pthread_mutex_unlock(&executor->work.lock);
711 if (signal) {
712 pthread_cond_signal(&executor->work.cond);
713 }
714
715 lttng_evaluation_destroy(evaluation);
716 return executor_status;
717}
This page took 0.077554 seconds and 4 git commands to generate.