Clean-up: sessiond: remove useless LTTNG_ASSERT
[lttng-tools.git] / src / bin / lttng-sessiond / action-executor.cpp
1 /*
2 * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #include "action-executor.hpp"
9 #include "cmd.hpp"
10 #include "health-sessiond.hpp"
11 #include "lttng-sessiond.hpp"
12 #include "notification-thread-internal.hpp"
13 #include "session.hpp"
14 #include "thread.hpp"
15
16 #include <common/dynamic-array.hpp>
17 #include <common/macros.hpp>
18 #include <common/optional.hpp>
19
20 #include <lttng/action/action-internal.hpp>
21 #include <lttng/action/list-internal.hpp>
22 #include <lttng/action/list.h>
23 #include <lttng/action/notify-internal.hpp>
24 #include <lttng/action/notify.h>
25 #include <lttng/action/rotate-session.h>
26 #include <lttng/action/snapshot-session.h>
27 #include <lttng/action/start-session.h>
28 #include <lttng/action/stop-session.h>
29 #include <lttng/condition/evaluation.h>
30 #include <lttng/condition/event-rule-matches-internal.hpp>
31 #include <lttng/lttng-error.h>
32 #include <lttng/trigger/trigger-internal.hpp>
33
34 #include <pthread.h>
35 #include <stdbool.h>
36 #include <stddef.h>
37 #include <urcu/list.h>
38
39 #define THREAD_NAME "Action Executor"
40 #define MAX_QUEUED_WORK_COUNT 8192
41
42 struct action_executor {
43 struct lttng_thread *thread;
44 struct notification_thread_handle *notification_thread_handle;
45 struct {
46 uint64_t pending_count;
47 struct cds_list_head list;
48 pthread_cond_t cond;
49 pthread_mutex_t lock;
50 } work;
51 bool should_quit;
52 uint64_t next_work_item_id;
53 };
54
55 namespace {
56 /*
57 * A work item is composed of a dynamic array of sub-items which
58 * represent a flattened, and augmented, version of a trigger's actions.
59 *
60 * We cannot rely solely on the trigger's actions since each action can have an
61 * execution context we need to comply with.
62 *
63 * The notion of execution context is required since for some actions the
64 * associated object are referenced by name and not by id. This can lead to
65 * a number of ambiguities when executing an action work item.
66 *
67 * For example, let's take a simple trigger such as:
68 * - condition: ust event a
69 * - action: start session S
70 *
71 * At time T, session S exists.
72 * At T + 1, the event A is hit.
73 * At T + 2, the tracer event notification is received and the work item is
74 * queued. Here session S have an id of 1.
75 * At T + 3, the session S is destroyed and a new session S is created, with a
76 * resulting id of 200.
77 * At T +4, the work item is popped from the queue and begin execution and will
78 * start session S with an id of 200 instead of the session S id 1 that was
79 * present at the queuing phase.
80 *
81 * The context to be respected is the one when the work item is queued. If the
82 * execution context is not the same at the moment of execution, we skip the
83 * execution of that sub-item.
84 *
85 * It is the same policy in regards to the validity of the associated
86 * trigger object at the moment of execution, if the trigger is found to be
87 * unregistered, the execution is skipped.
88 */
89 struct action_work_item {
90 uint64_t id;
91
92 /*
93 * The actions to be executed with their respective execution context.
94 * See struct `action_work_subitem`.
95 */
96 struct lttng_dynamic_array subitems;
97
98 /* Execution context data */
99 struct lttng_trigger *trigger;
100 struct lttng_evaluation *evaluation;
101 struct notification_client_list *client_list;
102 LTTNG_OPTIONAL(struct lttng_credentials) object_creds;
103 struct cds_list_head list_node;
104 };
105
106 struct action_work_subitem {
107 struct lttng_action *action;
108 struct {
109 /* Used by actions targeting a session. */
110 LTTNG_OPTIONAL(uint64_t) session_id;
111 } context;
112 };
113 } /* namespace */
114
/*
 * Signature shared by all action handlers.
 *
 * A handler must only return non-zero on a fatal error that should shut down
 * the action executor.
 */
using action_executor_handler = int (*)(struct action_executor *executor,
					const struct action_work_item *work_item,
					struct action_work_subitem *item);

static int action_executor_notify_handler(struct action_executor *executor,
					  const struct action_work_item *work_item,
					  struct action_work_subitem *item);
static int action_executor_start_session_handler(struct action_executor *executor,
						 const struct action_work_item *work_item,
						 struct action_work_subitem *item);
static int action_executor_stop_session_handler(struct action_executor *executor,
						const struct action_work_item *work_item,
						struct action_work_subitem *item);
static int action_executor_rotate_session_handler(struct action_executor *executor,
						  const struct action_work_item *work_item,
						  struct action_work_subitem *item);
