Tests: Add test to check shared-memory FD leaks after relayd dies
[lttng-tools.git] / src / bin / lttng-sessiond / action-executor.cpp
1 /*
2 * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #include "action-executor.hpp"
9 #include "cmd.hpp"
10 #include "health-sessiond.hpp"
11 #include "lttng-sessiond.hpp"
12 #include "notification-thread-internal.hpp"
13 #include "session.hpp"
14 #include "thread.hpp"
15 #include <common/dynamic-array.hpp>
16 #include <common/macros.hpp>
17 #include <common/optional.hpp>
18 #include <lttng/action/action-internal.hpp>
19 #include <lttng/action/list-internal.hpp>
20 #include <lttng/action/list.h>
21 #include <lttng/action/notify-internal.hpp>
22 #include <lttng/action/notify.h>
23 #include <lttng/action/rotate-session.h>
24 #include <lttng/action/snapshot-session.h>
25 #include <lttng/action/start-session.h>
26 #include <lttng/action/stop-session.h>
27 #include <lttng/condition/evaluation.h>
28 #include <lttng/condition/event-rule-matches-internal.hpp>
29 #include <lttng/lttng-error.h>
30 #include <lttng/trigger/trigger-internal.hpp>
31 #include <pthread.h>
32 #include <stdbool.h>
33 #include <stddef.h>
34 #include <urcu/list.h>
35
36 #define THREAD_NAME "Action Executor"
37 #define MAX_QUEUED_WORK_COUNT 8192
38
39 struct action_executor {
40 struct lttng_thread *thread;
41 struct notification_thread_handle *notification_thread_handle;
42 struct {
43 uint64_t pending_count;
44 struct cds_list_head list;
45 pthread_cond_t cond;
46 pthread_mutex_t lock;
47 } work;
48 bool should_quit;
49 uint64_t next_work_item_id;
50 };
51
52 namespace {
53 /*
54 * A work item is composed of a dynamic array of sub-items which
55 * represent a flattened, and augmented, version of a trigger's actions.
56 *
57 * We cannot rely solely on the trigger's actions since each action can have an
58 * execution context we need to comply with.
59 *
60 * The notion of execution context is required since for some actions the
61 * associated object are referenced by name and not by id. This can lead to
62 * a number of ambiguities when executing an action work item.
63 *
64 * For example, let's take a simple trigger such as:
65 * - condition: ust event a
66 * - action: start session S
67 *
68 * At time T, session S exists.
69 * At T + 1, the event A is hit.
70 * At T + 2, the tracer event notification is received and the work item is
71 * queued. Here session S have an id of 1.
72 * At T + 3, the session S is destroyed and a new session S is created, with a
73 * resulting id of 200.
74 * At T +4, the work item is popped from the queue and begin execution and will
75 * start session S with an id of 200 instead of the session S id 1 that was
76 * present at the queuing phase.
77 *
78 * The context to be respected is the one when the work item is queued. If the
79 * execution context is not the same at the moment of execution, we skip the
80 * execution of that sub-item.
81 *
82 * It is the same policy in regards to the validity of the associated
83 * trigger object at the moment of execution, if the trigger is found to be
84 * unregistered, the execution is skipped.
85 */
86 struct action_work_item {
87 uint64_t id;
88
89 /*
90 * The actions to be executed with their respective execution context.
91 * See struct `action_work_subitem`.
92 */
93 struct lttng_dynamic_array subitems;
94
95 /* Execution context data */
96 struct lttng_trigger *trigger;
97 struct lttng_evaluation *evaluation;
98 struct notification_client_list *client_list;
99 LTTNG_OPTIONAL(struct lttng_credentials) object_creds;
100 struct cds_list_head list_node;
101 };
102
103 struct action_work_subitem {
104 struct lttng_action *action;
105 struct {
106 /* Used by actions targeting a session. */
107 LTTNG_OPTIONAL(uint64_t) session_id;
108 } context;
109 };
110 } /* namespace */
111
112
/*
 * Signature shared by every action handler.
 *
 * Only return non-zero on a fatal error that should shut down the action
 * executor.
 */
using action_executor_handler = int (*)(struct action_executor *executor,
		const struct action_work_item *work_item,
		struct action_work_subitem *item);

/* Handlers, one per action type, defined further down in this file. */
static int action_executor_notify_handler(struct action_executor *executor,
		const struct action_work_item *work_item,
		struct action_work_subitem *item);
static int action_executor_start_session_handler(struct action_executor *executor,
		const struct action_work_item *work_item,
		struct action_work_subitem *item);
static int action_executor_stop_session_handler(struct action_executor *executor,
		const struct action_work_item *work_item,
		struct action_work_subitem *item);
static int action_executor_rotate_session_handler(struct action_executor *executor,
		const struct action_work_item *work_item,
		struct action_work_subitem *item);
