common: rename filter bytecode types
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread.c
CommitLineData
/*
 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
 *
 * SPDX-License-Identifier: GPL-2.0-only
 *
 */
7
8#define _LGPL_SOURCE
9#include <lttng/trigger/trigger.h>
10#include <lttng/notification/channel-internal.h>
11#include <lttng/notification/notification-internal.h>
12#include <lttng/condition/condition-internal.h>
13#include <lttng/condition/buffer-usage-internal.h>
14#include <common/error.h>
15#include <common/config/session-config.h>
16#include <common/defaults.h>
17#include <common/utils.h>
ab0ee2ca
JG
18#include <common/align.h>
19#include <common/time.h>
ab0ee2ca
JG
20#include <sys/stat.h>
21#include <time.h>
22#include <signal.h>
23
24#include "notification-thread.h"
25#include "notification-thread-events.h"
26#include "notification-thread-commands.h"
27#include "lttng-sessiond.h"
28#include "health-sessiond.h"
c8a9de5a 29#include "thread.h"
ab0ee2ca 30
94078603
JR
31#include "kernel.h"
32#include <common/kernel-ctl/kernel-ctl.h>
33
ab0ee2ca
JG
34#include <urcu.h>
35#include <urcu/list.h>
36#include <urcu/rculfhash.h>
37
ab0ee2ca
JG
38/*
39 * Destroy the thread data previously created by the init function.
40 */
41void notification_thread_handle_destroy(
42 struct notification_thread_handle *handle)
43{
44 int ret;
ab0ee2ca
JG
45
46 if (!handle) {
47 goto end;
48 }
49
8ada111f 50 assert(cds_list_empty(&handle->cmd_queue.list));
ab0ee2ca 51 pthread_mutex_destroy(&handle->cmd_queue.lock);
c8a9de5a 52 sem_destroy(&handle->ready);
ab0ee2ca 53
814b4934
JR
54 if (handle->cmd_queue.event_pipe) {
55 lttng_pipe_destroy(handle->cmd_queue.event_pipe);
56 }
ab0ee2ca
JG
57 if (handle->channel_monitoring_pipes.ust32_consumer >= 0) {
58 ret = close(handle->channel_monitoring_pipes.ust32_consumer);
59 if (ret) {
60 PERROR("close 32-bit consumer channel monitoring pipe");
61 }
62 }
63 if (handle->channel_monitoring_pipes.ust64_consumer >= 0) {
64 ret = close(handle->channel_monitoring_pipes.ust64_consumer);
65 if (ret) {
66 PERROR("close 64-bit consumer channel monitoring pipe");
67 }
68 }
69 if (handle->channel_monitoring_pipes.kernel_consumer >= 0) {
70 ret = close(handle->channel_monitoring_pipes.kernel_consumer);
71 if (ret) {
72 PERROR("close kernel consumer channel monitoring pipe");
73 }
74 }
94078603 75
ab0ee2ca
JG
76end:
77 free(handle);
78}
79
80struct notification_thread_handle *notification_thread_handle_create(
81 struct lttng_pipe *ust32_channel_monitor_pipe,
82 struct lttng_pipe *ust64_channel_monitor_pipe,
c8a9de5a 83 struct lttng_pipe *kernel_channel_monitor_pipe)
ab0ee2ca
JG
84{
85 int ret;
86 struct notification_thread_handle *handle;
814b4934 87 struct lttng_pipe *event_pipe = NULL;
ab0ee2ca
JG
88
89 handle = zmalloc(sizeof(*handle));
90 if (!handle) {
91 goto end;
92 }
93
c8a9de5a
JG
94 sem_init(&handle->ready, 0, 0);
95
18d08850 96 event_pipe = lttng_pipe_open(FD_CLOEXEC);
814b4934
JR
97 if (!event_pipe) {
98 ERR("event_pipe creation");
ab0ee2ca
JG
99 goto error;
100 }
814b4934
JR
101
102 handle->cmd_queue.event_pipe = event_pipe;
103 event_pipe = NULL;
104
ab0ee2ca
JG
105 CDS_INIT_LIST_HEAD(&handle->cmd_queue.list);
106 ret = pthread_mutex_init(&handle->cmd_queue.lock, NULL);
107 if (ret) {
108 goto error;
109 }
110
111 if (ust32_channel_monitor_pipe) {
112 handle->channel_monitoring_pipes.ust32_consumer =
113 lttng_pipe_release_readfd(
114 ust32_channel_monitor_pipe);
115 if (handle->channel_monitoring_pipes.ust32_consumer < 0) {
116 goto error;
117 }
118 } else {
119 handle->channel_monitoring_pipes.ust32_consumer = -1;
120 }
121 if (ust64_channel_monitor_pipe) {
122 handle->channel_monitoring_pipes.ust64_consumer =
123 lttng_pipe_release_readfd(
124 ust64_channel_monitor_pipe);
125 if (handle->channel_monitoring_pipes.ust64_consumer < 0) {
126 goto error;
127 }
128 } else {
129 handle->channel_monitoring_pipes.ust64_consumer = -1;
130 }
131 if (kernel_channel_monitor_pipe) {
132 handle->channel_monitoring_pipes.kernel_consumer =
133 lttng_pipe_release_readfd(
134 kernel_channel_monitor_pipe);
135 if (handle->channel_monitoring_pipes.kernel_consumer < 0) {
136 goto error;
137 }
138 } else {
139 handle->channel_monitoring_pipes.kernel_consumer = -1;
140 }
d02d7404 141
ab0ee2ca
JG
142end:
143 return handle;
144error:
814b4934 145 lttng_pipe_destroy(event_pipe);
ab0ee2ca
JG
146 notification_thread_handle_destroy(handle);
147 return NULL;
148}
149
150static
151char *get_notification_channel_sock_path(void)
152{
153 int ret;
154 bool is_root = !getuid();
155 char *sock_path;
156
157 sock_path = zmalloc(LTTNG_PATH_MAX);
158 if (!sock_path) {
159 goto error;
160 }
161
162 if (is_root) {
163 ret = snprintf(sock_path, LTTNG_PATH_MAX,
164 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK);
165 if (ret < 0) {
166 goto error;
167 }
168 } else {
4f00620d 169 const char *home_path = utils_get_home_dir();
ab0ee2ca
JG
170
171 if (!home_path) {
172 ERR("Can't get HOME directory for socket creation");
173 goto error;
174 }
175
176 ret = snprintf(sock_path, LTTNG_PATH_MAX,
177 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK,
178 home_path);
179 if (ret < 0) {
180 goto error;
181 }
182 }
183
184 return sock_path;
185error:
186 free(sock_path);
187 return NULL;
188}
189
/*
 * Remove the notification channel socket from the file system and close
 * its file descriptor.
 *
 * Failures are logged; the function always attempts both steps. If the
 * socket path cannot be computed, only the descriptor is closed.
 */
