c51bb7f639cd7143d3301fbac8c2b385715ece11
[lttng-tools.git] / src / bin / lttng-sessiond / action-executor.c
1 /*
2 * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #include "action-executor.h"
9 #include "cmd.h"
10 #include "health-sessiond.h"
11 #include "lttng-sessiond.h"
12 #include "notification-thread-internal.h"
13 #include "session.h"
14 #include "thread.h"
15 #include <common/macros.h>
16 #include <common/optional.h>
17 #include <lttng/action/action-internal.h>
18 #include <lttng/action/group.h>
19 #include <lttng/action/notify.h>
20 #include <lttng/action/rotate-session.h>
21 #include <lttng/action/snapshot-session.h>
22 #include <lttng/action/start-session.h>
23 #include <lttng/action/stop-session.h>
24 #include <lttng/condition/evaluation.h>
25 #include <lttng/lttng-error.h>
26 #include <lttng/trigger/trigger-internal.h>
27 #include <pthread.h>
28 #include <stdbool.h>
29 #include <stddef.h>
30 #include <urcu/list.h>
31
32 #define THREAD_NAME "Action Executor"
33 #define MAX_QUEUED_WORK_COUNT 8192
34
35 struct action_work_item {
36 uint64_t id;
37 struct lttng_trigger *trigger;
38 struct lttng_evaluation *evaluation;
39 struct notification_client_list *client_list;
40 LTTNG_OPTIONAL(struct lttng_credentials) object_creds;
41 struct cds_list_head list_node;
42 };
43
44 struct action_executor {
45 struct lttng_thread *thread;
46 struct notification_thread_handle *notification_thread_handle;
47 struct {
48 uint64_t pending_count;
49 struct cds_list_head list;
50 pthread_cond_t cond;
51 pthread_mutex_t lock;
52 } work;
53 bool should_quit;
54 uint64_t next_work_item_id;
55 };
56
57 /*
58 * Only return non-zero on a fatal error that should shut down the action
59 * executor.
60 */
61 typedef int (*action_executor_handler)(struct action_executor *executor,
62 const struct action_work_item *,
63 const struct lttng_action *action);
64
65 static int action_executor_notify_handler(struct action_executor *executor,
66 const struct action_work_item *,
67 const struct lttng_action *);
68 static int action_executor_start_session_handler(struct action_executor *executor,
69 const struct action_work_item *,
70 const struct lttng_action *);
71 static int action_executor_stop_session_handler(struct action_executor *executor,
72 const struct action_work_item *,
73 const struct lttng_action *);
74 static int action_executor_rotate_session_handler(struct action_executor *executor,
75 const struct action_work_item *,
76 const struct lttng_action *);
77 static int action_executor_snapshot_session_handler(struct action_executor *executor,
78 const struct action_work_item *,
79 const struct lttng_action *);
80 static int action_executor_group_handler(struct action_executor *executor,
81 const struct action_work_item *,
82 const struct lttng_action *);
83 static int action_executor_generic_handler(struct action_executor *executor,
84 const struct action_work_item *,
85 const struct lttng_action *);
86
87 static const action_executor_handler action_executors[] = {
88 [LTTNG_ACTION_TYPE_NOTIFY] = action_executor_notify_handler,
89 [LTTNG_ACTION_TYPE_START_SESSION] = action_executor_start_session_handler,
90 [LTTNG_ACTION_TYPE_STOP_SESSION] = action_executor_stop_session_handler,
91 [LTTNG_ACTION_TYPE_ROTATE_SESSION] = action_executor_rotate_session_handler,
92 [LTTNG_ACTION_TYPE_SNAPSHOT_SESSION] = action_executor_snapshot_session_handler,
93 [LTTNG_ACTION_TYPE_GROUP] = action_executor_group_handler,
94 };
95
96 static const char *action_type_names[] = {
97 [LTTNG_ACTION_TYPE_NOTIFY] = "Notify",
98 [LTTNG_ACTION_TYPE_START_SESSION] = "Start session",
99 [LTTNG_ACTION_TYPE_STOP_SESSION] = "Stop session",
100 [LTTNG_ACTION_TYPE_ROTATE_SESSION] = "Rotate session",
101 [LTTNG_ACTION_TYPE_SNAPSHOT_SESSION] = "Snapshot session",
102 [LTTNG_ACTION_TYPE_GROUP] = "Group",
103 };
104
105 static const char *get_action_name(const struct lttng_action *action)
106 {
107 return action_type_names[lttng_action_get_type_const(action)];
108 }
109
110 /* Check if this trigger allowed to interect with a given session. */
111 static bool is_trigger_allowed_for_session(const struct lttng_trigger *trigger,
112 struct ltt_session *session)
113 {
114 bool is_allowed = false;
115 const struct lttng_credentials session_creds = {
116 .uid = session->uid,
117 .gid = session->gid,
118 };
119 /* Can never be NULL. */
120 const struct lttng_credentials *trigger_creds =
121 lttng_trigger_get_credentials(trigger);
122
123 is_allowed = (trigger_creds->uid == session_creds.uid) ||
124 (trigger_creds->uid == 0);
125 if (!is_allowed) {
126 WARN("Trigger is not allowed to interact with session `%s`: session uid = %ld, session gid = %ld, trigger uid = %ld, trigger gid = %ld",
127 session->name,
128 (long int) session->uid,
129 (long int) session->gid,
130 (long int) trigger_creds->uid,
131 (long int) trigger_creds->gid);
132 }
133
134 return is_allowed;
135 }
136
137 static int client_handle_transmission_status(
138 struct notification_client *client,
139 enum client_transmission_status status,
140 void *user_data)
141 {
142 int ret = 0;
143 struct action_executor *executor = user_data;
144 bool update_communication = true;
145
146 ASSERT_LOCKED(client->lock);
147
148 switch (status) {
149 case CLIENT_TRANSMISSION_STATUS_COMPLETE:
150 DBG("Successfully sent full notification to client, client_id = %" PRIu64,
151 client->id);
152 update_communication = false;
153 break;
154 case CLIENT_TRANSMISSION_STATUS_QUEUED:
155 DBG("Queued notification in client outgoing buffer, client_id = %" PRIu64,
156 client->id);
157 break;
158 case CLIENT_TRANSMISSION_STATUS_FAIL:
159 DBG("Communication error occurred while sending notification to client, client_id = %" PRIu64,
160 client->id);
161 client->communication.active = false;
162 break;
163 default:
164 ERR("Fatal error encoutered while sending notification to client, client_id = %" PRIu64,
165 client->id);
166 client->communication.active = false;
167 ret = -1;
168 goto end;
169 }
170
171 if (!update_communication) {
172 goto end;
173 }
174
175 ret = notification_thread_client_communication_update(
176 executor->notification_thread_handle, client->id,
177 status);
178 end:
179 return ret;
180 }
181
182 static int action_executor_notify_handler(struct action_executor *executor,
183 const struct action_work_item *work_item,
184 const struct lttng_action *action)
185 {
186 return notification_client_list_send_evaluation(work_item->client_list,
187 lttng_trigger_get_const_condition(work_item->trigger),
188 work_item->evaluation,
189 lttng_trigger_get_credentials(work_item->trigger),
190 LTTNG_OPTIONAL_GET_PTR(work_item->object_creds),
191 client_handle_transmission_status,
192 executor);
193 }
194
195 static int action_executor_start_session_handler(struct action_executor *executor,
196 const struct action_work_item *work_item,
197 const struct lttng_action *action)
198 {
199 int ret = 0;
200 const char *session_name;
201 enum lttng_action_status action_status;
202 struct ltt_session *session;
203 enum lttng_error_code cmd_ret;
204
205 action_status = lttng_action_start_session_get_session_name(
206 action, &session_name);
207 if (action_status != LTTNG_ACTION_STATUS_OK) {
208 ERR("Failed to get session name from `%s` action",
209 get_action_name(action));
210 ret = -1;
211 goto end;
212 }
213
214 session_lock_list();
215 session = session_find_by_name(session_name);
216 if (!session) {
217 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%p`",
218 session_name, get_action_name(action),
219 work_item->trigger);
220 goto error_unlock_list;
221 }
222
223 session_lock(session);
224 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
225 goto error_dispose_session;
226 }
227
228 cmd_ret = cmd_start_trace(session);
229 switch (cmd_ret) {
230 case LTTNG_OK:
231 DBG("Successfully started session `%s` on behalf of trigger `%p`",
232 session_name, work_item->trigger);
233 break;
234 case LTTNG_ERR_TRACE_ALREADY_STARTED:
235 DBG("Attempted to start session `%s` on behalf of trigger `%p` but it was already started",
236 session_name, work_item->trigger);
237 break;
238 default:
239 WARN("Failed to start session `%s` on behalf of trigger `%p`: %s",
240 session_name, work_item->trigger,
241 lttng_strerror(-cmd_ret));
242 break;
243 }
244
245 error_dispose_session:
246 session_unlock(session);
247 session_put(session);
248 error_unlock_list:
249 session_unlock_list();
250 end:
251 return ret;
252 }
253
254 static int action_executor_stop_session_handler(struct action_executor *executor,
255 const struct action_work_item *work_item,
256 const struct lttng_action *action)
257 {
258 int ret = 0;
259 const char *session_name;
260 enum lttng_action_status action_status;
261 struct ltt_session *session;
262 enum lttng_error_code cmd_ret;
263
264 action_status = lttng_action_stop_session_get_session_name(
265 action, &session_name);
266 if (action_status != LTTNG_ACTION_STATUS_OK) {
267 ERR("Failed to get session name from `%s` action",
268 get_action_name(action));
269 ret = -1;
270 goto end;
271 }
272
273 session_lock_list();
274 session = session_find_by_name(session_name);
275 if (!session) {
276 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%p`",
277 session_name, get_action_name(action),
278 work_item->trigger);
279 goto error_unlock_list;
280 }
281
282 session_lock(session);
283 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
284 goto error_dispose_session;
285 }
286
287 cmd_ret = cmd_stop_trace(session);
288 switch (cmd_ret) {
289 case LTTNG_OK:
290 DBG("Successfully stopped session `%s` on behalf of trigger `%p`",
291 session_name, work_item->trigger);
292 break;
293 case LTTNG_ERR_TRACE_ALREADY_STOPPED:
294 DBG("Attempted to stop session `%s` on behalf of trigger `%p` but it was already stopped",
295 session_name, work_item->trigger);
296 break;
297 default:
298 WARN("Failed to stop session `%s` on behalf of trigger `%p`: %s",
299 session_name, work_item->trigger,
300 lttng_strerror(-cmd_ret));
301 break;
302 }
303
304 error_dispose_session:
305 session_unlock(session);
306 session_put(session);
307 error_unlock_list:
308 session_unlock_list();
309 end:
310 return ret;
311 }
312
313 static int action_executor_rotate_session_handler(struct action_executor *executor,
314 const struct action_work_item *work_item,
315 const struct lttng_action *action)
316 {
317 int ret = 0;
318 const char *session_name;
319 enum lttng_action_status action_status;
320 struct ltt_session *session;
321 enum lttng_error_code cmd_ret;
322
323 action_status = lttng_action_rotate_session_get_session_name(
324 action, &session_name);
325 if (action_status != LTTNG_ACTION_STATUS_OK) {
326 ERR("Failed to get session name from `%s` action",
327 get_action_name(action));
328 ret = -1;
329 goto end;
330 }
331
332 session_lock_list();
333 session = session_find_by_name(session_name);
334 if (!session) {
335 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%p`",
336 session_name, get_action_name(action),
337 work_item->trigger);
338 goto error_unlock_list;
339 }
340
341 session_lock(session);
342 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
343 goto error_dispose_session;
344 }
345
346 cmd_ret = cmd_rotate_session(session, NULL, false,
347 LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
348 switch (cmd_ret) {
349 case LTTNG_OK:
350 DBG("Successfully started rotation of session `%s` on behalf of trigger `%p`",
351 session_name, work_item->trigger);
352 break;
353 case LTTNG_ERR_ROTATION_PENDING:
354 DBG("Attempted to start a rotation of session `%s` on behalf of trigger `%p` but a rotation is already ongoing",
355 session_name, work_item->trigger);
356 break;
357 case LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
358 case LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
359 DBG("Attempted to start a rotation of session `%s` on behalf of trigger `%p` but a rotation has already been completed since the last stop or clear",
360 session_name, work_item->trigger);
361 break;
362 default:
363 WARN("Failed to start a rotation of session `%s` on behalf of trigger `%p`: %s",
364 session_name, work_item->trigger,
365 lttng_strerror(-cmd_ret));
366 break;
367 }
368
369 error_dispose_session:
370 session_unlock(session);
371 session_put(session);
372 error_unlock_list:
373 session_unlock_list();
374 end:
375 return ret;
376 }
377
378 static int action_executor_snapshot_session_handler(struct action_executor *executor,
379 const struct action_work_item *work_item,
380 const struct lttng_action *action)
381 {
382 int ret = 0;
383 const char *session_name;
384 enum lttng_action_status action_status;
385 struct ltt_session *session;
386 const struct lttng_snapshot_output default_snapshot_output = {
387 .max_size = UINT64_MAX,
388 };
389 const struct lttng_snapshot_output *snapshot_output =
390 &default_snapshot_output;
391 enum lttng_error_code cmd_ret;
392
393 action_status = lttng_action_snapshot_session_get_session_name(
394 action, &session_name);
395 if (action_status != LTTNG_ACTION_STATUS_OK) {
396 ERR("Failed to get session name from `%s` action",
397 get_action_name(action));
398 ret = -1;
399 goto end;
400 }
401
402 action_status = lttng_action_snapshot_session_get_output(
403 action, &snapshot_output);
404 if (action_status != LTTNG_ACTION_STATUS_OK &&
405 action_status != LTTNG_ACTION_STATUS_UNSET) {
406 ERR("Failed to get output from `%s` action",
407 get_action_name(action));
408 ret = -1;
409 goto end;
410 }
411
412 session_lock_list();
413 session = session_find_by_name(session_name);
414 if (!session) {
415 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%p`",
416 session_name, get_action_name(action),
417 work_item->trigger);
418 goto error_unlock_list;
419 }
420
421
422 session_lock(session);
423 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
424 goto error_dispose_session;
425 }
426
427 cmd_ret = cmd_snapshot_record(session, snapshot_output, 0);
428 switch (cmd_ret) {
429 case LTTNG_OK:
430 DBG("Successfully recorded snapshot of session `%s` on behalf of trigger `%p`",
431 session_name, work_item->trigger);
432 break;
433 default:
434 WARN("Failed to record snapshot of session `%s` on behalf of trigger `%p`: %s",
435 session_name, work_item->trigger,
436 lttng_strerror(-cmd_ret));
437 break;
438 }
439
440 error_dispose_session:
441 session_unlock(session);
442 session_put(session);
443 error_unlock_list:
444 session_unlock_list();
445 end:
446 return ret;
447 }
448
449 static int action_executor_group_handler(struct action_executor *executor,
450 const struct action_work_item *work_item,
451 const struct lttng_action *action_group)
452 {
453 int ret = 0;
454 unsigned int i, count;
455 enum lttng_action_status action_status;
456
457 action_status = lttng_action_group_get_count(action_group, &count);
458 if (action_status != LTTNG_ACTION_STATUS_OK) {
459 /* Fatal error. */
460 ERR("Failed to get count of action in action group");
461 ret = -1;
462 goto end;
463 }
464
465 DBG("Action group has %u action%s", count, count != 1 ? "s" : "");
466 for (i = 0; i < count; i++) {
467 const struct lttng_action *action =
468 lttng_action_group_get_at_index(
469 action_group, i);
470
471 ret = action_executor_generic_handler(
472 executor, work_item, action);
473 if (ret) {
474 ERR("Stopping the execution of the action group of trigger `%p` following a fatal error",
475 work_item->trigger);
476 goto end;
477 }
478 }
479 end:
480 return ret;
481 }
482
483 static int action_executor_generic_handler(struct action_executor *executor,
484 const struct action_work_item *work_item,
485 const struct lttng_action *action)
486 {
487 DBG("Executing action `%s` of trigger `%p` action work item %" PRIu64,
488 get_action_name(action),
489 work_item->trigger,
490 work_item->id);
491
492 return action_executors[lttng_action_get_type_const(action)](
493 executor, work_item, action);
494 }
495
496 static int action_work_item_execute(struct action_executor *executor,
497 struct action_work_item *work_item)
498 {
499 int ret;
500 const struct lttng_action *action =
501 lttng_trigger_get_const_action(work_item->trigger);
502
503 DBG("Starting execution of action work item %" PRIu64 " of trigger `%p`",
504 work_item->id, work_item->trigger);
505 ret = action_executor_generic_handler(executor, work_item, action);
506 DBG("Completed execution of action work item %" PRIu64 " of trigger `%p`",
507 work_item->id, work_item->trigger);
508 return ret;
509 }
510
511 static void action_work_item_destroy(struct action_work_item *work_item)
512 {
513 lttng_trigger_put(work_item->trigger);
514 lttng_evaluation_destroy(work_item->evaluation);
515 notification_client_list_put(work_item->client_list);
516 free(work_item);
517 }
518
519 static void *action_executor_thread(void *_data)
520 {
521 struct action_executor *executor = _data;
522
523 assert(executor);
524
525 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_ACTION_EXECUTOR);
526
527 rcu_register_thread();
528 rcu_thread_online();
529
530 DBG("Entering work execution loop");
531 pthread_mutex_lock(&executor->work.lock);
532 while (!executor->should_quit) {
533 int ret;
534 struct action_work_item *work_item;
535
536 health_code_update();
537 if (executor->work.pending_count == 0) {
538 health_poll_entry();
539 DBG("No work items enqueued, entering wait");
540 pthread_cond_wait(&executor->work.cond,
541 &executor->work.lock);
542 DBG("Woke-up from wait");
543 health_poll_exit();
544 continue;
545 }
546
547 /* Pop item from front of the listwith work lock held. */
548 work_item = cds_list_first_entry(&executor->work.list,
549 struct action_work_item, list_node);
550 cds_list_del(&work_item->list_node);
551 executor->work.pending_count--;
552
553 /*
554 * Work can be performed without holding the work lock,
555 * allowing new items to be queued.
556 */
557 pthread_mutex_unlock(&executor->work.lock);
558 ret = action_work_item_execute(executor, work_item);
559 action_work_item_destroy(work_item);
560 if (ret) {
561 /* Fatal error. */
562 break;
563 }
564
565 health_code_update();
566 pthread_mutex_lock(&executor->work.lock);
567 }
568
569 pthread_mutex_unlock(&executor->work.lock);
570 DBG("Left work execution loop");
571
572 health_code_update();
573
574 rcu_thread_offline();
575 rcu_unregister_thread();
576 health_unregister(health_sessiond);
577
578 return NULL;
579 }
580
581 static bool shutdown_action_executor_thread(void *_data)
582 {
583 struct action_executor *executor = _data;
584
585 executor->should_quit = true;
586 pthread_cond_signal(&executor->work.cond);
587 return true;
588 }
589
590 static void clean_up_action_executor_thread(void *_data)
591 {
592 struct action_executor *executor = _data;
593
594 assert(cds_list_empty(&executor->work.list));
595
596 pthread_mutex_destroy(&executor->work.lock);
597 pthread_cond_destroy(&executor->work.cond);
598 free(executor);
599 }
600
601 struct action_executor *action_executor_create(
602 struct notification_thread_handle *handle)
603 {
604 struct action_executor *executor = zmalloc(sizeof(*executor));
605
606 if (!executor) {
607 goto end;
608 }
609
610 CDS_INIT_LIST_HEAD(&executor->work.list);
611 pthread_cond_init(&executor->work.cond, NULL);
612 pthread_mutex_init(&executor->work.lock, NULL);
613 executor->notification_thread_handle = handle;
614
615 executor->thread = lttng_thread_create(THREAD_NAME,
616 action_executor_thread, shutdown_action_executor_thread,
617 clean_up_action_executor_thread, executor);
618 end:
619 return executor;
620 }
621
622 void action_executor_destroy(struct action_executor *executor)
623 {
624 struct action_work_item *work_item, *tmp;
625
626 /* TODO Wait for work list to drain? */
627 lttng_thread_shutdown(executor->thread);
628 pthread_mutex_lock(&executor->work.lock);
629 if (executor->work.pending_count != 0) {
630 WARN("%" PRIu64
631 " trigger action%s still queued for execution and will be discarded",
632 executor->work.pending_count,
633 executor->work.pending_count == 1 ? " is" :
634 "s are");
635 }
636
637 cds_list_for_each_entry_safe (
638 work_item, tmp, &executor->work.list, list_node) {
639 WARN("Discarding action work item %" PRIu64
640 " associated to trigger `%p`",
641 work_item->id, work_item->trigger);
642 cds_list_del(&work_item->list_node);
643 action_work_item_destroy(work_item);
644 }
645 pthread_mutex_unlock(&executor->work.lock);
646 lttng_thread_put(executor->thread);
647 }
648
649 /* RCU read-lock must be held by the caller. */
650 enum action_executor_status action_executor_enqueue(
651 struct action_executor *executor,
652 struct lttng_trigger *trigger,
653 struct lttng_evaluation *evaluation,
654 const struct lttng_credentials *object_creds,
655 struct notification_client_list *client_list)
656 {
657 enum action_executor_status executor_status = ACTION_EXECUTOR_STATUS_OK;
658 const uint64_t work_item_id = executor->next_work_item_id++;
659 struct action_work_item *work_item;
660 bool signal = false;
661
662 pthread_mutex_lock(&executor->work.lock);
663 /* Check for queue overflow. */
664 if (executor->work.pending_count >= MAX_QUEUED_WORK_COUNT) {
665 /* Most likely spammy, remove if it is the case. */
666 DBG("Refusing to enqueue action for trigger `%p` as work item %" PRIu64
667 " (overflow)",
668 trigger, work_item_id);
669 executor_status = ACTION_EXECUTOR_STATUS_OVERFLOW;
670 goto error_unlock;
671 }
672
673 work_item = zmalloc(sizeof(*work_item));
674 if (!work_item) {
675 PERROR("Failed to allocate action executor work item on behalf of trigger `%p`",
676 trigger);
677 executor_status = ACTION_EXECUTOR_STATUS_ERROR;
678 goto error_unlock;
679 }
680
681 lttng_trigger_get(trigger);
682 if (client_list) {
683 const bool reference_acquired =
684 notification_client_list_get(client_list);
685
686 assert(reference_acquired);
687 }
688
689 *work_item = (typeof(*work_item)){
690 .id = work_item_id,
691 .trigger = trigger,
692 /* Ownership transferred to the work item. */
693 .evaluation = evaluation,
694 .object_creds = {
695 .is_set = !!object_creds,
696 .value = object_creds ? *object_creds :
697 (typeof(work_item->object_creds.value)) {},
698 },
699 .client_list = client_list,
700 .list_node = CDS_LIST_HEAD_INIT(work_item->list_node),
701 };
702
703 evaluation = NULL;
704 cds_list_add_tail(&work_item->list_node, &executor->work.list);
705 executor->work.pending_count++;
706 DBG("Enqueued action for trigger `%p` as work item %" PRIu64,
707 trigger, work_item_id);
708 signal = true;
709
710 error_unlock:
711 pthread_mutex_unlock(&executor->work.lock);
712 if (signal) {
713 pthread_cond_signal(&executor->work.cond);
714 }
715
716 lttng_evaluation_destroy(evaluation);
717 return executor_status;
718 }
This page took 0.069441 seconds and 3 git commands to generate.