Fix: sessiond: client/client_list lock inversion on disconnect
[lttng-tools.git] / src / bin / lttng-sessiond / action-executor.c
1 /*
2 * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #include "action-executor.h"
9 #include "cmd.h"
10 #include "health-sessiond.h"
11 #include "lttng-sessiond.h"
12 #include "notification-thread-internal.h"
13 #include "session.h"
14 #include "thread.h"
15 #include <common/macros.h>
16 #include <common/optional.h>
17 #include <lttng/action/action-internal.h>
18 #include <lttng/action/group.h>
19 #include <lttng/action/notify.h>
20 #include <lttng/action/rotate-session.h>
21 #include <lttng/action/snapshot-session.h>
22 #include <lttng/action/start-session.h>
23 #include <lttng/action/stop-session.h>
24 #include <lttng/condition/evaluation.h>
25 #include <lttng/lttng-error.h>
26 #include <lttng/trigger/trigger-internal.h>
27 #include <pthread.h>
28 #include <stdbool.h>
29 #include <stddef.h>
30 #include <urcu/list.h>
31
32 #define THREAD_NAME "Action Executor"
33 #define MAX_QUEUED_WORK_COUNT 8192
34
35 struct action_work_item {
36 uint64_t id;
37 struct lttng_trigger *trigger;
38 struct lttng_evaluation *evaluation;
39 struct notification_client_list *client_list;
40 LTTNG_OPTIONAL(struct lttng_credentials) object_creds;
41 struct cds_list_head list_node;
42 };
43
44 struct action_executor {
45 struct lttng_thread *thread;
46 struct notification_thread_handle *notification_thread_handle;
47 struct {
48 uint64_t pending_count;
49 struct cds_list_head list;
50 pthread_cond_t cond;
51 pthread_mutex_t lock;
52 } work;
53 bool should_quit;
54 uint64_t next_work_item_id;
55 };
56
57 /*
58 * Only return non-zero on a fatal error that should shut down the action
59 * executor.
60 */
61 typedef int (*action_executor_handler)(struct action_executor *executor,
62 const struct action_work_item *,
63 const struct lttng_action *action);
64
65 static int action_executor_notify_handler(struct action_executor *executor,
66 const struct action_work_item *,
67 const struct lttng_action *);
68 static int action_executor_start_session_handler(struct action_executor *executor,
69 const struct action_work_item *,
70 const struct lttng_action *);
71 static int action_executor_stop_session_handler(struct action_executor *executor,
72 const struct action_work_item *,
73 const struct lttng_action *);
74 static int action_executor_rotate_session_handler(struct action_executor *executor,
75 const struct action_work_item *,
76 const struct lttng_action *);
77 static int action_executor_snapshot_session_handler(struct action_executor *executor,
78 const struct action_work_item *,
79 const struct lttng_action *);
80 static int action_executor_group_handler(struct action_executor *executor,
81 const struct action_work_item *,
82 const struct lttng_action *);
83 static int action_executor_generic_handler(struct action_executor *executor,
84 const struct action_work_item *,
85 const struct lttng_action *);
86
87 static const action_executor_handler action_executors[] = {
88 [LTTNG_ACTION_TYPE_NOTIFY] = action_executor_notify_handler,
89 [LTTNG_ACTION_TYPE_START_SESSION] = action_executor_start_session_handler,
90 [LTTNG_ACTION_TYPE_STOP_SESSION] = action_executor_stop_session_handler,
91 [LTTNG_ACTION_TYPE_ROTATE_SESSION] = action_executor_rotate_session_handler,
92 [LTTNG_ACTION_TYPE_SNAPSHOT_SESSION] = action_executor_snapshot_session_handler,
93 [LTTNG_ACTION_TYPE_GROUP] = action_executor_group_handler,
94 };
95
96 static const char *action_type_names[] = {
97 [LTTNG_ACTION_TYPE_NOTIFY] = "Notify",
98 [LTTNG_ACTION_TYPE_START_SESSION] = "Start session",
99 [LTTNG_ACTION_TYPE_STOP_SESSION] = "Stop session",
100 [LTTNG_ACTION_TYPE_ROTATE_SESSION] = "Rotate session",
101 [LTTNG_ACTION_TYPE_SNAPSHOT_SESSION] = "Snapshot session",
102 [LTTNG_ACTION_TYPE_GROUP] = "Group",
103 };
104
105 static const char *get_action_name(const struct lttng_action *action)
106 {
107 return action_type_names[lttng_action_get_type_const(action)];
108 }
109
110 /* Check if this trigger allowed to interect with a given session. */
111 static bool is_trigger_allowed_for_session(const struct lttng_trigger *trigger,
112 struct ltt_session *session)
113 {
114 bool is_allowed = false;
115 const struct lttng_credentials session_creds = {
116 .uid = session->uid,
117 .gid = session->gid,
118 };
119 /* Can never be NULL. */
120 const struct lttng_credentials *trigger_creds =
121 lttng_trigger_get_credentials(trigger);
122
123 is_allowed = (trigger_creds->uid == session_creds.uid) ||
124 (trigger_creds->uid == 0);
125 if (!is_allowed) {
126 WARN("Trigger is not allowed to interact with session `%s`: session uid = %ld, session gid = %ld, trigger uid = %ld, trigger gid = %ld",
127 session->name,
128 (long int) session->uid,
129 (long int) session->gid,
130 (long int) trigger_creds->uid,
131 (long int) trigger_creds->gid);
132 }
133
134 return is_allowed;
135 }
136
137 static int client_handle_transmission_status(
138 struct notification_client *client,
139 enum client_transmission_status status,
140 void *user_data)
141 {
142 int ret = 0;
143 struct action_executor *executor = user_data;
144 bool update_communication = true;
145
146 switch (status) {
147 case CLIENT_TRANSMISSION_STATUS_COMPLETE:
148 DBG("Successfully sent full notification to client, client_id = %" PRIu64,
149 client->id);
150 update_communication = false;
151 break;
152 case CLIENT_TRANSMISSION_STATUS_QUEUED:
153 DBG("Queued notification in client outgoing buffer, client_id = %" PRIu64,
154 client->id);
155 break;
156 case CLIENT_TRANSMISSION_STATUS_FAIL:
157 DBG("Communication error occurred while sending notification to client, client_id = %" PRIu64,
158 client->id);
159 break;
160 default:
161 ERR("Fatal error encoutered while sending notification to client, client_id = %" PRIu64,
162 client->id);
163 ret = -1;
164 goto end;
165 }
166
167 if (!update_communication) {
168 goto end;
169 }
170
171 /* Safe to read client's id without locking as it is immutable. */
172 ret = notification_thread_client_communication_update(
173 executor->notification_thread_handle, client->id,
174 status);
175 end:
176 return ret;
177 }
178
179 static int action_executor_notify_handler(struct action_executor *executor,
180 const struct action_work_item *work_item,
181 const struct lttng_action *action)
182 {
183 return notification_client_list_send_evaluation(work_item->client_list,
184 lttng_trigger_get_const_condition(work_item->trigger),
185 work_item->evaluation,
186 lttng_trigger_get_credentials(work_item->trigger),
187 LTTNG_OPTIONAL_GET_PTR(work_item->object_creds),
188 client_handle_transmission_status,
189 executor);
190 }
191
192 static int action_executor_start_session_handler(struct action_executor *executor,
193 const struct action_work_item *work_item,
194 const struct lttng_action *action)
195 {
196 int ret = 0;
197 const char *session_name;
198 enum lttng_action_status action_status;
199 struct ltt_session *session;
200 enum lttng_error_code cmd_ret;
201
202 action_status = lttng_action_start_session_get_session_name(
203 action, &session_name);
204 if (action_status != LTTNG_ACTION_STATUS_OK) {
205 ERR("Failed to get session name from `%s` action",
206 get_action_name(action));
207 ret = -1;
208 goto end;
209 }
210
211 session_lock_list();
212 session = session_find_by_name(session_name);
213 if (!session) {
214 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%p`",
215 session_name, get_action_name(action),
216 work_item->trigger);
217 goto error_unlock_list;
218 }
219
220 session_lock(session);
221 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
222 goto error_dispose_session;
223 }
224
225 cmd_ret = cmd_start_trace(session);
226 switch (cmd_ret) {
227 case LTTNG_OK:
228 DBG("Successfully started session `%s` on behalf of trigger `%p`",
229 session_name, work_item->trigger);
230 break;
231 case LTTNG_ERR_TRACE_ALREADY_STARTED:
232 DBG("Attempted to start session `%s` on behalf of trigger `%p` but it was already started",
233 session_name, work_item->trigger);
234 break;
235 default:
236 WARN("Failed to start session `%s` on behalf of trigger `%p`: %s",
237 session_name, work_item->trigger,
238 lttng_strerror(-cmd_ret));
239 break;
240 }
241
242 error_dispose_session:
243 session_unlock(session);
244 session_put(session);
245 error_unlock_list:
246 session_unlock_list();
247 end:
248 return ret;
249 }
250
251 static int action_executor_stop_session_handler(struct action_executor *executor,
252 const struct action_work_item *work_item,
253 const struct lttng_action *action)
254 {
255 int ret = 0;
256 const char *session_name;
257 enum lttng_action_status action_status;
258 struct ltt_session *session;
259 enum lttng_error_code cmd_ret;
260
261 action_status = lttng_action_stop_session_get_session_name(
262 action, &session_name);
263 if (action_status != LTTNG_ACTION_STATUS_OK) {
264 ERR("Failed to get session name from `%s` action",
265 get_action_name(action));
266 ret = -1;
267 goto end;
268 }
269
270 session_lock_list();
271 session = session_find_by_name(session_name);
272 if (!session) {
273 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%p`",
274 session_name, get_action_name(action),
275 work_item->trigger);
276 goto error_unlock_list;
277 }
278
279 session_lock(session);
280 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
281 goto error_dispose_session;
282 }
283
284 cmd_ret = cmd_stop_trace(session);
285 switch (cmd_ret) {
286 case LTTNG_OK:
287 DBG("Successfully stopped session `%s` on behalf of trigger `%p`",
288 session_name, work_item->trigger);
289 break;
290 case LTTNG_ERR_TRACE_ALREADY_STOPPED:
291 DBG("Attempted to stop session `%s` on behalf of trigger `%p` but it was already stopped",
292 session_name, work_item->trigger);
293 break;
294 default:
295 WARN("Failed to stop session `%s` on behalf of trigger `%p`: %s",
296 session_name, work_item->trigger,
297 lttng_strerror(-cmd_ret));
298 break;
299 }
300
301 error_dispose_session:
302 session_unlock(session);
303 session_put(session);
304 error_unlock_list:
305 session_unlock_list();
306 end:
307 return ret;
308 }
309
310 static int action_executor_rotate_session_handler(struct action_executor *executor,
311 const struct action_work_item *work_item,
312 const struct lttng_action *action)
313 {
314 int ret = 0;
315 const char *session_name;
316 enum lttng_action_status action_status;
317 struct ltt_session *session;
318 enum lttng_error_code cmd_ret;
319
320 action_status = lttng_action_rotate_session_get_session_name(
321 action, &session_name);
322 if (action_status != LTTNG_ACTION_STATUS_OK) {
323 ERR("Failed to get session name from `%s` action",
324 get_action_name(action));
325 ret = -1;
326 goto end;
327 }
328
329 session_lock_list();
330 session = session_find_by_name(session_name);
331 if (!session) {
332 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%p`",
333 session_name, get_action_name(action),
334 work_item->trigger);
335 goto error_unlock_list;
336 }
337
338 session_lock(session);
339 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
340 goto error_dispose_session;
341 }
342
343 cmd_ret = cmd_rotate_session(session, NULL, false,
344 LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
345 switch (cmd_ret) {
346 case LTTNG_OK:
347 DBG("Successfully started rotation of session `%s` on behalf of trigger `%p`",
348 session_name, work_item->trigger);
349 break;
350 case LTTNG_ERR_ROTATION_PENDING:
351 DBG("Attempted to start a rotation of session `%s` on behalf of trigger `%p` but a rotation is already ongoing",
352 session_name, work_item->trigger);
353 break;
354 case LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
355 case LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
356 DBG("Attempted to start a rotation of session `%s` on behalf of trigger `%p` but a rotation has already been completed since the last stop or clear",
357 session_name, work_item->trigger);
358 break;
359 default:
360 WARN("Failed to start a rotation of session `%s` on behalf of trigger `%p`: %s",
361 session_name, work_item->trigger,
362 lttng_strerror(-cmd_ret));
363 break;
364 }
365
366 error_dispose_session:
367 session_unlock(session);
368 session_put(session);
369 error_unlock_list:
370 session_unlock_list();
371 end:
372 return ret;
373 }
374
375 static int action_executor_snapshot_session_handler(struct action_executor *executor,
376 const struct action_work_item *work_item,
377 const struct lttng_action *action)
378 {
379 int ret = 0;
380 const char *session_name;
381 enum lttng_action_status action_status;
382 struct ltt_session *session;
383 const struct lttng_snapshot_output default_snapshot_output = {
384 .max_size = UINT64_MAX,
385 };
386 const struct lttng_snapshot_output *snapshot_output =
387 &default_snapshot_output;
388 enum lttng_error_code cmd_ret;
389
390 action_status = lttng_action_snapshot_session_get_session_name(
391 action, &session_name);
392 if (action_status != LTTNG_ACTION_STATUS_OK) {
393 ERR("Failed to get session name from `%s` action",
394 get_action_name(action));
395 ret = -1;
396 goto end;
397 }
398
399 action_status = lttng_action_snapshot_session_get_output(
400 action, &snapshot_output);
401 if (action_status != LTTNG_ACTION_STATUS_OK &&
402 action_status != LTTNG_ACTION_STATUS_UNSET) {
403 ERR("Failed to get output from `%s` action",
404 get_action_name(action));
405 ret = -1;
406 goto end;
407 }
408
409 session_lock_list();
410 session = session_find_by_name(session_name);
411 if (!session) {
412 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%p`",
413 session_name, get_action_name(action),
414 work_item->trigger);
415 goto error_unlock_list;
416 }
417
418
419 session_lock(session);
420 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
421 goto error_dispose_session;
422 }
423
424 cmd_ret = cmd_snapshot_record(session, snapshot_output, 0);
425 switch (cmd_ret) {
426 case LTTNG_OK:
427 DBG("Successfully recorded snapshot of session `%s` on behalf of trigger `%p`",
428 session_name, work_item->trigger);
429 break;
430 default:
431 WARN("Failed to record snapshot of session `%s` on behalf of trigger `%p`: %s",
432 session_name, work_item->trigger,
433 lttng_strerror(-cmd_ret));
434 break;
435 }
436
437 error_dispose_session:
438 session_unlock(session);
439 session_put(session);
440 error_unlock_list:
441 session_unlock_list();
442 end:
443 return ret;
444 }
445
446 static int action_executor_group_handler(struct action_executor *executor,
447 const struct action_work_item *work_item,
448 const struct lttng_action *action_group)
449 {
450 int ret = 0;
451 unsigned int i, count;
452 enum lttng_action_status action_status;
453
454 action_status = lttng_action_group_get_count(action_group, &count);
455 if (action_status != LTTNG_ACTION_STATUS_OK) {
456 /* Fatal error. */
457 ERR("Failed to get count of action in action group");
458 ret = -1;
459 goto end;
460 }
461
462 DBG("Action group has %u action%s", count, count != 1 ? "s" : "");
463 for (i = 0; i < count; i++) {
464 const struct lttng_action *action =
465 lttng_action_group_get_at_index(
466 action_group, i);
467
468 ret = action_executor_generic_handler(
469 executor, work_item, action);
470 if (ret) {
471 ERR("Stopping the execution of the action group of trigger `%p` following a fatal error",
472 work_item->trigger);
473 goto end;
474 }
475 }
476 end:
477 return ret;
478 }
479
480 static int action_executor_generic_handler(struct action_executor *executor,
481 const struct action_work_item *work_item,
482 const struct lttng_action *action)
483 {
484 DBG("Executing action `%s` of trigger `%p` action work item %" PRIu64,
485 get_action_name(action),
486 work_item->trigger,
487 work_item->id);
488
489 return action_executors[lttng_action_get_type_const(action)](
490 executor, work_item, action);
491 }
492
493 static int action_work_item_execute(struct action_executor *executor,
494 struct action_work_item *work_item)
495 {
496 int ret;
497 const struct lttng_action *action =
498 lttng_trigger_get_const_action(work_item->trigger);
499
500 DBG("Starting execution of action work item %" PRIu64 " of trigger `%p`",
501 work_item->id, work_item->trigger);
502 ret = action_executor_generic_handler(executor, work_item, action);
503 DBG("Completed execution of action work item %" PRIu64 " of trigger `%p`",
504 work_item->id, work_item->trigger);
505 return ret;
506 }
507
508 static void action_work_item_destroy(struct action_work_item *work_item)
509 {
510 lttng_trigger_put(work_item->trigger);
511 lttng_evaluation_destroy(work_item->evaluation);
512 notification_client_list_put(work_item->client_list);
513 free(work_item);
514 }
515
516 static void *action_executor_thread(void *_data)
517 {
518 struct action_executor *executor = _data;
519
520 assert(executor);
521
522 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_ACTION_EXECUTOR);
523
524 rcu_register_thread();
525 rcu_thread_online();
526
527 DBG("Entering work execution loop");
528 pthread_mutex_lock(&executor->work.lock);
529 while (!executor->should_quit) {
530 int ret;
531 struct action_work_item *work_item;
532
533 health_code_update();
534 if (executor->work.pending_count == 0) {
535 health_poll_entry();
536 DBG("No work items enqueued, entering wait");
537 pthread_cond_wait(&executor->work.cond,
538 &executor->work.lock);
539 DBG("Woke-up from wait");
540 health_poll_exit();
541 continue;
542 }
543
544 /* Pop item from front of the listwith work lock held. */
545 work_item = cds_list_first_entry(&executor->work.list,
546 struct action_work_item, list_node);
547 cds_list_del(&work_item->list_node);
548 executor->work.pending_count--;
549
550 /*
551 * Work can be performed without holding the work lock,
552 * allowing new items to be queued.
553 */
554 pthread_mutex_unlock(&executor->work.lock);
555 ret = action_work_item_execute(executor, work_item);
556 action_work_item_destroy(work_item);
557 if (ret) {
558 /* Fatal error. */
559 break;
560 }
561
562 health_code_update();
563 pthread_mutex_lock(&executor->work.lock);
564 }
565
566 pthread_mutex_unlock(&executor->work.lock);
567 DBG("Left work execution loop");
568
569 health_code_update();
570
571 rcu_thread_offline();
572 rcu_unregister_thread();
573 health_unregister(health_sessiond);
574
575 return NULL;
576 }
577
578 static bool shutdown_action_executor_thread(void *_data)
579 {
580 struct action_executor *executor = _data;
581
582 executor->should_quit = true;
583 pthread_cond_signal(&executor->work.cond);
584 return true;
585 }
586
587 static void clean_up_action_executor_thread(void *_data)
588 {
589 struct action_executor *executor = _data;
590
591 assert(cds_list_empty(&executor->work.list));
592
593 pthread_mutex_destroy(&executor->work.lock);
594 pthread_cond_destroy(&executor->work.cond);
595 free(executor);
596 }
597
598 struct action_executor *action_executor_create(
599 struct notification_thread_handle *handle)
600 {
601 struct action_executor *executor = zmalloc(sizeof(*executor));
602
603 if (!executor) {
604 goto end;
605 }
606
607 CDS_INIT_LIST_HEAD(&executor->work.list);
608 pthread_cond_init(&executor->work.cond, NULL);
609 pthread_mutex_init(&executor->work.lock, NULL);
610 executor->notification_thread_handle = handle;
611
612 executor->thread = lttng_thread_create(THREAD_NAME,
613 action_executor_thread, shutdown_action_executor_thread,
614 clean_up_action_executor_thread, executor);
615 end:
616 return executor;
617 }
618
619 void action_executor_destroy(struct action_executor *executor)
620 {
621 struct action_work_item *work_item, *tmp;
622
623 /* TODO Wait for work list to drain? */
624 lttng_thread_shutdown(executor->thread);
625 pthread_mutex_lock(&executor->work.lock);
626 if (executor->work.pending_count != 0) {
627 WARN("%" PRIu64
628 " trigger action%s still queued for execution and will be discarded",
629 executor->work.pending_count,
630 executor->work.pending_count == 1 ? " is" :
631 "s are");
632 }
633
634 cds_list_for_each_entry_safe (
635 work_item, tmp, &executor->work.list, list_node) {
636 WARN("Discarding action work item %" PRIu64
637 " associated to trigger `%p`",
638 work_item->id, work_item->trigger);
639 cds_list_del(&work_item->list_node);
640 action_work_item_destroy(work_item);
641 }
642 pthread_mutex_unlock(&executor->work.lock);
643 lttng_thread_put(executor->thread);
644 }
645
646 /* RCU read-lock must be held by the caller. */
647 enum action_executor_status action_executor_enqueue(
648 struct action_executor *executor,
649 struct lttng_trigger *trigger,
650 struct lttng_evaluation *evaluation,
651 const struct lttng_credentials *object_creds,
652 struct notification_client_list *client_list)
653 {
654 enum action_executor_status executor_status = ACTION_EXECUTOR_STATUS_OK;
655 const uint64_t work_item_id = executor->next_work_item_id++;
656 struct action_work_item *work_item;
657 bool signal = false;
658
659 pthread_mutex_lock(&executor->work.lock);
660 /* Check for queue overflow. */
661 if (executor->work.pending_count >= MAX_QUEUED_WORK_COUNT) {
662 /* Most likely spammy, remove if it is the case. */
663 DBG("Refusing to enqueue action for trigger `%p` as work item %" PRIu64
664 " (overflow)",
665 trigger, work_item_id);
666 executor_status = ACTION_EXECUTOR_STATUS_OVERFLOW;
667 goto error_unlock;
668 }
669
670 work_item = zmalloc(sizeof(*work_item));
671 if (!work_item) {
672 PERROR("Failed to allocate action executor work item on behalf of trigger `%p`",
673 trigger);
674 executor_status = ACTION_EXECUTOR_STATUS_ERROR;
675 goto error_unlock;
676 }
677
678 lttng_trigger_get(trigger);
679 if (client_list) {
680 const bool reference_acquired =
681 notification_client_list_get(client_list);
682
683 assert(reference_acquired);
684 }
685
686 *work_item = (typeof(*work_item)){
687 .id = work_item_id,
688 .trigger = trigger,
689 /* Ownership transferred to the work item. */
690 .evaluation = evaluation,
691 .object_creds = {
692 .is_set = !!object_creds,
693 .value = object_creds ? *object_creds :
694 (typeof(work_item->object_creds.value)) {},
695 },
696 .client_list = client_list,
697 .list_node = CDS_LIST_HEAD_INIT(work_item->list_node),
698 };
699
700 evaluation = NULL;
701 cds_list_add_tail(&work_item->list_node, &executor->work.list);
702 executor->work.pending_count++;
703 DBG("Enqueued action for trigger `%p` as work item %" PRIu64,
704 trigger, work_item_id);
705 signal = true;
706
707 error_unlock:
708 pthread_mutex_unlock(&executor->work.lock);
709 if (signal) {
710 pthread_cond_signal(&executor->work.cond);
711 }
712
713 lttng_evaluation_destroy(evaluation);
714 return executor_status;
715 }
This page took 0.067439 seconds and 4 git commands to generate.