static int action_executor_snapshot_session_handler(struct action_executor *executor,
		const struct action_work_item *work_item,
		struct action_work_subitem *item);
static int action_executor_list_handler(struct action_executor *executor,
		const struct action_work_item *work_item,
		struct action_work_subitem *item);

/* Common entry point which dispatches to the handlers above. */
static int action_executor_generic_handler(struct action_executor *executor,
		const struct action_work_item *work_item,
		struct action_work_subitem *item);
146
147 static const action_executor_handler action_executors[] = {
148 action_executor_notify_handler,
149 action_executor_start_session_handler,
150 action_executor_stop_session_handler,
151 action_executor_rotate_session_handler,
152 action_executor_snapshot_session_handler,
153 action_executor_list_handler,
154 };
155
/*
 * Forward declarations of the helpers used to build a work item's sub-item
 * array when a trigger is enqueued.
 */
static int add_action_to_subitem_array(
		struct lttng_action *action, struct lttng_dynamic_array *subitems);

static int populate_subitem_array_from_trigger(
		struct lttng_trigger *trigger, struct lttng_dynamic_array *subitems);
162
163 static void action_work_subitem_destructor(void *element)
164 {
165 struct action_work_subitem *subitem = (action_work_subitem *) element;
166
167 lttng_action_put(subitem->action);
168 }
169
170 static const char *get_action_name(const struct lttng_action *action)
171 {
172 const enum lttng_action_type action_type = lttng_action_get_type(action);
173
174 LTTNG_ASSERT(action_type != LTTNG_ACTION_TYPE_UNKNOWN);
175
176 return lttng_action_type_string(action_type);
177 }
178
179 /* Check if this trigger allowed to interect with a given session. */
180 static bool is_trigger_allowed_for_session(const struct lttng_trigger *trigger,
181 struct ltt_session *session)
182 {
183 bool is_allowed = false;
184 const struct lttng_credentials session_creds = {
185 .uid = LTTNG_OPTIONAL_INIT_VALUE(session->uid),
186 .gid = LTTNG_OPTIONAL_INIT_VALUE(session->gid),
187 };
188 /* Can never be NULL. */
189 const struct lttng_credentials *trigger_creds =
190 lttng_trigger_get_credentials(trigger);
191
192 is_allowed = (lttng_credentials_is_equal_uid(trigger_creds, &session_creds)) ||
193 (lttng_credentials_get_uid(trigger_creds) == 0);
194 if (!is_allowed) {
195 WARN("Trigger is not allowed to interact with session `%s`: session uid = %ld, session gid = %ld, trigger uid = %ld",
196 session->name,
197 (long int) session->uid,
198 (long int) session->gid,
199 (long int) lttng_credentials_get_uid(trigger_creds));
200 }
201
202 return is_allowed;
203 }
204
205 static const char *get_trigger_name(const struct lttng_trigger *trigger)
206 {
207 const char *trigger_name;
208 enum lttng_trigger_status trigger_status;
209
210 trigger_status = lttng_trigger_get_name(trigger, &trigger_name);
211 switch (trigger_status) {
212 case LTTNG_TRIGGER_STATUS_OK:
213 break;
214 case LTTNG_TRIGGER_STATUS_UNSET:
215 trigger_name = "(anonymous)";
216 break;
217 default:
218 trigger_name = "(failed to get name)";
219 break;
220 }
221
222 return trigger_name;
223 }
224
225 static int client_handle_transmission_status(
226 struct notification_client *client,
227 enum client_transmission_status status,
228 void *user_data)
229 {
230 int ret = 0;
231 struct action_executor *executor = (action_executor *) user_data;
232 bool update_communication = true;
233
234 switch (status) {
235 case CLIENT_TRANSMISSION_STATUS_COMPLETE:
236 DBG("Successfully sent full notification to client, client_id = %" PRIu64,
237 client->id);
238 /*
239 * There is no need to wake the (e)poll thread. If it was waiting for
240 * "out" events on the client's socket, it will see that no payload
241 * in queued and will unsubscribe from that event.
242 *
243 * In the other cases, we have to wake the the (e)poll thread to either
244 * handle the error on the client or to get it to monitor the client "out"
245 * events.
246 */
247 update_communication = false;
248 break;
249 case CLIENT_TRANSMISSION_STATUS_QUEUED:
250 DBG("Queued notification in client outgoing buffer, client_id = %" PRIu64,
251 client->id);
252 break;
253 case CLIENT_TRANSMISSION_STATUS_FAIL:
254 DBG("Communication error occurred while sending notification to client, client_id = %" PRIu64,
255 client->id);
256 break;
257 default:
258 ERR("Fatal error encoutered while sending notification to client, client_id = %" PRIu64,
259 client->id);
260 ret = -1;
261 goto end;
262 }
263
264 if (!update_communication) {
265 goto end;
266 }
267
268 /* Safe to read client's id without locking as it is immutable. */
269 ret = notification_thread_client_communication_update(
270 executor->notification_thread_handle, client->id,
271 status);
272 end:
273 return ret;
274 }
275
276 static int action_executor_notify_handler(struct action_executor *executor,
277 const struct action_work_item *work_item,
278 struct action_work_subitem *item __attribute__((unused)))
279 {
280 return notification_client_list_send_evaluation(work_item->client_list,
281 work_item->trigger,
282 work_item->evaluation,
283 work_item->object_creds.is_set ?
284 &(work_item->object_creds.value) :
285 NULL,
286 client_handle_transmission_status, executor);
287 }
288
289 static int action_executor_start_session_handler(
290 struct action_executor *executor __attribute__((unused)),
291 const struct action_work_item *work_item,
292 struct action_work_subitem *item)
293 {
294 int ret = 0;
295 const char *session_name;
296 enum lttng_action_status action_status;
297 struct ltt_session *session;
298 enum lttng_error_code cmd_ret;
299 struct lttng_action *action = item->action;
300
301 action_status = lttng_action_start_session_get_session_name(
302 action, &session_name);
303 if (action_status != LTTNG_ACTION_STATUS_OK) {
304 ERR("Failed to get session name from `%s` action",
305 get_action_name(action));
306 ret = -1;
307 goto end;
308 }
309
310 /*
311 * Validate if at the moment of the action was queued the session
312 * existed. If not skip the action altogether.
313 */
314 if (!item->context.session_id.is_set) {
315 DBG("Session `%s` was not present at the moment the work item was enqueued for `%s` action of trigger `%s`",
316 session_name, get_action_name(action),
317 get_trigger_name(work_item->trigger));
318 lttng_action_increase_execution_failure_count(action);
319 goto end;
320 }
321
322 session_lock_list();
323 rcu_read_lock();
324 session = session_find_by_id(LTTNG_OPTIONAL_GET(item->context.session_id));
325 if (!session) {
326 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%s`",
327 session_name, get_action_name(action),
328 get_trigger_name(work_item->trigger));
329 lttng_action_increase_execution_failure_count(action);
330 goto error_unlock_list;
331 }
332
333 session_lock(session);
334 if (session->destroyed) {
335 DBG("Session `%s` with id = %" PRIu64 " is flagged as destroyed. Skipping: action = `%s`, trigger = `%s`",
336 session->name, session->id,
337 get_action_name(action),
338 get_trigger_name(work_item->trigger));
339 goto error_unlock_session;
340 }
341
342 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
343 goto error_unlock_session;
344 }
345
346 cmd_ret = (lttng_error_code) cmd_start_trace(session);
347 switch (cmd_ret) {
348 case LTTNG_OK:
349 DBG("Successfully started session `%s` on behalf of trigger `%s`",
350 session_name, get_trigger_name(work_item->trigger));
351 break;
352 case LTTNG_ERR_TRACE_ALREADY_STARTED:
353 DBG("Attempted to start session `%s` on behalf of trigger `%s` but it was already started",
354 session_name, get_trigger_name(work_item->trigger));
355 break;
356 default:
357 WARN("Failed to start session `%s` on behalf of trigger `%s`: %s",
358 session_name, get_trigger_name(work_item->trigger),
359 lttng_strerror(-cmd_ret));
360 lttng_action_increase_execution_failure_count(action);
361 break;
362 }
363
364 error_unlock_session:
365 session_unlock(session);
366 session_put(session);
367 error_unlock_list:
368 rcu_read_unlock();
369 session_unlock_list();
370 end:
371 return ret;
372 }
373
374 static int action_executor_stop_session_handler(
375 struct action_executor *executor __attribute__((unused)),
376 const struct action_work_item *work_item,
377 struct action_work_subitem *item)
378 {
379 int ret = 0;
380 const char *session_name;
381 enum lttng_action_status action_status;
382 struct ltt_session *session;
383 enum lttng_error_code cmd_ret;
384 struct lttng_action *action = item->action;
385
386 action_status = lttng_action_stop_session_get_session_name(
387 action, &session_name);
388 if (action_status != LTTNG_ACTION_STATUS_OK) {
389 ERR("Failed to get session name from `%s` action",
390 get_action_name(action));
391 ret = -1;
392 goto end;
393 }
394
395 /*
396 * Validate if, at the moment the action was queued, the target session
397 * existed. If not, skip the action altogether.
398 */
399 if (!item->context.session_id.is_set) {
400 DBG("Session `%s` was not present at the moment the work item was enqueued for `%s` action of trigger `%s`",
401 session_name, get_action_name(action),
402 get_trigger_name(work_item->trigger));
403 lttng_action_increase_execution_failure_count(action);
404 goto end;
405 }
406
407 session_lock_list();
408 rcu_read_lock();
409 session = session_find_by_id(LTTNG_OPTIONAL_GET(item->context.session_id));
410 if (!session) {
411 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%s`",
412 session_name, get_action_name(action),
413 get_trigger_name(work_item->trigger));
414 lttng_action_increase_execution_failure_count(action);
415 goto error_unlock_list;
416 }
417
418 session_lock(session);
419 if (session->destroyed) {
420 DBG("Session `%s` with id = %" PRIu64 " is flagged as destroyed. Skipping: action = `%s`, trigger = `%s`",
421 session->name, session->id,
422 get_action_name(action),
423 get_trigger_name(work_item->trigger));
424 goto error_unlock_session;
425 }
426
427 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
428 goto error_unlock_session;
429 }
430
431 cmd_ret = (lttng_error_code) cmd_stop_trace(session);
432 switch (cmd_ret) {
433 case LTTNG_OK:
434 DBG("Successfully stopped session `%s` on behalf of trigger `%s`",
435 session_name, get_trigger_name(work_item->trigger));
436 break;
437 case LTTNG_ERR_TRACE_ALREADY_STOPPED:
438 DBG("Attempted to stop session `%s` on behalf of trigger `%s` but it was already stopped",
439 session_name, get_trigger_name(work_item->trigger));
440 break;
441 default:
442 WARN("Failed to stop session `%s` on behalf of trigger `%s`: %s",
443 session_name, get_trigger_name(work_item->trigger),
444 lttng_strerror(-cmd_ret));
445 lttng_action_increase_execution_failure_count(action);
446 break;
447 }
448
449 error_unlock_session:
450 session_unlock(session);
451 session_put(session);
452 error_unlock_list:
453 rcu_read_unlock();
454 session_unlock_list();
455 end:
456 return ret;
457 }
458
459 static int action_executor_rotate_session_handler(
460 struct action_executor *executor __attribute__((unused)),
461 const struct action_work_item *work_item,
462 struct action_work_subitem *item)
463 {
464 int ret = 0;
465 const char *session_name;
466 enum lttng_action_status action_status;
467 struct ltt_session *session;
468 enum lttng_error_code cmd_ret;
469 struct lttng_action *action = item->action;
470
471 action_status = lttng_action_rotate_session_get_session_name(
472 action, &session_name);
473 if (action_status != LTTNG_ACTION_STATUS_OK) {
474 ERR("Failed to get session name from `%s` action",
475 get_action_name(action));
476 ret = -1;
477 goto end;
478 }
479
480 /*
481 * Validate if, at the moment the action was queued, the target session
482 * existed. If not, skip the action altogether.
483 */
484 if (!item->context.session_id.is_set) {
485 DBG("Session `%s` was not present at the moment the work item was enqueued for `%s` action of trigger `%s`",
486 session_name, get_action_name(action),
487 get_trigger_name(work_item->trigger));
488 lttng_action_increase_execution_failure_count(action);
489 goto end;
490 }
491
492 session_lock_list();
493 rcu_read_lock();
494 session = session_find_by_id(LTTNG_OPTIONAL_GET(item->context.session_id));
495 if (!session) {
496 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%s`",
497 session_name, get_action_name(action),
498 get_trigger_name(work_item->trigger));
499 lttng_action_increase_execution_failure_count(action);
500 goto error_unlock_list;
501 }
502
503 session_lock(session);
504 if (session->destroyed) {
505 DBG("Session `%s` with id = %" PRIu64 " is flagged as destroyed. Skipping: action = `%s`, trigger = `%s`",
506 session->name, session->id,
507 get_action_name(action),
508 get_trigger_name(work_item->trigger));
509 goto error_unlock_session;
510 }
511
512 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
513 goto error_unlock_session;
514 }
515
516 cmd_ret = (lttng_error_code) cmd_rotate_session(session, NULL, false,
517 LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
518 switch (cmd_ret) {
519 case LTTNG_OK:
520 DBG("Successfully started rotation of session `%s` on behalf of trigger `%s`",
521 session_name, get_trigger_name(work_item->trigger));
522 break;
523 case LTTNG_ERR_ROTATION_PENDING:
524 DBG("Attempted to start a rotation of session `%s` on behalf of trigger `%s` but a rotation is already ongoing",
525 session_name, get_trigger_name(work_item->trigger));
526 lttng_action_increase_execution_failure_count(action);
527 break;
528 case LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
529 case LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
530 DBG("Attempted to start a rotation of session `%s` on behalf of trigger `%s` but a rotation has already been completed since the last stop or clear",
531 session_name, get_trigger_name(work_item->trigger));
532 break;
533 default:
534 WARN("Failed to start a rotation of session `%s` on behalf of trigger `%s`: %s",
535 session_name, get_trigger_name(work_item->trigger),
536 lttng_strerror(-cmd_ret));
537 lttng_action_increase_execution_failure_count(action);
538 break;
539 }
540
541 error_unlock_session:
542 session_unlock(session);
543 session_put(session);
544 error_unlock_list:
545 rcu_read_unlock();
546 session_unlock_list();
547 end:
548 return ret;
549 }
550
551 static int action_executor_snapshot_session_handler(
552 struct action_executor *executor __attribute__((unused)),
553 const struct action_work_item *work_item,
554 struct action_work_subitem *item)
555 {
556 int ret = 0;
557 const char *session_name;
558 enum lttng_action_status action_status;
559 struct ltt_session *session;
560 lttng_snapshot_output default_snapshot_output;
561 const struct lttng_snapshot_output *snapshot_output =
562 &default_snapshot_output;
563 enum lttng_error_code cmd_ret;
564 struct lttng_action *action = item->action;
565
566 default_snapshot_output.max_size = UINT64_MAX;
567
568 /*
569 * Validate if, at the moment the action was queued, the target session
570 * existed. If not, skip the action altogether.
571 */
572 if (!item->context.session_id.is_set) {
573 DBG("Session was not present at the moment the work item was enqueued for `%s` action of trigger `%s`",
574 get_action_name(action),
575 get_trigger_name(work_item->trigger));
576 lttng_action_increase_execution_failure_count(action);
577 goto end;
578 }
579
580 action_status = lttng_action_snapshot_session_get_session_name(
581 action, &session_name);
582 if (action_status != LTTNG_ACTION_STATUS_OK) {
583 ERR("Failed to get session name from `%s` action",
584 get_action_name(action));
585 ret = -1;
586 goto end;
587 }
588
589 action_status = lttng_action_snapshot_session_get_output(
590 action, &snapshot_output);
591 if (action_status != LTTNG_ACTION_STATUS_OK &&
592 action_status != LTTNG_ACTION_STATUS_UNSET) {
593 ERR("Failed to get output from `%s` action",
594 get_action_name(action));
595 ret = -1;
596 goto end;
597 }
598
599 session_lock_list();
600 rcu_read_lock();
601 session = session_find_by_id(LTTNG_OPTIONAL_GET(item->context.session_id));
602 if (!session) {
603 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%s`",
604 session_name, get_action_name(action),
605 get_trigger_name(work_item->trigger));
606 lttng_action_increase_execution_failure_count(action);
607 goto error_unlock_list;
608 }
609
610 session_lock(session);
611 if (session->destroyed) {
612 DBG("Session `%s` with id = %" PRIu64 " is flagged as destroyed. Skipping: action = `%s`, trigger = `%s`",
613 session->name, session->id,
614 get_action_name(action),
615 get_trigger_name(work_item->trigger));
616 goto error_unlock_session;
617 }
618
619 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
620 goto error_unlock_session;
621 }
622
623 cmd_ret = (lttng_error_code) cmd_snapshot_record(session, snapshot_output, 0);
624 switch (cmd_ret) {
625 case LTTNG_OK:
626 DBG("Successfully recorded snapshot of session `%s` on behalf of trigger `%s`",
627 session_name, get_trigger_name(work_item->trigger));
628 break;
629 default:
630 WARN("Failed to record snapshot of session `%s` on behalf of trigger `%s`: %s",
631 session_name, get_trigger_name(work_item->trigger),
632 lttng_strerror(-cmd_ret));
633 lttng_action_increase_execution_failure_count(action);
634 break;
635 }
636
637 error_unlock_session:
638 session_unlock(session);
639 session_put(session);
640 error_unlock_list:
641 rcu_read_unlock();
642 session_unlock_list();
643 end:
644 return ret;
645 }
646
647 static int action_executor_list_handler(
648 struct action_executor *executor __attribute__((unused)),
649 const struct action_work_item *work_item __attribute__((unused)),
650 struct action_work_subitem *item __attribute__((unused)))
651 {
652 ERR("Execution of a list action by the action executor should never occur");
653 abort();
654 }
655
656 static int action_executor_generic_handler(struct action_executor *executor,
657 const struct action_work_item *work_item,
658 struct action_work_subitem *item)
659 {
660 int ret;
661 struct lttng_action *action = item->action;
662 const enum lttng_action_type action_type = lttng_action_get_type(action);
663
664 LTTNG_ASSERT(action_type != LTTNG_ACTION_TYPE_UNKNOWN);
665
666 lttng_action_increase_execution_request_count(action);
667 if (!lttng_action_should_execute(action)) {
668 DBG("Policy prevented execution of action `%s` of trigger `%s` action work item %" PRIu64,
669 get_action_name(action),
670 get_trigger_name(work_item->trigger),
671 work_item->id);
672 ret = 0;
673 goto end;
674 }
675
676 lttng_action_increase_execution_count(action);
677 DBG("Executing action `%s` of trigger `%s` action work item %" PRIu64,
678 get_action_name(action),
679 get_trigger_name(work_item->trigger),
680 work_item->id);
681 ret = action_executors[action_type](executor, work_item, item);
682 end:
683 return ret;
684 }
685
686 static int action_work_item_execute(struct action_executor *executor,
687 struct action_work_item *work_item)
688 {
689 int ret;
690 size_t count, i;
691
692 DBG("Starting execution of action work item %" PRIu64 " of trigger `%s`",
693 work_item->id, get_trigger_name(work_item->trigger));
694
695 count = lttng_dynamic_array_get_count(&work_item->subitems);
696 for (i = 0; i < count; i++) {
697 struct action_work_subitem *item;
698
699 item = (action_work_subitem *) lttng_dynamic_array_get_element(&work_item->subitems, i);
700 ret = action_executor_generic_handler(
701 executor, work_item, item);
702 if (ret) {
703 goto end;
704 }
705 }
706 end:
707 DBG("Completed execution of action work item %" PRIu64 " of trigger `%s`",
708 work_item->id, get_trigger_name(work_item->trigger));
709 return ret;
710 }
711
712 static void action_work_item_destroy(struct action_work_item *work_item)
713 {
714 lttng_trigger_put(work_item->trigger);
715 lttng_evaluation_destroy(work_item->evaluation);
716 notification_client_list_put(work_item->client_list);
717 lttng_dynamic_array_reset(&work_item->subitems);
718 free(work_item);
719 }
720
721 static void *action_executor_thread(void *_data)
722 {
723 struct action_executor *executor = (action_executor *) _data;
724
725 LTTNG_ASSERT(executor);
726
727 health_register(the_health_sessiond,
728 HEALTH_SESSIOND_TYPE_ACTION_EXECUTOR);
729
730 rcu_register_thread();
731 rcu_thread_online();
732
733 DBG("Entering work execution loop");
734 pthread_mutex_lock(&executor->work.lock);
735 while (!executor->should_quit) {
736 int ret = 0;
737 struct action_work_item *work_item;
738
739 health_code_update();
740 if (executor->work.pending_count == 0) {
741 health_poll_entry();
742 DBG("No work items enqueued, entering wait");
743 pthread_cond_wait(&executor->work.cond,
744 &executor->work.lock);
745 DBG("Woke-up from wait");
746 health_poll_exit();
747 continue;
748 }
749
750 /* Pop item from front of the list with work lock held. */
751 work_item = cds_list_first_entry(&executor->work.list,
752 struct action_work_item, list_node);
753 cds_list_del(&work_item->list_node);
754 executor->work.pending_count--;
755
756 /*
757 * Work can be performed without holding the work lock,
758 * allowing new items to be queued.
759 */
760 pthread_mutex_unlock(&executor->work.lock);
761
762 /* Execute item only if a trigger is registered. */
763 lttng_trigger_lock(work_item->trigger);
764 if (!lttng_trigger_is_registered(work_item->trigger)) {
765 const char *trigger_name = NULL;
766 uid_t trigger_owner_uid;
767 enum lttng_trigger_status trigger_status;
768
769 trigger_name = get_trigger_name(work_item->trigger);
770
771 trigger_status = lttng_trigger_get_owner_uid(
772 work_item->trigger, &trigger_owner_uid);
773 LTTNG_ASSERT(trigger_status == LTTNG_TRIGGER_STATUS_OK);
774
775 DBG("Work item skipped since the associated trigger is no longer registered: work item id = %" PRIu64 ", trigger name = `%s`, trigger owner uid = %d",
776 work_item->id, trigger_name,
777 (int) trigger_owner_uid);
778 ret = 0;
779 goto skip_execute;
780 }
781
782 ret = action_work_item_execute(executor, work_item);
783
784 skip_execute:
785 lttng_trigger_unlock(work_item->trigger);
786 action_work_item_destroy(work_item);
787 if (ret) {
788 /* Fatal error. */
789 break;
790 }
791
792 health_code_update();
793 pthread_mutex_lock(&executor->work.lock);
794 }
795
796 if (executor->should_quit) {
797 pthread_mutex_unlock(&executor->work.lock);
798 }
799 DBG("Left work execution loop");
800
801 health_code_update();
802
803 rcu_thread_offline();
804 rcu_unregister_thread();
805 health_unregister(the_health_sessiond);
806
807 return NULL;
808 }
809
810 static bool shutdown_action_executor_thread(void *_data)
811 {
812 struct action_executor *executor = (action_executor *) _data;
813
814 pthread_mutex_lock(&executor->work.lock);
815 executor->should_quit = true;
816 pthread_cond_signal(&executor->work.cond);
817 pthread_mutex_unlock(&executor->work.lock);
818 return true;
819 }
820
821 static void clean_up_action_executor_thread(void *_data)
822 {
823 struct action_executor *executor = (action_executor *) _data;
824
825 LTTNG_ASSERT(cds_list_empty(&executor->work.list));
826
827 pthread_mutex_destroy(&executor->work.lock);
828 pthread_cond_destroy(&executor->work.cond);
829 free(executor);
830 }
831
832 struct action_executor *action_executor_create(
833 struct notification_thread_handle *handle)
834 {
835 struct action_executor *executor = zmalloc<action_executor>();
836
837 if (!executor) {
838 goto end;
839 }
840
841 CDS_INIT_LIST_HEAD(&executor->work.list);
842 pthread_cond_init(&executor->work.cond, NULL);
843 pthread_mutex_init(&executor->work.lock, NULL);
844 executor->notification_thread_handle = handle;
845
846 executor->thread = lttng_thread_create(THREAD_NAME,
847 action_executor_thread, shutdown_action_executor_thread,
848 clean_up_action_executor_thread, executor);
849 end:
850 return executor;
851 }
852
853 void action_executor_destroy(struct action_executor *executor)
854 {
855 struct action_work_item *work_item, *tmp;
856
857 /* TODO Wait for work list to drain? */
858 lttng_thread_shutdown(executor->thread);
859 pthread_mutex_lock(&executor->work.lock);
860 if (executor->work.pending_count != 0) {
861 WARN("%" PRIu64
862 " trigger action%s still queued for execution and will be discarded",
863 executor->work.pending_count,
864 executor->work.pending_count == 1 ? " is" :
865 "s are");
866 }
867
868 cds_list_for_each_entry_safe (
869 work_item, tmp, &executor->work.list, list_node) {
870 WARN("Discarding action work item %" PRIu64
871 " associated to trigger `%s`",
872 work_item->id, get_trigger_name(work_item->trigger));
873 cds_list_del(&work_item->list_node);
874 action_work_item_destroy(work_item);
875 }
876 pthread_mutex_unlock(&executor->work.lock);
877 lttng_thread_put(executor->thread);
878 }
879
880 /* RCU read-lock must be held by the caller. */
881 enum action_executor_status action_executor_enqueue_trigger(
882 struct action_executor *executor,
883 struct lttng_trigger *trigger,
884 struct lttng_evaluation *evaluation,
885 const struct lttng_credentials *object_creds,
886 struct notification_client_list *client_list)
887 {
888 int ret;
889 enum action_executor_status executor_status = ACTION_EXECUTOR_STATUS_OK;
890 const uint64_t work_item_id = executor->next_work_item_id++;
891 struct action_work_item *work_item;
892 bool signal = false;
893
894 LTTNG_ASSERT(trigger);
895 ASSERT_RCU_READ_LOCKED();
896
897 pthread_mutex_lock(&executor->work.lock);
898 /* Check for queue overflow. */
899 if (executor->work.pending_count >= MAX_QUEUED_WORK_COUNT) {
900 /* Most likely spammy, remove if it is the case. */
901 DBG("Refusing to enqueue action for trigger (overflow): trigger name = `%s`, work item id = %" PRIu64,
902 get_trigger_name(trigger), work_item_id);
903 executor_status = ACTION_EXECUTOR_STATUS_OVERFLOW;
904 goto error_unlock;
905 }
906
907 work_item = zmalloc<action_work_item>();
908 if (!work_item) {
909 PERROR("Failed to allocate action executor work item: trigger name = `%s`",
910 get_trigger_name(trigger));
911 executor_status = ACTION_EXECUTOR_STATUS_ERROR;
912 goto error_unlock;
913 }
914
915 lttng_trigger_get(trigger);
916 if (client_list) {
917 const bool reference_acquired =
918 notification_client_list_get(client_list);
919
920 LTTNG_ASSERT(reference_acquired);
921 }
922
923 work_item->id = work_item_id;
924 work_item->trigger = trigger;
925
926 /* Ownership transferred to the work item. */
927 work_item->evaluation = evaluation;
928 evaluation = NULL;
929
930 work_item->client_list = client_list;
931 work_item->object_creds.is_set = !!object_creds;
932 if (object_creds) {
933 work_item->object_creds.value = *object_creds;
934 }
935
936 CDS_INIT_LIST_HEAD(&work_item->list_node);
937
938 /* Build the array of action work subitems for the passed trigger. */
939 lttng_dynamic_array_init(&work_item->subitems,
940 sizeof(struct action_work_subitem),
941 action_work_subitem_destructor);
942
943 ret = populate_subitem_array_from_trigger(
944 trigger, &work_item->subitems);
945 if (ret) {
946 ERR("Failed to populate work item sub items on behalf of trigger: trigger name = `%s`",
947 get_trigger_name(trigger));
948 executor_status = ACTION_EXECUTOR_STATUS_ERROR;
949 goto error_unlock;
950 }
951
952 cds_list_add_tail(&work_item->list_node, &executor->work.list);
953 executor->work.pending_count++;
954 DBG("Enqueued action for trigger: trigger name = `%s`, work item id = %" PRIu64,
955 get_trigger_name(trigger), work_item_id);
956 signal = true;
957
958 error_unlock:
959 if (signal) {
960 pthread_cond_signal(&executor->work.cond);
961 }
962
963 pthread_mutex_unlock(&executor->work.lock);
964 lttng_evaluation_destroy(evaluation);
965 return executor_status;
966 }
967
968 static int add_action_to_subitem_array(struct lttng_action *action,
969 struct lttng_dynamic_array *subitems)
970 {
971 int ret = 0;
972 enum lttng_action_type type = lttng_action_get_type(action);
973 const char *session_name = NULL;
974 enum lttng_action_status status;
975 struct action_work_subitem subitem = {
976 .action = NULL,
977 .context = {
978 .session_id = LTTNG_OPTIONAL_INIT_UNSET,
979 },
980 };
981
982 LTTNG_ASSERT(action);
983 LTTNG_ASSERT(subitems);
984
985 if (type == LTTNG_ACTION_TYPE_LIST) {
986 unsigned int count, i;
987
988 status = lttng_action_list_get_count(action, &count);
989 LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK);
990
991 for (i = 0; i < count; i++) {
992 struct lttng_action *inner_action = NULL;
993
994 inner_action = lttng_action_list_borrow_mutable_at_index(
995 action, i);
996 LTTNG_ASSERT(inner_action);
997 ret = add_action_to_subitem_array(
998 inner_action, subitems);
999 if (ret) {
1000 goto end;
1001 }
1002 }
1003
1004 /*
1005 * Go directly to the end since there is no need to add the
1006 * list action by itself to the subitems array.
1007 */
1008 goto end;
1009 }
1010
1011 /* Gather execution context. */
1012 switch (type) {
1013 case LTTNG_ACTION_TYPE_NOTIFY:
1014 break;
1015 case LTTNG_ACTION_TYPE_START_SESSION:
1016 status = lttng_action_start_session_get_session_name(
1017 action, &session_name);
1018 LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK);
1019 break;
1020 case LTTNG_ACTION_TYPE_STOP_SESSION:
1021 status = lttng_action_stop_session_get_session_name(
1022 action, &session_name);
1023 LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK);
1024 break;
1025 case LTTNG_ACTION_TYPE_ROTATE_SESSION:
1026 status = lttng_action_rotate_session_get_session_name(
1027 action, &session_name);
1028 LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK);
1029 break;
1030 case LTTNG_ACTION_TYPE_SNAPSHOT_SESSION:
1031 status = lttng_action_snapshot_session_get_session_name(
1032 action, &session_name);
1033 LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK);
1034 break;
1035 case LTTNG_ACTION_TYPE_LIST:
1036 case LTTNG_ACTION_TYPE_UNKNOWN:
1037 /* Fallthrough */
1038 default:
1039 abort();
1040 break;
1041 }
1042
1043 /*
1044 * Fetch the session execution context info as needed.
1045 * Note that we could decide to not add an action for which we know the
1046 * execution will not happen (i.e no session exists for that name). For
1047 * now we leave the decision to skip to the action executor for sake of
1048 * simplicity and consistency.
1049 */
1050 if (session_name != NULL) {
1051 uint64_t session_id;
1052
1053 /*
1054 * Instantaneous sampling of the session id if present.
1055 *
1056 * This method is preferred over `sessiond_find_by_name` then
1057 * fetching the session'd id since `sessiond_find_by_name`
1058 * requires the session list lock to be taken.
1059 *
1060 * Taking the session list lock can lead to a deadlock
1061 * between the action executor and the notification thread
1062 * (caller of add_action_to_subitem_array). It is okay if the
1063 * session state changes between the enqueuing time and the
1064 * execution time. The execution context is validated at
1065 * execution time.
1066 */
1067 if (sample_session_id_by_name(session_name, &session_id)) {
1068 LTTNG_OPTIONAL_SET(&subitem.context.session_id,
1069 session_id);
1070 }
1071 }
1072
1073 /* Get a reference to the action. */
1074 lttng_action_get(action);
1075 subitem.action = action;
1076
1077 ret = lttng_dynamic_array_add_element(subitems, &subitem);
1078 if (ret) {
1079 ERR("Failed to add work subitem to the subitem array");
1080 lttng_action_put(action);
1081 ret = -1;
1082 goto end;
1083 }
1084
1085 end:
1086 return ret;
1087 }
1088
1089 static int populate_subitem_array_from_trigger(struct lttng_trigger *trigger,
1090 struct lttng_dynamic_array *subitems)
1091 {
1092 struct lttng_action *action;
1093
1094 action = lttng_trigger_get_action(trigger);
1095 LTTNG_ASSERT(action);
1096
1097 return add_action_to_subitem_array(action, subitems);
1098 }
This page took 0.050757 seconds and 4 git commands to generate.