static
void notification_channel_socket_destroy(int fd)
{
	int ret;
	char *sock_path = get_notification_channel_sock_path();

	DBG("[notification-thread] Destroying notification channel socket");

	if (sock_path) {
		ret = unlink(sock_path);
		if (ret < 0) {
			/*
			 * Report the error before free(): PERROR relies on
			 * errno, which free() is not guaranteed to preserve
			 * on every C library.
			 */
			PERROR("unlink notification channel socket");
		}
		free(sock_path);
	}

	ret = close(fd);
	if (ret) {
		PERROR("close notification channel socket");
	}
}
211
212static
213int notification_channel_socket_create(void)
214{
215 int fd = -1, ret;
216 char *sock_path = get_notification_channel_sock_path();
217
218 DBG("[notification-thread] Creating notification channel UNIX socket at %s",
219 sock_path);
220
221 ret = lttcomm_create_unix_sock(sock_path);
222 if (ret < 0) {
223 ERR("[notification-thread] Failed to create notification socket");
224 goto error;
225 }
226 fd = ret;
227
228 ret = chmod(sock_path, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
229 if (ret < 0) {
230 ERR("Set file permissions failed: %s", sock_path);
231 PERROR("chmod notification channel socket");
232 goto error;
233 }
234
235 if (getuid() == 0) {
28ab59d0
JR
236 gid_t gid;
237
238 ret = utils_get_group_id(config.tracing_group_name.value, true,
239 &gid);
240 if (ret) {
241 /* Default to root group. */
242 gid = 0;
243 }
244
245 ret = chown(sock_path, 0, gid);
ab0ee2ca
JG
246 if (ret) {
247 ERR("Failed to set the notification channel socket's group");
248 ret = -1;
249 goto error;
250 }
251 }
252
253 DBG("[notification-thread] Notification channel UNIX socket created (fd = %i)",
254 fd);
255 free(sock_path);
256 return fd;
257error:
258 if (fd >= 0 && close(fd) < 0) {
259 PERROR("close notification channel socket");
260 }
261 free(sock_path);
262 return ret;
263}
264
265static
266int init_poll_set(struct lttng_poll_event *poll_set,
267 struct notification_thread_handle *handle,
268 int notification_channel_socket)
269{
270 int ret;
271
272 /*
273 * Create pollset with size 5:
274 * - notification channel socket (listen for new connections),
275 * - command queue event fd (internal sessiond commands),
276 * - consumerd (32-bit user space) channel monitor pipe,
277 * - consumerd (64-bit user space) channel monitor pipe,
278 * - consumerd (kernel) channel monitor pipe.
279 */
280 ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
281 if (ret < 0) {
282 goto end;
283 }
284
285 ret = lttng_poll_add(poll_set, notification_channel_socket,
286 LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP);
287 if (ret < 0) {
288 ERR("[notification-thread] Failed to add notification channel socket to pollset");
289 goto error;
290 }
814b4934 291 ret = lttng_poll_add(poll_set, lttng_pipe_get_readfd(handle->cmd_queue.event_pipe),
ab0ee2ca
JG
292 LPOLLIN | LPOLLERR);
293 if (ret < 0) {
294 ERR("[notification-thread] Failed to add notification command queue event fd to pollset");
295 goto error;
296 }
297 ret = lttng_poll_add(poll_set,
298 handle->channel_monitoring_pipes.ust32_consumer,
299 LPOLLIN | LPOLLERR);
300 if (ret < 0) {
301 ERR("[notification-thread] Failed to add ust-32 channel monitoring pipe fd to pollset");
302 goto error;
303 }
304 ret = lttng_poll_add(poll_set,
305 handle->channel_monitoring_pipes.ust64_consumer,
306 LPOLLIN | LPOLLERR);
307 if (ret < 0) {
308 ERR("[notification-thread] Failed to add ust-64 channel monitoring pipe fd to pollset");
309 goto error;
310 }
311 if (handle->channel_monitoring_pipes.kernel_consumer < 0) {
312 goto end;
313 }
314 ret = lttng_poll_add(poll_set,
315 handle->channel_monitoring_pipes.kernel_consumer,
316 LPOLLIN | LPOLLERR);
317 if (ret < 0) {
318 ERR("[notification-thread] Failed to add kernel channel monitoring pipe fd to pollset");
319 goto error;
320 }
321end:
322 return ret;
323error:
324 lttng_poll_clean(poll_set);
325 return ret;
326}
327
328static
329void fini_thread_state(struct notification_thread_state *state)
330{
331 int ret;
332
333 if (state->client_socket_ht) {
334 ret = handle_notification_thread_client_disconnect_all(state);
335 assert(!ret);
336 ret = cds_lfht_destroy(state->client_socket_ht, NULL);
337 assert(!ret);
338 }
ac1889bf
JG
339 if (state->client_id_ht) {
340 ret = cds_lfht_destroy(state->client_id_ht, NULL);
341 assert(!ret);
342 }
ab0ee2ca
JG
343 if (state->triggers_ht) {
344 ret = handle_notification_thread_trigger_unregister_all(state);
345 assert(!ret);
346 ret = cds_lfht_destroy(state->triggers_ht, NULL);
347 assert(!ret);
348 }
349 if (state->channel_triggers_ht) {
350 ret = cds_lfht_destroy(state->channel_triggers_ht, NULL);
351 assert(!ret);
352 }
353 if (state->channel_state_ht) {
354 ret = cds_lfht_destroy(state->channel_state_ht, NULL);
355 assert(!ret);
356 }
357 if (state->notification_trigger_clients_ht) {
358 ret = cds_lfht_destroy(state->notification_trigger_clients_ht,
359 NULL);
360 assert(!ret);
361 }
362 if (state->channels_ht) {
8abe313a
JG
363 ret = cds_lfht_destroy(state->channels_ht, NULL);
364 assert(!ret);
365 }
366 if (state->sessions_ht) {
367 ret = cds_lfht_destroy(state->sessions_ht, NULL);
ab0ee2ca
JG
368 assert(!ret);
369 }
242388e4
JR
370 if (state->triggers_by_name_uid_ht) {
371 ret = cds_lfht_destroy(state->triggers_by_name_uid_ht, NULL);
372 assert(!ret);
373 }
e7c93cf9
JR
374 if (state->trigger_tokens_ht) {
375 ret = cds_lfht_destroy(state->trigger_tokens_ht, NULL);
376 assert(!ret);
377 }
ea9a44f0
JG
378 /*
379 * Must be destroyed after all channels have been destroyed.
380 * See comment in struct lttng_session_trigger_list.
381 */
382 if (state->session_triggers_ht) {
383 ret = cds_lfht_destroy(state->session_triggers_ht, NULL);
384 assert(!ret);
385 }
ab0ee2ca
JG
386 if (state->notification_channel_socket >= 0) {
387 notification_channel_socket_destroy(
388 state->notification_channel_socket);
389 }
d02d7404
JR
390
391 assert(cds_list_empty(&state->tracer_event_sources_list));
392
f2b3ef9f
JG
393 if (state->executor) {
394 action_executor_destroy(state->executor);
395 }
ab0ee2ca
JG
396 lttng_poll_clean(&state->events);
397}
398
c8a9de5a
JG
399static
400void mark_thread_as_ready(struct notification_thread_handle *handle)
401{
402 DBG("Marking notification thread as ready");
403 sem_post(&handle->ready);
404}
405
406static
407void wait_until_thread_is_ready(struct notification_thread_handle *handle)
408{
409 DBG("Waiting for notification thread to be ready");
410 sem_wait(&handle->ready);
411 DBG("Notification thread is ready");
412}
413
ab0ee2ca
JG
414static
415int init_thread_state(struct notification_thread_handle *handle,
416 struct notification_thread_state *state)
417{
418 int ret;
419
420 memset(state, 0, sizeof(*state));
421 state->notification_channel_socket = -1;
e6887944 422 state->trigger_id.next_tracer_token = 1;
ab0ee2ca
JG
423 lttng_poll_init(&state->events);
424
425 ret = notification_channel_socket_create();
426 if (ret < 0) {
427 goto end;
428 }
429 state->notification_channel_socket = ret;
430
431 ret = init_poll_set(&state->events, handle,
432 state->notification_channel_socket);
433 if (ret) {
434 goto end;
435 }
436
437 DBG("[notification-thread] Listening on notification channel socket");
438 ret = lttcomm_listen_unix_sock(state->notification_channel_socket);
439 if (ret < 0) {
440 ERR("[notification-thread] Listen failed on notification channel socket");
441 goto error;
442 }
443
444 state->client_socket_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
445 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
446 if (!state->client_socket_ht) {
447 goto error;
448 }
449
ac1889bf
JG
450 state->client_id_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
451 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
452 if (!state->client_id_ht) {
453 goto error;
454 }
455
ab0ee2ca
JG
456 state->channel_triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
457 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
458 if (!state->channel_triggers_ht) {
459 goto error;
460 }
461
ea9a44f0
JG
462 state->session_triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
463 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
464 if (!state->session_triggers_ht) {
465 goto error;
466 }
467
ab0ee2ca
JG
468 state->channel_state_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
469 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
470 if (!state->channel_state_ht) {
471 goto error;
472 }
473
474 state->notification_trigger_clients_ht = cds_lfht_new(DEFAULT_HT_SIZE,
475 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
476 if (!state->notification_trigger_clients_ht) {
477 goto error;
478 }
479
480 state->channels_ht = cds_lfht_new(DEFAULT_HT_SIZE,
481 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
482 if (!state->channels_ht) {
483 goto error;
484 }
8abe313a
JG
485 state->sessions_ht = cds_lfht_new(DEFAULT_HT_SIZE,
486 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
487 if (!state->sessions_ht) {
488 goto error;
489 }
ab0ee2ca
JG
490 state->triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE,
491 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
492 if (!state->triggers_ht) {
493 goto error;
f2b3ef9f 494 }
242388e4
JR
495 state->triggers_by_name_uid_ht = cds_lfht_new(DEFAULT_HT_SIZE,
496 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
497 if (!state->triggers_by_name_uid_ht) {
498 goto error;
499 }
f2b3ef9f 500
e7c93cf9
JR
501 state->trigger_tokens_ht = cds_lfht_new(DEFAULT_HT_SIZE,
502 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
503 if (!state->trigger_tokens_ht) {
504 goto error;
505 }
506
d02d7404
JR
507 CDS_INIT_LIST_HEAD(&state->tracer_event_sources_list);
508
f2b3ef9f
JG
509 state->executor = action_executor_create(handle);
510 if (!state->executor) {
511 goto error;
ab0ee2ca 512 }
8b524060
FD
513
514 state->restart_poll = false;
515
c8a9de5a 516 mark_thread_as_ready(handle);
ab0ee2ca
JG
517end:
518 return 0;
519error:
520 fini_thread_state(state);
521 return -1;
522}
523
524static
525int handle_channel_monitoring_pipe(int fd, uint32_t revents,
526 struct notification_thread_handle *handle,
527 struct notification_thread_state *state)
528{
529 int ret = 0;
530 enum lttng_domain_type domain;
531
532 if (fd == handle->channel_monitoring_pipes.ust32_consumer ||
533 fd == handle->channel_monitoring_pipes.ust64_consumer) {
534 domain = LTTNG_DOMAIN_UST;
535 } else if (fd == handle->channel_monitoring_pipes.kernel_consumer) {
536 domain = LTTNG_DOMAIN_KERNEL;
537 } else {
538 abort();
539 }
540
541 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
542 ret = lttng_poll_del(&state->events, fd);
543 if (ret) {
544 ERR("[notification-thread] Failed to remove consumer monitoring pipe from poll set");
545 }
546 goto end;
547 }
548
549 ret = handle_notification_thread_channel_sample(
550 state, fd, domain);
551 if (ret) {
4149ace8 552 ERR("[notification-thread] Consumer sample handling error occurred");
ab0ee2ca
JG
553 ret = -1;
554 goto end;
555 }
556end:
557 return ret;
558}
559
94078603
JR
560static int handle_event_notification_pipe(int event_source_fd,
561 enum lttng_domain_type domain,
562 uint32_t revents,
563 struct notification_thread_state *state)
564{
565 int ret = 0;
566
567 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
568 ret = handle_notification_thread_remove_tracer_event_source_no_result(
569 state, event_source_fd);
570 if (ret) {
571 ERR("[notification-thread] Failed to remove event notification pipe from poll set: fd = %d",
572 event_source_fd);
573 }
574 goto end;
575 }
576
577 ret = handle_notification_thread_event_notification(
578 state, event_source_fd, domain);
579 if (ret) {
580 ERR("[notification-thread] Event notification handling error occurred for fd: %d",
581 event_source_fd);
582 ret = -1;
583 goto end;
584 }
585end:
586 return ret;
587}
588
589/*
590 * Return the event source domain type via parameter.
591 */
592static bool fd_is_event_notification_source(const struct notification_thread_state *state,
593 int fd,
594 enum lttng_domain_type *domain)
595{
596 struct notification_event_tracer_event_source_element *source_element;
597
598 assert(domain);
599
600 cds_list_for_each_entry(source_element,
601 &state->tracer_event_sources_list, node) {
602 if (source_element->fd != fd) {
603 continue;
604 }
605
606 *domain = source_element->domain;
607 return true;
608 }
609
610 return false;
611}
612
ab0ee2ca
JG
613/*
614 * This thread services notification channel clients and commands received
615 * from various lttng-sessiond components over a command queue.
616 */
c8a9de5a 617static
ab0ee2ca
JG
618void *thread_notification(void *data)
619{
620 int ret;
621 struct notification_thread_handle *handle = data;
622 struct notification_thread_state state;
94078603 623 enum lttng_domain_type domain;
ab0ee2ca
JG
624
625 DBG("[notification-thread] Started notification thread");
626
f620cc28
JG
627 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_NOTIFICATION);
628 rcu_register_thread();
629 rcu_thread_online();
630
ab0ee2ca
JG
631 if (!handle) {
632 ERR("[notification-thread] Invalid thread context provided");
633 goto end;
634 }
635
ab0ee2ca
JG
636 health_code_update();
637
638 ret = init_thread_state(handle, &state);
639 if (ret) {
640 goto end;
641 }
642
ab0ee2ca
JG
643 while (true) {
644 int fd_count, i;
645
646 health_poll_entry();
647 DBG("[notification-thread] Entering poll wait");
648 ret = lttng_poll_wait(&state.events, -1);
649 DBG("[notification-thread] Poll wait returned (%i)", ret);
650 health_poll_exit();
651 if (ret < 0) {
652 /*
653 * Restart interrupted system call.
654 */
655 if (errno == EINTR) {
656 continue;
657 }
658 ERR("[notification-thread] Error encountered during lttng_poll_wait (%i)", ret);
659 goto error;
660 }
661
8b524060
FD
662 /*
663 * Reset restart_poll flag so that calls below might turn it
664 * on.
665 */
666 state.restart_poll = false;
667
ab0ee2ca
JG
668 fd_count = ret;
669 for (i = 0; i < fd_count; i++) {
670 int fd = LTTNG_POLL_GETFD(&state.events, i);
671 uint32_t revents = LTTNG_POLL_GETEV(&state.events, i);
672
673 DBG("[notification-thread] Handling fd (%i) activity (%u)", fd, revents);
674
675 if (fd == state.notification_channel_socket) {
676 if (revents & LPOLLIN) {
677 ret = handle_notification_thread_client_connect(
678 &state);
679 if (ret < 0) {
680 goto error;
681 }
682 } else if (revents &
683 (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
684 ERR("[notification-thread] Notification socket poll error");
685 goto error;
686 } else {
687 ERR("[notification-thread] Unexpected poll events %u for notification socket %i", revents, fd);
688 goto error;
689 }
814b4934 690 } else if (fd == lttng_pipe_get_readfd(handle->cmd_queue.event_pipe)) {
ab0ee2ca
JG
691 ret = handle_notification_thread_command(handle,
692 &state);
693 if (ret < 0) {
694 DBG("[notification-thread] Error encountered while servicing command queue");
695 goto error;
696 } else if (ret > 0) {
697 goto exit;
698 }
699 } else if (fd == handle->channel_monitoring_pipes.ust32_consumer ||
700 fd == handle->channel_monitoring_pipes.ust64_consumer ||
701 fd == handle->channel_monitoring_pipes.kernel_consumer) {
702 ret = handle_channel_monitoring_pipe(fd,
703 revents, handle, &state);
704 if (ret) {
705 goto error;
706 }
94078603
JR
707 } else if (fd_is_event_notification_source(&state, fd, &domain)) {
708 ret = handle_event_notification_pipe(fd, domain, revents, &state);
709 if (ret) {
710 goto error;
711 }
ab0ee2ca
JG
712 } else {
713 /* Activity on a client's socket. */
714 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
715 /*
716 * It doesn't matter if a command was
717 * pending on the client socket at this
718 * point since it now has no way to
719 * receive the notifications to which
720 * it was subscribing or unsubscribing.
721 */
722 ret = handle_notification_thread_client_disconnect(
723 fd, &state);
724 if (ret) {
725 goto error;
726 }
727 } else {
728 if (revents & LPOLLIN) {
729 ret = handle_notification_thread_client_in(
730 &state, fd);
731 if (ret) {
732 goto error;
733 }
734 }
735
736 if (revents & LPOLLOUT) {
737 ret = handle_notification_thread_client_out(
738 &state, fd);
739 if (ret) {
740 goto error;
741 }
742 }
743 }
744 }
8b524060
FD
745
746 /*
747 * Calls above might have changed the state of the
748 * FDs in `state.events`. Call _poll_wait() again to
749 * ensure we have a consistent state.
750 */
751 if (state.restart_poll) {
752 break;
753 }
ab0ee2ca
JG
754 }
755 }
756exit:
757error:
758 fini_thread_state(&state);
f620cc28 759end:
ab0ee2ca
JG
760 rcu_thread_offline();
761 rcu_unregister_thread();
f620cc28 762 health_unregister(health_sessiond);
ab0ee2ca
JG
763 return NULL;
764}
c8a9de5a
JG
765
/*
 * Shutdown callback registered with lttng_thread_create(): issue the
 * "quit" command to the notification thread whose handle is passed as
 * `thread_data`.
 *
 * Always returns true.
 */
static
bool shutdown_notification_thread(void *thread_data)
{
	notification_thread_command_quit(
			(struct notification_thread_handle *) thread_data);
	return true;
}
774
4a91420c
JG
775struct lttng_thread *launch_notification_thread(
776 struct notification_thread_handle *handle)
c8a9de5a
JG
777{
778 struct lttng_thread *thread;
779
780 thread = lttng_thread_create("Notification",
781 thread_notification,
782 shutdown_notification_thread,
783 NULL,
784 handle);
785 if (!thread) {
786 goto error;
787 }
788
789 /*
790 * Wait for the thread to be marked as "ready" before returning
791 * as other subsystems depend on the notification subsystem
792 * (e.g. rotation thread).
793 */
794 wait_until_thread_is_ready(handle);
4a91420c 795 return thread;
c8a9de5a 796error:
4a91420c 797 return NULL;
c8a9de5a 798}
This page took 0.076692 seconds and 4 git commands to generate.