static int action_executor_snapshot_session_handler(struct action_executor *executor,
						    const struct action_work_item *work_item,
						    struct action_work_subitem *item);
static int action_executor_list_handler(struct action_executor *executor,
					const struct action_work_item *work_item,
					struct action_work_subitem *item);
static int action_executor_generic_handler(struct action_executor *executor,
					   const struct action_work_item *work_item,
					   struct action_work_subitem *item);

/*
 * Handler dispatch table, indexed by the action's `enum lttng_action_type`
 * value.
 *
 * NOTE(review): the entry order presumably mirrors the numeric values of
 * `enum lttng_action_type` -- confirm against the enum's declaration before
 * re-ordering or adding entries.
 */
static const action_executor_handler action_executors[] = {
	action_executor_notify_handler,
	action_executor_start_session_handler,
	action_executor_stop_session_handler,
	action_executor_rotate_session_handler,
	action_executor_snapshot_session_handler,
	action_executor_list_handler,
};

/* Forward declarations of the sub-item array population helpers. */
static int add_action_to_subitem_array(struct lttng_action *action,
				       struct lttng_dynamic_array *subitems);

static int populate_subitem_array_from_trigger(struct lttng_trigger *trigger,
					       struct lttng_dynamic_array *subitems);
157
158 static void action_work_subitem_destructor(void *element)
159 {
160 struct action_work_subitem *subitem = (action_work_subitem *) element;
161
162 lttng_action_put(subitem->action);
163 }
164
165 static const char *get_action_name(const struct lttng_action *action)
166 {
167 const enum lttng_action_type action_type = lttng_action_get_type(action);
168
169 LTTNG_ASSERT(action_type != LTTNG_ACTION_TYPE_UNKNOWN);
170
171 return lttng_action_type_string(action_type);
172 }
173
174 /* Check if this trigger allowed to interect with a given session. */
175 static bool is_trigger_allowed_for_session(const struct lttng_trigger *trigger,
176 struct ltt_session *session)
177 {
178 bool is_allowed = false;
179 const struct lttng_credentials session_creds = {
180 .uid = LTTNG_OPTIONAL_INIT_VALUE(session->uid),
181 .gid = LTTNG_OPTIONAL_INIT_VALUE(session->gid),
182 };
183 /* Can never be NULL. */
184 const struct lttng_credentials *trigger_creds = lttng_trigger_get_credentials(trigger);
185
186 is_allowed = (lttng_credentials_is_equal_uid(trigger_creds, &session_creds)) ||
187 (lttng_credentials_get_uid(trigger_creds) == 0);
188 if (!is_allowed) {
189 WARN("Trigger is not allowed to interact with session `%s`: session uid = %ld, session gid = %ld, trigger uid = %ld",
190 session->name,
191 (long int) session->uid,
192 (long int) session->gid,
193 (long int) lttng_credentials_get_uid(trigger_creds));
194 }
195
196 return is_allowed;
197 }
198
199 static const char *get_trigger_name(const struct lttng_trigger *trigger)
200 {
201 const char *trigger_name;
202 enum lttng_trigger_status trigger_status;
203
204 trigger_status = lttng_trigger_get_name(trigger, &trigger_name);
205 switch (trigger_status) {
206 case LTTNG_TRIGGER_STATUS_OK:
207 break;
208 case LTTNG_TRIGGER_STATUS_UNSET:
209 trigger_name = "(anonymous)";
210 break;
211 default:
212 trigger_name = "(failed to get name)";
213 break;
214 }
215
216 return trigger_name;
217 }
218
219 static int client_handle_transmission_status(struct notification_client *client,
220 enum client_transmission_status status,
221 void *user_data)
222 {
223 int ret = 0;
224 struct action_executor *executor = (action_executor *) user_data;
225 bool update_communication = true;
226
227 switch (status) {
228 case CLIENT_TRANSMISSION_STATUS_COMPLETE:
229 DBG("Successfully sent full notification to client, client_id = %" PRIu64,
230 client->id);
231 /*
232 * There is no need to wake the (e)poll thread. If it was waiting for
233 * "out" events on the client's socket, it will see that no payload
234 * in queued and will unsubscribe from that event.
235 *
236 * In the other cases, we have to wake the the (e)poll thread to either
237 * handle the error on the client or to get it to monitor the client "out"
238 * events.
239 */
240 update_communication = false;
241 break;
242 case CLIENT_TRANSMISSION_STATUS_QUEUED:
243 DBG("Queued notification in client outgoing buffer, client_id = %" PRIu64,
244 client->id);
245 break;
246 case CLIENT_TRANSMISSION_STATUS_FAIL:
247 DBG("Communication error occurred while sending notification to client, client_id = %" PRIu64,
248 client->id);
249 break;
250 default:
251 ERR("Fatal error encoutered while sending notification to client, client_id = %" PRIu64,
252 client->id);
253 ret = -1;
254 goto end;
255 }
256
257 if (!update_communication) {
258 goto end;
259 }
260
261 /* Safe to read client's id without locking as it is immutable. */
262 ret = notification_thread_client_communication_update(
263 executor->notification_thread_handle, client->id, status);
264 end:
265 return ret;
266 }
267
268 static int action_executor_notify_handler(struct action_executor *executor,
269 const struct action_work_item *work_item,
270 struct action_work_subitem *item __attribute__((unused)))
271 {
272 return notification_client_list_send_evaluation(
273 work_item->client_list,
274 work_item->trigger,
275 work_item->evaluation,
276 work_item->object_creds.is_set ? &(work_item->object_creds.value) : nullptr,
277 client_handle_transmission_status,
278 executor);
279 }
280
281 static int action_executor_start_session_handler(struct action_executor *executor
282 __attribute__((unused)),
283 const struct action_work_item *work_item,
284 struct action_work_subitem *item)
285 {
286 int ret = 0;
287 const char *session_name;
288 enum lttng_action_status action_status;
289 struct ltt_session *session;
290 enum lttng_error_code cmd_ret;
291 struct lttng_action *action = item->action;
292
293 action_status = lttng_action_start_session_get_session_name(action, &session_name);
294 if (action_status != LTTNG_ACTION_STATUS_OK) {
295 ERR("Failed to get session name from `%s` action", get_action_name(action));
296 ret = -1;
297 goto end;
298 }
299
300 /*
301 * Validate if at the moment of the action was queued the session
302 * existed. If not skip the action altogether.
303 */
304 if (!item->context.session_id.is_set) {
305 DBG("Session `%s` was not present at the moment the work item was enqueued for `%s` action of trigger `%s`",
306 session_name,
307 get_action_name(action),
308 get_trigger_name(work_item->trigger));
309 lttng_action_increase_execution_failure_count(action);
310 goto end;
311 }
312
313 session_lock_list();
314 rcu_read_lock();
315 session = session_find_by_id(LTTNG_OPTIONAL_GET(item->context.session_id));
316 if (!session) {
317 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%s`",
318 session_name,
319 get_action_name(action),
320 get_trigger_name(work_item->trigger));
321 lttng_action_increase_execution_failure_count(action);
322 goto error_unlock_list;
323 }
324
325 session_lock(session);
326 if (session->destroyed) {
327 DBG("Session `%s` with id = %" PRIu64
328 " is flagged as destroyed. Skipping: action = `%s`, trigger = `%s`",
329 session->name,
330 session->id,
331 get_action_name(action),
332 get_trigger_name(work_item->trigger));
333 goto error_unlock_session;
334 }
335
336 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
337 goto error_unlock_session;
338 }
339
340 cmd_ret = (lttng_error_code) cmd_start_trace(session);
341 switch (cmd_ret) {
342 case LTTNG_OK:
343 DBG("Successfully started session `%s` on behalf of trigger `%s`",
344 session_name,
345 get_trigger_name(work_item->trigger));
346 break;
347 case LTTNG_ERR_TRACE_ALREADY_STARTED:
348 DBG("Attempted to start session `%s` on behalf of trigger `%s` but it was already started",
349 session_name,
350 get_trigger_name(work_item->trigger));
351 break;
352 default:
353 WARN("Failed to start session `%s` on behalf of trigger `%s`: %s",
354 session_name,
355 get_trigger_name(work_item->trigger),
356 lttng_strerror(-cmd_ret));
357 lttng_action_increase_execution_failure_count(action);
358 break;
359 }
360
361 error_unlock_session:
362 session_unlock(session);
363 session_put(session);
364 error_unlock_list:
365 rcu_read_unlock();
366 session_unlock_list();
367 end:
368 return ret;
369 }
370
371 static int action_executor_stop_session_handler(struct action_executor *executor
372 __attribute__((unused)),
373 const struct action_work_item *work_item,
374 struct action_work_subitem *item)
375 {
376 int ret = 0;
377 const char *session_name;
378 enum lttng_action_status action_status;
379 struct ltt_session *session;
380 enum lttng_error_code cmd_ret;
381 struct lttng_action *action = item->action;
382
383 action_status = lttng_action_stop_session_get_session_name(action, &session_name);
384 if (action_status != LTTNG_ACTION_STATUS_OK) {
385 ERR("Failed to get session name from `%s` action", get_action_name(action));
386 ret = -1;
387 goto end;
388 }
389
390 /*
391 * Validate if, at the moment the action was queued, the target session
392 * existed. If not, skip the action altogether.
393 */
394 if (!item->context.session_id.is_set) {
395 DBG("Session `%s` was not present at the moment the work item was enqueued for `%s` action of trigger `%s`",
396 session_name,
397 get_action_name(action),
398 get_trigger_name(work_item->trigger));
399 lttng_action_increase_execution_failure_count(action);
400 goto end;
401 }
402
403 session_lock_list();
404 rcu_read_lock();
405 session = session_find_by_id(LTTNG_OPTIONAL_GET(item->context.session_id));
406 if (!session) {
407 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%s`",
408 session_name,
409 get_action_name(action),
410 get_trigger_name(work_item->trigger));
411 lttng_action_increase_execution_failure_count(action);
412 goto error_unlock_list;
413 }
414
415 session_lock(session);
416 if (session->destroyed) {
417 DBG("Session `%s` with id = %" PRIu64
418 " is flagged as destroyed. Skipping: action = `%s`, trigger = `%s`",
419 session->name,
420 session->id,
421 get_action_name(action),
422 get_trigger_name(work_item->trigger));
423 goto error_unlock_session;
424 }
425
426 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
427 goto error_unlock_session;
428 }
429
430 cmd_ret = (lttng_error_code) cmd_stop_trace(session);
431 switch (cmd_ret) {
432 case LTTNG_OK:
433 DBG("Successfully stopped session `%s` on behalf of trigger `%s`",
434 session_name,
435 get_trigger_name(work_item->trigger));
436 break;
437 case LTTNG_ERR_TRACE_ALREADY_STOPPED:
438 DBG("Attempted to stop session `%s` on behalf of trigger `%s` but it was already stopped",
439 session_name,
440 get_trigger_name(work_item->trigger));
441 break;
442 default:
443 WARN("Failed to stop session `%s` on behalf of trigger `%s`: %s",
444 session_name,
445 get_trigger_name(work_item->trigger),
446 lttng_strerror(-cmd_ret));
447 lttng_action_increase_execution_failure_count(action);
448 break;
449 }
450
451 error_unlock_session:
452 session_unlock(session);
453 session_put(session);
454 error_unlock_list:
455 rcu_read_unlock();
456 session_unlock_list();
457 end:
458 return ret;
459 }
460
461 static int action_executor_rotate_session_handler(struct action_executor *executor
462 __attribute__((unused)),
463 const struct action_work_item *work_item,
464 struct action_work_subitem *item)
465 {
466 int ret = 0;
467 const char *session_name;
468 enum lttng_action_status action_status;
469 struct ltt_session *session;
470 enum lttng_error_code cmd_ret;
471 struct lttng_action *action = item->action;
472
473 action_status = lttng_action_rotate_session_get_session_name(action, &session_name);
474 if (action_status != LTTNG_ACTION_STATUS_OK) {
475 ERR("Failed to get session name from `%s` action", get_action_name(action));
476 ret = -1;
477 goto end;
478 }
479
480 /*
481 * Validate if, at the moment the action was queued, the target session
482 * existed. If not, skip the action altogether.
483 */
484 if (!item->context.session_id.is_set) {
485 DBG("Session `%s` was not present at the moment the work item was enqueued for `%s` action of trigger `%s`",
486 session_name,
487 get_action_name(action),
488 get_trigger_name(work_item->trigger));
489 lttng_action_increase_execution_failure_count(action);
490 goto end;
491 }
492
493 session_lock_list();
494 rcu_read_lock();
495 session = session_find_by_id(LTTNG_OPTIONAL_GET(item->context.session_id));
496 if (!session) {
497 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%s`",
498 session_name,
499 get_action_name(action),
500 get_trigger_name(work_item->trigger));
501 lttng_action_increase_execution_failure_count(action);
502 goto error_unlock_list;
503 }
504
505 session_lock(session);
506 if (session->destroyed) {
507 DBG("Session `%s` with id = %" PRIu64
508 " is flagged as destroyed. Skipping: action = `%s`, trigger = `%s`",
509 session->name,
510 session->id,
511 get_action_name(action),
512 get_trigger_name(work_item->trigger));
513 goto error_unlock_session;
514 }
515
516 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
517 goto error_unlock_session;
518 }
519
520 cmd_ret = (lttng_error_code) cmd_rotate_session(
521 session, nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
522 switch (cmd_ret) {
523 case LTTNG_OK:
524 DBG("Successfully started rotation of session `%s` on behalf of trigger `%s`",
525 session_name,
526 get_trigger_name(work_item->trigger));
527 break;
528 case LTTNG_ERR_ROTATION_PENDING:
529 DBG("Attempted to start a rotation of session `%s` on behalf of trigger `%s` but a rotation is already ongoing",
530 session_name,
531 get_trigger_name(work_item->trigger));
532 lttng_action_increase_execution_failure_count(action);
533 break;
534 case LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
535 case LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
536 DBG("Attempted to start a rotation of session `%s` on behalf of trigger `%s` but a rotation has already been completed since the last stop or clear",
537 session_name,
538 get_trigger_name(work_item->trigger));
539 break;
540 default:
541 WARN("Failed to start a rotation of session `%s` on behalf of trigger `%s`: %s",
542 session_name,
543 get_trigger_name(work_item->trigger),
544 lttng_strerror(-cmd_ret));
545 lttng_action_increase_execution_failure_count(action);
546 break;
547 }
548
549 error_unlock_session:
550 session_unlock(session);
551 session_put(session);
552 error_unlock_list:
553 rcu_read_unlock();
554 session_unlock_list();
555 end:
556 return ret;
557 }
558
559 static int action_executor_snapshot_session_handler(struct action_executor *executor
560 __attribute__((unused)),
561 const struct action_work_item *work_item,
562 struct action_work_subitem *item)
563 {
564 int ret = 0;
565 const char *session_name;
566 enum lttng_action_status action_status;
567 struct ltt_session *session;
568 lttng_snapshot_output default_snapshot_output;
569 const struct lttng_snapshot_output *snapshot_output = &default_snapshot_output;
570 enum lttng_error_code cmd_ret;
571 struct lttng_action *action = item->action;
572
573 default_snapshot_output.max_size = UINT64_MAX;
574
575 /*
576 * Validate if, at the moment the action was queued, the target session
577 * existed. If not, skip the action altogether.
578 */
579 if (!item->context.session_id.is_set) {
580 DBG("Session was not present at the moment the work item was enqueued for `%s` action of trigger `%s`",
581 get_action_name(action),
582 get_trigger_name(work_item->trigger));
583 lttng_action_increase_execution_failure_count(action);
584 goto end;
585 }
586
587 action_status = lttng_action_snapshot_session_get_session_name(action, &session_name);
588 if (action_status != LTTNG_ACTION_STATUS_OK) {
589 ERR("Failed to get session name from `%s` action", get_action_name(action));
590 ret = -1;
591 goto end;
592 }
593
594 action_status = lttng_action_snapshot_session_get_output(action, &snapshot_output);
595 if (action_status != LTTNG_ACTION_STATUS_OK && action_status != LTTNG_ACTION_STATUS_UNSET) {
596 ERR("Failed to get output from `%s` action", get_action_name(action));
597 ret = -1;
598 goto end;
599 }
600
601 session_lock_list();
602 rcu_read_lock();
603 session = session_find_by_id(LTTNG_OPTIONAL_GET(item->context.session_id));
604 if (!session) {
605 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%s`",
606 session_name,
607 get_action_name(action),
608 get_trigger_name(work_item->trigger));
609 lttng_action_increase_execution_failure_count(action);
610 goto error_unlock_list;
611 }
612
613 session_lock(session);
614 if (session->destroyed) {
615 DBG("Session `%s` with id = %" PRIu64
616 " is flagged as destroyed. Skipping: action = `%s`, trigger = `%s`",
617 session->name,
618 session->id,
619 get_action_name(action),
620 get_trigger_name(work_item->trigger));
621 goto error_unlock_session;
622 }
623
624 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
625 goto error_unlock_session;
626 }
627
628 cmd_ret = (lttng_error_code) cmd_snapshot_record(session, snapshot_output, 0);
629 switch (cmd_ret) {
630 case LTTNG_OK:
631 DBG("Successfully recorded snapshot of session `%s` on behalf of trigger `%s`",
632 session_name,
633 get_trigger_name(work_item->trigger));
634 break;
635 default:
636 WARN("Failed to record snapshot of session `%s` on behalf of trigger `%s`: %s",
637 session_name,
638 get_trigger_name(work_item->trigger),
639 lttng_strerror(-cmd_ret));
640 lttng_action_increase_execution_failure_count(action);
641 break;
642 }
643
644 error_unlock_session:
645 session_unlock(session);
646 session_put(session);
647 error_unlock_list:
648 rcu_read_unlock();
649 session_unlock_list();
650 end:
651 return ret;
652 }
653
654 static int action_executor_list_handler(struct action_executor *executor __attribute__((unused)),
655 const struct action_work_item *work_item
656 __attribute__((unused)),
657 struct action_work_subitem *item __attribute__((unused)))
658 {
659 ERR("Execution of a list action by the action executor should never occur");
660 abort();
661 }
662
663 static int action_executor_generic_handler(struct action_executor *executor,
664 const struct action_work_item *work_item,
665 struct action_work_subitem *item)
666 {
667 int ret;
668 struct lttng_action *action = item->action;
669 const enum lttng_action_type action_type = lttng_action_get_type(action);
670
671 LTTNG_ASSERT(action_type != LTTNG_ACTION_TYPE_UNKNOWN);
672
673 lttng_action_increase_execution_request_count(action);
674 if (!lttng_action_should_execute(action)) {
675 DBG("Policy prevented execution of action `%s` of trigger `%s` action work item %" PRIu64,
676 get_action_name(action),
677 get_trigger_name(work_item->trigger),
678 work_item->id);
679 ret = 0;
680 goto end;
681 }
682
683 lttng_action_increase_execution_count(action);
684 DBG("Executing action `%s` of trigger `%s` action work item %" PRIu64,
685 get_action_name(action),
686 get_trigger_name(work_item->trigger),
687 work_item->id);
688 ret = action_executors[action_type](executor, work_item, item);
689 end:
690 return ret;
691 }
692
693 static int action_work_item_execute(struct action_executor *executor,
694 struct action_work_item *work_item)
695 {
696 int ret;
697 size_t count, i;
698
699 DBG("Starting execution of action work item %" PRIu64 " of trigger `%s`",
700 work_item->id,
701 get_trigger_name(work_item->trigger));
702
703 count = lttng_dynamic_array_get_count(&work_item->subitems);
704 for (i = 0; i < count; i++) {
705 struct action_work_subitem *item;
706
707 item = (action_work_subitem *) lttng_dynamic_array_get_element(&work_item->subitems,
708 i);
709 ret = action_executor_generic_handler(executor, work_item, item);
710 if (ret) {
711 goto end;
712 }
713 }
714 end:
715 DBG("Completed execution of action work item %" PRIu64 " of trigger `%s`",
716 work_item->id,
717 get_trigger_name(work_item->trigger));
718 return ret;
719 }
720
721 static void action_work_item_destroy(struct action_work_item *work_item)
722 {
723 lttng_trigger_put(work_item->trigger);
724 lttng_evaluation_destroy(work_item->evaluation);
725 notification_client_list_put(work_item->client_list);
726 lttng_dynamic_array_reset(&work_item->subitems);
727 free(work_item);
728 }
729
730 static void *action_executor_thread(void *_data)
731 {
732 struct action_executor *executor = (action_executor *) _data;
733
734 LTTNG_ASSERT(executor);
735
736 health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_ACTION_EXECUTOR);
737
738 rcu_register_thread();
739 rcu_thread_online();
740
741 DBG("Entering work execution loop");
742 pthread_mutex_lock(&executor->work.lock);
743 while (!executor->should_quit) {
744 int ret = 0;
745 struct action_work_item *work_item;
746
747 health_code_update();
748 if (executor->work.pending_count == 0) {
749 health_poll_entry();
750 DBG("No work items enqueued, entering wait");
751 pthread_cond_wait(&executor->work.cond, &executor->work.lock);
752 DBG("Woke-up from wait");
753 health_poll_exit();
754 continue;
755 }
756
757 /* Pop item from front of the list with work lock held. */
758 work_item = cds_list_first_entry(
759 &executor->work.list, struct action_work_item, list_node);
760 cds_list_del(&work_item->list_node);
761 executor->work.pending_count--;
762
763 /*
764 * Work can be performed without holding the work lock,
765 * allowing new items to be queued.
766 */
767 pthread_mutex_unlock(&executor->work.lock);
768
769 /* Execute item only if a trigger is registered. */
770 lttng_trigger_lock(work_item->trigger);
771 if (!lttng_trigger_is_registered(work_item->trigger)) {
772 const char *trigger_name = nullptr;
773 uid_t trigger_owner_uid;
774 enum lttng_trigger_status trigger_status;
775
776 trigger_name = get_trigger_name(work_item->trigger);
777
778 trigger_status =
779 lttng_trigger_get_owner_uid(work_item->trigger, &trigger_owner_uid);
780 LTTNG_ASSERT(trigger_status == LTTNG_TRIGGER_STATUS_OK);
781
782 DBG("Work item skipped since the associated trigger is no longer registered: work item id = %" PRIu64
783 ", trigger name = `%s`, trigger owner uid = %d",
784 work_item->id,
785 trigger_name,
786 (int) trigger_owner_uid);
787 ret = 0;
788 goto skip_execute;
789 }
790
791 ret = action_work_item_execute(executor, work_item);
792
793 skip_execute:
794 lttng_trigger_unlock(work_item->trigger);
795 action_work_item_destroy(work_item);
796 if (ret) {
797 /* Fatal error. */
798 break;
799 }
800
801 health_code_update();
802 pthread_mutex_lock(&executor->work.lock);
803 }
804
805 if (executor->should_quit) {
806 pthread_mutex_unlock(&executor->work.lock);
807 }
808 DBG("Left work execution loop");
809
810 health_code_update();
811
812 rcu_thread_offline();
813 rcu_unregister_thread();
814 health_unregister(the_health_sessiond);
815
816 return nullptr;
817 }
818
819 static bool shutdown_action_executor_thread(void *_data)
820 {
821 struct action_executor *executor = (action_executor *) _data;
822
823 pthread_mutex_lock(&executor->work.lock);
824 executor->should_quit = true;
825 pthread_cond_signal(&executor->work.cond);
826 pthread_mutex_unlock(&executor->work.lock);
827 return true;
828 }
829
830 static void clean_up_action_executor_thread(void *_data)
831 {
832 struct action_executor *executor = (action_executor *) _data;
833
834 LTTNG_ASSERT(cds_list_empty(&executor->work.list));
835
836 pthread_mutex_destroy(&executor->work.lock);
837 pthread_cond_destroy(&executor->work.cond);
838 free(executor);
839 }
840
841 struct action_executor *action_executor_create(struct notification_thread_handle *handle)
842 {
843 struct action_executor *executor = zmalloc<action_executor>();
844
845 if (!executor) {
846 goto end;
847 }
848
849 CDS_INIT_LIST_HEAD(&executor->work.list);
850 pthread_cond_init(&executor->work.cond, nullptr);
851 pthread_mutex_init(&executor->work.lock, nullptr);
852 executor->notification_thread_handle = handle;
853
854 executor->thread = lttng_thread_create(THREAD_NAME,
855 action_executor_thread,
856 shutdown_action_executor_thread,
857 clean_up_action_executor_thread,
858 executor);
859 end:
860 return executor;
861 }
862
863 void action_executor_destroy(struct action_executor *executor)
864 {
865 struct action_work_item *work_item, *tmp;
866
867 /* TODO Wait for work list to drain? */
868 lttng_thread_shutdown(executor->thread);
869 pthread_mutex_lock(&executor->work.lock);
870 if (executor->work.pending_count != 0) {
871 WARN("%" PRIu64
872 " trigger action%s still queued for execution and will be discarded",
873 executor->work.pending_count,
874 executor->work.pending_count == 1 ? " is" : "s are");
875 }
876
877 cds_list_for_each_entry_safe (work_item, tmp, &executor->work.list, list_node) {
878 WARN("Discarding action work item %" PRIu64 " associated to trigger `%s`",
879 work_item->id,
880 get_trigger_name(work_item->trigger));
881 cds_list_del(&work_item->list_node);
882 action_work_item_destroy(work_item);
883 }
884 pthread_mutex_unlock(&executor->work.lock);
885 lttng_thread_put(executor->thread);
886 }
887
888 /* RCU read-lock must be held by the caller. */
889 enum action_executor_status
890 action_executor_enqueue_trigger(struct action_executor *executor,
891 struct lttng_trigger *trigger,
892 struct lttng_evaluation *evaluation,
893 const struct lttng_credentials *object_creds,
894 struct notification_client_list *client_list)
895 {
896 int ret;
897 enum action_executor_status executor_status = ACTION_EXECUTOR_STATUS_OK;
898 const uint64_t work_item_id = executor->next_work_item_id++;
899 struct action_work_item *work_item;
900 bool signal = false;
901
902 LTTNG_ASSERT(trigger);
903 ASSERT_RCU_READ_LOCKED();
904
905 pthread_mutex_lock(&executor->work.lock);
906 /* Check for queue overflow. */
907 if (executor->work.pending_count >= MAX_QUEUED_WORK_COUNT) {
908 /* Most likely spammy, remove if it is the case. */
909 DBG("Refusing to enqueue action for trigger (overflow): trigger name = `%s`, work item id = %" PRIu64,
910 get_trigger_name(trigger),
911 work_item_id);
912 executor_status = ACTION_EXECUTOR_STATUS_OVERFLOW;
913 goto error_unlock;
914 }
915
916 work_item = zmalloc<action_work_item>();
917 if (!work_item) {
918 PERROR("Failed to allocate action executor work item: trigger name = `%s`",
919 get_trigger_name(trigger));
920 executor_status = ACTION_EXECUTOR_STATUS_ERROR;
921 goto error_unlock;
922 }
923
924 lttng_trigger_get(trigger);
925 if (client_list) {
926 const bool reference_acquired = notification_client_list_get(client_list);
927
928 LTTNG_ASSERT(reference_acquired);
929 }
930
931 work_item->id = work_item_id;
932 work_item->trigger = trigger;
933
934 /* Ownership transferred to the work item. */
935 work_item->evaluation = evaluation;
936 evaluation = nullptr;
937
938 work_item->client_list = client_list;
939 work_item->object_creds.is_set = !!object_creds;
940 if (object_creds) {
941 work_item->object_creds.value = *object_creds;
942 }
943
944 CDS_INIT_LIST_HEAD(&work_item->list_node);
945
946 /* Build the array of action work subitems for the passed trigger. */
947 lttng_dynamic_array_init(&work_item->subitems,
948 sizeof(struct action_work_subitem),
949 action_work_subitem_destructor);
950
951 ret = populate_subitem_array_from_trigger(trigger, &work_item->subitems);
952 if (ret) {
953 ERR("Failed to populate work item sub items on behalf of trigger: trigger name = `%s`",
954 get_trigger_name(trigger));
955 executor_status = ACTION_EXECUTOR_STATUS_ERROR;
956 goto error_unlock;
957 }
958
959 cds_list_add_tail(&work_item->list_node, &executor->work.list);
960 executor->work.pending_count++;
961 DBG("Enqueued action for trigger: trigger name = `%s`, work item id = %" PRIu64,
962 get_trigger_name(trigger),
963 work_item_id);
964 signal = true;
965
966 error_unlock:
967 if (signal) {
968 pthread_cond_signal(&executor->work.cond);
969 }
970
971 pthread_mutex_unlock(&executor->work.lock);
972 lttng_evaluation_destroy(evaluation);
973 return executor_status;
974 }
975
976 static int add_action_to_subitem_array(struct lttng_action *action,
977 struct lttng_dynamic_array *subitems)
978 {
979 int ret = 0;
980 enum lttng_action_type type = lttng_action_get_type(action);
981 const char *session_name = nullptr;
982 enum lttng_action_status status;
983 struct action_work_subitem subitem = {
984 .action = nullptr,
985 .context = {
986 .session_id = LTTNG_OPTIONAL_INIT_UNSET,
987 },
988 };
989
990 LTTNG_ASSERT(action);
991 LTTNG_ASSERT(subitems);
992
993 if (type == LTTNG_ACTION_TYPE_LIST) {
994 unsigned int count, i;
995
996 status = lttng_action_list_get_count(action, &count);
997 LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK);
998
999 for (i = 0; i < count; i++) {
1000 struct lttng_action *inner_action = nullptr;
1001
1002 inner_action = lttng_action_list_borrow_mutable_at_index(action, i);
1003 LTTNG_ASSERT(inner_action);
1004 ret = add_action_to_subitem_array(inner_action, subitems);
1005 if (ret) {
1006 goto end;
1007 }
1008 }
1009
1010 /*
1011 * Go directly to the end since there is no need to add the
1012 * list action by itself to the subitems array.
1013 */
1014 goto end;
1015 }
1016
1017 /* Gather execution context. */
1018 switch (type) {
1019 case LTTNG_ACTION_TYPE_NOTIFY:
1020 break;
1021 case LTTNG_ACTION_TYPE_START_SESSION:
1022 status = lttng_action_start_session_get_session_name(action, &session_name);
1023 LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK);
1024 break;
1025 case LTTNG_ACTION_TYPE_STOP_SESSION:
1026 status = lttng_action_stop_session_get_session_name(action, &session_name);
1027 LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK);
1028 break;
1029 case LTTNG_ACTION_TYPE_ROTATE_SESSION:
1030 status = lttng_action_rotate_session_get_session_name(action, &session_name);
1031 LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK);
1032 break;
1033 case LTTNG_ACTION_TYPE_SNAPSHOT_SESSION:
1034 status = lttng_action_snapshot_session_get_session_name(action, &session_name);
1035 LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK);
1036 break;
1037 case LTTNG_ACTION_TYPE_LIST:
1038 case LTTNG_ACTION_TYPE_UNKNOWN:
1039 /* Fallthrough */
1040 default:
1041 abort();
1042 break;
1043 }
1044
1045 /*
1046 * Fetch the session execution context info as needed.
1047 * Note that we could decide to not add an action for which we know the
1048 * execution will not happen (i.e no session exists for that name). For
1049 * now we leave the decision to skip to the action executor for sake of
1050 * simplicity and consistency.
1051 */
1052 if (session_name != nullptr) {
1053 uint64_t session_id;
1054
1055 /*
1056 * Instantaneous sampling of the session id if present.
1057 *
1058 * This method is preferred over `sessiond_find_by_name` then
1059 * fetching the session'd id since `sessiond_find_by_name`
1060 * requires the session list lock to be taken.
1061 *
1062 * Taking the session list lock can lead to a deadlock
1063 * between the action executor and the notification thread
1064 * (caller of add_action_to_subitem_array). It is okay if the
1065 * session state changes between the enqueuing time and the
1066 * execution time. The execution context is validated at
1067 * execution time.
1068 */
1069 if (sample_session_id_by_name(session_name, &session_id)) {
1070 LTTNG_OPTIONAL_SET(&subitem.context.session_id, session_id);
1071 }
1072 }
1073
1074 /* Get a reference to the action. */
1075 lttng_action_get(action);
1076 subitem.action = action;
1077
1078 ret = lttng_dynamic_array_add_element(subitems, &subitem);
1079 if (ret) {
1080 ERR("Failed to add work subitem to the subitem array");
1081 lttng_action_put(action);
1082 ret = -1;
1083 goto end;
1084 }
1085
1086 end:
1087 return ret;
1088 }
1089
1090 static int populate_subitem_array_from_trigger(struct lttng_trigger *trigger,
1091 struct lttng_dynamic_array *subitems)
1092 {
1093 struct lttng_action *action;
1094
1095 action = lttng_trigger_get_action(trigger);
1096 LTTNG_ASSERT(action);
1097
1098 return add_action_to_subitem_array(action, subitems);
1099 }
This page took 0.057859 seconds and 4 git commands to generate.