sessiond: notification: receive incoming notifications from tracers
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread.c
CommitLineData
ab0ee2ca 1/*
ab5be9fa 2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
ab0ee2ca 3 *
ab5be9fa 4 * SPDX-License-Identifier: GPL-2.0-only
ab0ee2ca 5 *
ab0ee2ca
JG
6 */
7
8#define _LGPL_SOURCE
9#include <lttng/trigger/trigger.h>
10#include <lttng/notification/channel-internal.h>
11#include <lttng/notification/notification-internal.h>
12#include <lttng/condition/condition-internal.h>
13#include <lttng/condition/buffer-usage-internal.h>
14#include <common/error.h>
15#include <common/config/session-config.h>
16#include <common/defaults.h>
17#include <common/utils.h>
ab0ee2ca
JG
18#include <common/align.h>
19#include <common/time.h>
ab0ee2ca
JG
20#include <sys/stat.h>
21#include <time.h>
22#include <signal.h>
23
24#include "notification-thread.h"
25#include "notification-thread-events.h"
26#include "notification-thread-commands.h"
27#include "lttng-sessiond.h"
28#include "health-sessiond.h"
c8a9de5a 29#include "thread.h"
ab0ee2ca 30
94078603
JR
31#include "kernel.h"
32#include <common/kernel-ctl/kernel-ctl.h>
33
ab0ee2ca
JG
34#include <urcu.h>
35#include <urcu/list.h>
36#include <urcu/rculfhash.h>
37
ab0ee2ca
JG
38/*
39 * Destroy the thread data previously created by the init function.
40 */
41void notification_thread_handle_destroy(
42 struct notification_thread_handle *handle)
43{
44 int ret;
ab0ee2ca
JG
45
46 if (!handle) {
47 goto end;
48 }
49
8ada111f 50 assert(cds_list_empty(&handle->cmd_queue.list));
ab0ee2ca 51 pthread_mutex_destroy(&handle->cmd_queue.lock);
c8a9de5a 52 sem_destroy(&handle->ready);
ab0ee2ca 53
814b4934
JR
54 if (handle->cmd_queue.event_pipe) {
55 lttng_pipe_destroy(handle->cmd_queue.event_pipe);
56 }
ab0ee2ca
JG
57 if (handle->channel_monitoring_pipes.ust32_consumer >= 0) {
58 ret = close(handle->channel_monitoring_pipes.ust32_consumer);
59 if (ret) {
60 PERROR("close 32-bit consumer channel monitoring pipe");
61 }
62 }
63 if (handle->channel_monitoring_pipes.ust64_consumer >= 0) {
64 ret = close(handle->channel_monitoring_pipes.ust64_consumer);
65 if (ret) {
66 PERROR("close 64-bit consumer channel monitoring pipe");
67 }
68 }
69 if (handle->channel_monitoring_pipes.kernel_consumer >= 0) {
70 ret = close(handle->channel_monitoring_pipes.kernel_consumer);
71 if (ret) {
72 PERROR("close kernel consumer channel monitoring pipe");
73 }
74 }
94078603 75
ab0ee2ca
JG
76end:
77 free(handle);
78}
79
80struct notification_thread_handle *notification_thread_handle_create(
81 struct lttng_pipe *ust32_channel_monitor_pipe,
82 struct lttng_pipe *ust64_channel_monitor_pipe,
c8a9de5a 83 struct lttng_pipe *kernel_channel_monitor_pipe)
ab0ee2ca
JG
84{
85 int ret;
86 struct notification_thread_handle *handle;
814b4934 87 struct lttng_pipe *event_pipe = NULL;
ab0ee2ca
JG
88
89 handle = zmalloc(sizeof(*handle));
90 if (!handle) {
91 goto end;
92 }
93
c8a9de5a
JG
94 sem_init(&handle->ready, 0, 0);
95
18d08850 96 event_pipe = lttng_pipe_open(FD_CLOEXEC);
814b4934
JR
97 if (!event_pipe) {
98 ERR("event_pipe creation");
ab0ee2ca
JG
99 goto error;
100 }
814b4934
JR
101
102 handle->cmd_queue.event_pipe = event_pipe;
103 event_pipe = NULL;
104
ab0ee2ca
JG
105 CDS_INIT_LIST_HEAD(&handle->cmd_queue.list);
106 ret = pthread_mutex_init(&handle->cmd_queue.lock, NULL);
107 if (ret) {
108 goto error;
109 }
110
111 if (ust32_channel_monitor_pipe) {
112 handle->channel_monitoring_pipes.ust32_consumer =
113 lttng_pipe_release_readfd(
114 ust32_channel_monitor_pipe);
115 if (handle->channel_monitoring_pipes.ust32_consumer < 0) {
116 goto error;
117 }
118 } else {
119 handle->channel_monitoring_pipes.ust32_consumer = -1;
120 }
121 if (ust64_channel_monitor_pipe) {
122 handle->channel_monitoring_pipes.ust64_consumer =
123 lttng_pipe_release_readfd(
124 ust64_channel_monitor_pipe);
125 if (handle->channel_monitoring_pipes.ust64_consumer < 0) {
126 goto error;
127 }
128 } else {
129 handle->channel_monitoring_pipes.ust64_consumer = -1;
130 }
131 if (kernel_channel_monitor_pipe) {
132 handle->channel_monitoring_pipes.kernel_consumer =
133 lttng_pipe_release_readfd(
134 kernel_channel_monitor_pipe);
135 if (handle->channel_monitoring_pipes.kernel_consumer < 0) {
136 goto error;
137 }
138 } else {
139 handle->channel_monitoring_pipes.kernel_consumer = -1;
140 }
d02d7404 141
ab0ee2ca
JG
142end:
143 return handle;
144error:
814b4934 145 lttng_pipe_destroy(event_pipe);
ab0ee2ca
JG
146 notification_thread_handle_destroy(handle);
147 return NULL;
148}
149
150static
151char *get_notification_channel_sock_path(void)
152{
153 int ret;
154 bool is_root = !getuid();
155 char *sock_path;
156
157 sock_path = zmalloc(LTTNG_PATH_MAX);
158 if (!sock_path) {
159 goto error;
160 }
161
162 if (is_root) {
163 ret = snprintf(sock_path, LTTNG_PATH_MAX,
164 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK);
165 if (ret < 0) {
166 goto error;
167 }
168 } else {
4f00620d 169 const char *home_path = utils_get_home_dir();
ab0ee2ca
JG
170
171 if (!home_path) {
172 ERR("Can't get HOME directory for socket creation");
173 goto error;
174 }
175
176 ret = snprintf(sock_path, LTTNG_PATH_MAX,
177 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK,
178 home_path);
179 if (ret < 0) {
180 goto error;
181 }
182 }
183
184 return sock_path;
185error:
186 free(sock_path);
187 return NULL;
188}
189
/*
 * Tear down the notification channel socket: unlink its path from the
 * file system (when the path can be determined) and close `fd`.
 *
 * Failures are logged and otherwise ignored.
 */
static
void notification_channel_socket_destroy(int fd)
{
	int ret;
	char *sock_path = get_notification_channel_sock_path();

	DBG("[notification-thread] Destroying notification channel socket");

	if (sock_path) {
		ret = unlink(sock_path);
		if (ret < 0) {
			/*
			 * Report before free(): free() is allowed to modify
			 * errno, which PERROR relies on.
			 */
			PERROR("unlink notification channel socket");
		}
		free(sock_path);
	}

	ret = close(fd);
	if (ret) {
		PERROR("close notification channel socket");
	}
}
211
212static
213int notification_channel_socket_create(void)
214{
215 int fd = -1, ret;
216 char *sock_path = get_notification_channel_sock_path();
217
218 DBG("[notification-thread] Creating notification channel UNIX socket at %s",
219 sock_path);
220
221 ret = lttcomm_create_unix_sock(sock_path);
222 if (ret < 0) {
223 ERR("[notification-thread] Failed to create notification socket");
224 goto error;
225 }
226 fd = ret;
227
228 ret = chmod(sock_path, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
229 if (ret < 0) {
230 ERR("Set file permissions failed: %s", sock_path);
231 PERROR("chmod notification channel socket");
232 goto error;
233 }
234
235 if (getuid() == 0) {
28ab59d0
JR
236 gid_t gid;
237
238 ret = utils_get_group_id(config.tracing_group_name.value, true,
239 &gid);
240 if (ret) {
241 /* Default to root group. */
242 gid = 0;
243 }
244
245 ret = chown(sock_path, 0, gid);
ab0ee2ca
JG
246 if (ret) {
247 ERR("Failed to set the notification channel socket's group");
248 ret = -1;
249 goto error;
250 }
251 }
252
253 DBG("[notification-thread] Notification channel UNIX socket created (fd = %i)",
254 fd);
255 free(sock_path);
256 return fd;
257error:
258 if (fd >= 0 && close(fd) < 0) {
259 PERROR("close notification channel socket");
260 }
261 free(sock_path);
262 return ret;
263}
264
265static
266int init_poll_set(struct lttng_poll_event *poll_set,
267 struct notification_thread_handle *handle,
268 int notification_channel_socket)
269{
270 int ret;
271
272 /*
273 * Create pollset with size 5:
274 * - notification channel socket (listen for new connections),
275 * - command queue event fd (internal sessiond commands),
276 * - consumerd (32-bit user space) channel monitor pipe,
277 * - consumerd (64-bit user space) channel monitor pipe,
278 * - consumerd (kernel) channel monitor pipe.
279 */
280 ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
281 if (ret < 0) {
282 goto end;
283 }
284
285 ret = lttng_poll_add(poll_set, notification_channel_socket,
286 LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP);
287 if (ret < 0) {
288 ERR("[notification-thread] Failed to add notification channel socket to pollset");
289 goto error;
290 }
814b4934 291 ret = lttng_poll_add(poll_set, lttng_pipe_get_readfd(handle->cmd_queue.event_pipe),
ab0ee2ca
JG
292 LPOLLIN | LPOLLERR);
293 if (ret < 0) {
294 ERR("[notification-thread] Failed to add notification command queue event fd to pollset");
295 goto error;
296 }
297 ret = lttng_poll_add(poll_set,
298 handle->channel_monitoring_pipes.ust32_consumer,
299 LPOLLIN | LPOLLERR);
300 if (ret < 0) {
301 ERR("[notification-thread] Failed to add ust-32 channel monitoring pipe fd to pollset");
302 goto error;
303 }
304 ret = lttng_poll_add(poll_set,
305 handle->channel_monitoring_pipes.ust64_consumer,
306 LPOLLIN | LPOLLERR);
307 if (ret < 0) {
308 ERR("[notification-thread] Failed to add ust-64 channel monitoring pipe fd to pollset");
309 goto error;
310 }
311 if (handle->channel_monitoring_pipes.kernel_consumer < 0) {
312 goto end;
313 }
314 ret = lttng_poll_add(poll_set,
315 handle->channel_monitoring_pipes.kernel_consumer,
316 LPOLLIN | LPOLLERR);
317 if (ret < 0) {
318 ERR("[notification-thread] Failed to add kernel channel monitoring pipe fd to pollset");
319 goto error;
320 }
321end:
322 return ret;
323error:
324 lttng_poll_clean(poll_set);
325 return ret;
326}
327
328static
329void fini_thread_state(struct notification_thread_state *state)
330{
331 int ret;
332
333 if (state->client_socket_ht) {
334 ret = handle_notification_thread_client_disconnect_all(state);
335 assert(!ret);
336 ret = cds_lfht_destroy(state->client_socket_ht, NULL);
337 assert(!ret);
338 }
ac1889bf
JG
339 if (state->client_id_ht) {
340 ret = cds_lfht_destroy(state->client_id_ht, NULL);
341 assert(!ret);
342 }
ab0ee2ca
JG
343 if (state->triggers_ht) {
344 ret = handle_notification_thread_trigger_unregister_all(state);
345 assert(!ret);
346 ret = cds_lfht_destroy(state->triggers_ht, NULL);
347 assert(!ret);
348 }
349 if (state->channel_triggers_ht) {
350 ret = cds_lfht_destroy(state->channel_triggers_ht, NULL);
351 assert(!ret);
352 }
353 if (state->channel_state_ht) {
354 ret = cds_lfht_destroy(state->channel_state_ht, NULL);
355 assert(!ret);
356 }
357 if (state->notification_trigger_clients_ht) {
358 ret = cds_lfht_destroy(state->notification_trigger_clients_ht,
359 NULL);
360 assert(!ret);
361 }
362 if (state->channels_ht) {
8abe313a
JG
363 ret = cds_lfht_destroy(state->channels_ht, NULL);
364 assert(!ret);
365 }
366 if (state->sessions_ht) {
367 ret = cds_lfht_destroy(state->sessions_ht, NULL);
ab0ee2ca
JG
368 assert(!ret);
369 }
242388e4
JR
370 if (state->triggers_by_name_uid_ht) {
371 ret = cds_lfht_destroy(state->triggers_by_name_uid_ht, NULL);
372 assert(!ret);
373 }
e7c93cf9
JR
374 if (state->trigger_tokens_ht) {
375 ret = cds_lfht_destroy(state->trigger_tokens_ht, NULL);
376 assert(!ret);
377 }
ea9a44f0
JG
378 /*
379 * Must be destroyed after all channels have been destroyed.
380 * See comment in struct lttng_session_trigger_list.
381 */
382 if (state->session_triggers_ht) {
383 ret = cds_lfht_destroy(state->session_triggers_ht, NULL);
384 assert(!ret);
385 }
ab0ee2ca
JG
386 if (state->notification_channel_socket >= 0) {
387 notification_channel_socket_destroy(
388 state->notification_channel_socket);
389 }
d02d7404
JR
390
391 assert(cds_list_empty(&state->tracer_event_sources_list));
392
f2b3ef9f
JG
393 if (state->executor) {
394 action_executor_destroy(state->executor);
395 }
ab0ee2ca
JG
396 lttng_poll_clean(&state->events);
397}
398
c8a9de5a
JG
399static
400void mark_thread_as_ready(struct notification_thread_handle *handle)
401{
402 DBG("Marking notification thread as ready");
403 sem_post(&handle->ready);
404}
405
406static
407void wait_until_thread_is_ready(struct notification_thread_handle *handle)
408{
409 DBG("Waiting for notification thread to be ready");
410 sem_wait(&handle->ready);
411 DBG("Notification thread is ready");
412}
413
ab0ee2ca
JG
414static
415int init_thread_state(struct notification_thread_handle *handle,
416 struct notification_thread_state *state)
417{
418 int ret;
419
420 memset(state, 0, sizeof(*state));
421 state->notification_channel_socket = -1;
e6887944 422 state->trigger_id.next_tracer_token = 1;
ab0ee2ca
JG
423 lttng_poll_init(&state->events);
424
425 ret = notification_channel_socket_create();
426 if (ret < 0) {
427 goto end;
428 }
429 state->notification_channel_socket = ret;
430
431 ret = init_poll_set(&state->events, handle,
432 state->notification_channel_socket);
433 if (ret) {
434 goto end;
435 }
436
437 DBG("[notification-thread] Listening on notification channel socket");
438 ret = lttcomm_listen_unix_sock(state->notification_channel_socket);
439 if (ret < 0) {
440 ERR("[notification-thread] Listen failed on notification channel socket");
441 goto error;
442 }
443
444 state->client_socket_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
445 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
446 if (!state->client_socket_ht) {
447 goto error;
448 }
449
ac1889bf
JG
450 state->client_id_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
451 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
452 if (!state->client_id_ht) {
453 goto error;
454 }
455
ab0ee2ca
JG
456 state->channel_triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
457 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
458 if (!state->channel_triggers_ht) {
459 goto error;
460 }
461
ea9a44f0
JG
462 state->session_triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
463 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
464 if (!state->session_triggers_ht) {
465 goto error;
466 }
467
ab0ee2ca
JG
468 state->channel_state_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
469 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
470 if (!state->channel_state_ht) {
471 goto error;
472 }
473
474 state->notification_trigger_clients_ht = cds_lfht_new(DEFAULT_HT_SIZE,
475 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
476 if (!state->notification_trigger_clients_ht) {
477 goto error;
478 }
479
480 state->channels_ht = cds_lfht_new(DEFAULT_HT_SIZE,
481 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
482 if (!state->channels_ht) {
483 goto error;
484 }
8abe313a
JG
485 state->sessions_ht = cds_lfht_new(DEFAULT_HT_SIZE,
486 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
487 if (!state->sessions_ht) {
488 goto error;
489 }
ab0ee2ca
JG
490 state->triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE,
491 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
492 if (!state->triggers_ht) {
493 goto error;
f2b3ef9f 494 }
242388e4
JR
495 state->triggers_by_name_uid_ht = cds_lfht_new(DEFAULT_HT_SIZE,
496 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
497 if (!state->triggers_by_name_uid_ht) {
498 goto error;
499 }
f2b3ef9f 500
e7c93cf9
JR
501 state->trigger_tokens_ht = cds_lfht_new(DEFAULT_HT_SIZE,
502 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
503 if (!state->trigger_tokens_ht) {
504 goto error;
505 }
506
d02d7404
JR
507 CDS_INIT_LIST_HEAD(&state->tracer_event_sources_list);
508
f2b3ef9f
JG
509 state->executor = action_executor_create(handle);
510 if (!state->executor) {
511 goto error;
ab0ee2ca 512 }
c8a9de5a 513 mark_thread_as_ready(handle);
ab0ee2ca
JG
514end:
515 return 0;
516error:
517 fini_thread_state(state);
518 return -1;
519}
520
521static
522int handle_channel_monitoring_pipe(int fd, uint32_t revents,
523 struct notification_thread_handle *handle,
524 struct notification_thread_state *state)
525{
526 int ret = 0;
527 enum lttng_domain_type domain;
528
529 if (fd == handle->channel_monitoring_pipes.ust32_consumer ||
530 fd == handle->channel_monitoring_pipes.ust64_consumer) {
531 domain = LTTNG_DOMAIN_UST;
532 } else if (fd == handle->channel_monitoring_pipes.kernel_consumer) {
533 domain = LTTNG_DOMAIN_KERNEL;
534 } else {
535 abort();
536 }
537
538 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
539 ret = lttng_poll_del(&state->events, fd);
540 if (ret) {
541 ERR("[notification-thread] Failed to remove consumer monitoring pipe from poll set");
542 }
543 goto end;
544 }
545
546 ret = handle_notification_thread_channel_sample(
547 state, fd, domain);
548 if (ret) {
4149ace8 549 ERR("[notification-thread] Consumer sample handling error occurred");
ab0ee2ca
JG
550 ret = -1;
551 goto end;
552 }
553end:
554 return ret;
555}
556
94078603
JR
557static int handle_event_notification_pipe(int event_source_fd,
558 enum lttng_domain_type domain,
559 uint32_t revents,
560 struct notification_thread_state *state)
561{
562 int ret = 0;
563
564 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
565 ret = handle_notification_thread_remove_tracer_event_source_no_result(
566 state, event_source_fd);
567 if (ret) {
568 ERR("[notification-thread] Failed to remove event notification pipe from poll set: fd = %d",
569 event_source_fd);
570 }
571 goto end;
572 }
573
574 ret = handle_notification_thread_event_notification(
575 state, event_source_fd, domain);
576 if (ret) {
577 ERR("[notification-thread] Event notification handling error occurred for fd: %d",
578 event_source_fd);
579 ret = -1;
580 goto end;
581 }
582end:
583 return ret;
584}
585
586/*
587 * Return the event source domain type via parameter.
588 */
589static bool fd_is_event_notification_source(const struct notification_thread_state *state,
590 int fd,
591 enum lttng_domain_type *domain)
592{
593 struct notification_event_tracer_event_source_element *source_element;
594
595 assert(domain);
596
597 cds_list_for_each_entry(source_element,
598 &state->tracer_event_sources_list, node) {
599 if (source_element->fd != fd) {
600 continue;
601 }
602
603 *domain = source_element->domain;
604 return true;
605 }
606
607 return false;
608}
609
ab0ee2ca
JG
610/*
611 * This thread services notification channel clients and commands received
612 * from various lttng-sessiond components over a command queue.
613 */
c8a9de5a 614static
ab0ee2ca
JG
615void *thread_notification(void *data)
616{
617 int ret;
618 struct notification_thread_handle *handle = data;
619 struct notification_thread_state state;
94078603 620 enum lttng_domain_type domain;
ab0ee2ca
JG
621
622 DBG("[notification-thread] Started notification thread");
623
f620cc28
JG
624 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_NOTIFICATION);
625 rcu_register_thread();
626 rcu_thread_online();
627
ab0ee2ca
JG
628 if (!handle) {
629 ERR("[notification-thread] Invalid thread context provided");
630 goto end;
631 }
632
ab0ee2ca
JG
633 health_code_update();
634
635 ret = init_thread_state(handle, &state);
636 if (ret) {
637 goto end;
638 }
639
ab0ee2ca
JG
640 while (true) {
641 int fd_count, i;
642
643 health_poll_entry();
644 DBG("[notification-thread] Entering poll wait");
645 ret = lttng_poll_wait(&state.events, -1);
646 DBG("[notification-thread] Poll wait returned (%i)", ret);
647 health_poll_exit();
648 if (ret < 0) {
649 /*
650 * Restart interrupted system call.
651 */
652 if (errno == EINTR) {
653 continue;
654 }
655 ERR("[notification-thread] Error encountered during lttng_poll_wait (%i)", ret);
656 goto error;
657 }
658
659 fd_count = ret;
660 for (i = 0; i < fd_count; i++) {
661 int fd = LTTNG_POLL_GETFD(&state.events, i);
662 uint32_t revents = LTTNG_POLL_GETEV(&state.events, i);
663
664 DBG("[notification-thread] Handling fd (%i) activity (%u)", fd, revents);
665
666 if (fd == state.notification_channel_socket) {
667 if (revents & LPOLLIN) {
668 ret = handle_notification_thread_client_connect(
669 &state);
670 if (ret < 0) {
671 goto error;
672 }
673 } else if (revents &
674 (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
675 ERR("[notification-thread] Notification socket poll error");
676 goto error;
677 } else {
678 ERR("[notification-thread] Unexpected poll events %u for notification socket %i", revents, fd);
679 goto error;
680 }
814b4934 681 } else if (fd == lttng_pipe_get_readfd(handle->cmd_queue.event_pipe)) {
ab0ee2ca
JG
682 ret = handle_notification_thread_command(handle,
683 &state);
684 if (ret < 0) {
685 DBG("[notification-thread] Error encountered while servicing command queue");
686 goto error;
687 } else if (ret > 0) {
688 goto exit;
689 }
690 } else if (fd == handle->channel_monitoring_pipes.ust32_consumer ||
691 fd == handle->channel_monitoring_pipes.ust64_consumer ||
692 fd == handle->channel_monitoring_pipes.kernel_consumer) {
693 ret = handle_channel_monitoring_pipe(fd,
694 revents, handle, &state);
695 if (ret) {
696 goto error;
697 }
94078603
JR
698 } else if (fd_is_event_notification_source(&state, fd, &domain)) {
699 ret = handle_event_notification_pipe(fd, domain, revents, &state);
700 if (ret) {
701 goto error;
702 }
ab0ee2ca
JG
703 } else {
704 /* Activity on a client's socket. */
705 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
706 /*
707 * It doesn't matter if a command was
708 * pending on the client socket at this
709 * point since it now has no way to
710 * receive the notifications to which
711 * it was subscribing or unsubscribing.
712 */
713 ret = handle_notification_thread_client_disconnect(
714 fd, &state);
715 if (ret) {
716 goto error;
717 }
718 } else {
719 if (revents & LPOLLIN) {
720 ret = handle_notification_thread_client_in(
721 &state, fd);
722 if (ret) {
723 goto error;
724 }
725 }
726
727 if (revents & LPOLLOUT) {
728 ret = handle_notification_thread_client_out(
729 &state, fd);
730 if (ret) {
731 goto error;
732 }
733 }
734 }
735 }
736 }
737 }
738exit:
739error:
740 fini_thread_state(&state);
f620cc28 741end:
ab0ee2ca
JG
742 rcu_thread_offline();
743 rcu_unregister_thread();
f620cc28 744 health_unregister(health_sessiond);
ab0ee2ca
JG
745 return NULL;
746}
c8a9de5a
JG
747
/*
 * Shutdown callback of the notification thread: issue a "quit" command
 * on the handle passed as `thread_data`. Always returns true.
 */
static
bool shutdown_notification_thread(void *thread_data)
{
	struct notification_thread_handle *notification_handle = thread_data;

	notification_thread_command_quit(notification_handle);

	return true;
}
756
4a91420c
JG
757struct lttng_thread *launch_notification_thread(
758 struct notification_thread_handle *handle)
c8a9de5a
JG
759{
760 struct lttng_thread *thread;
761
762 thread = lttng_thread_create("Notification",
763 thread_notification,
764 shutdown_notification_thread,
765 NULL,
766 handle);
767 if (!thread) {
768 goto error;
769 }
770
771 /*
772 * Wait for the thread to be marked as "ready" before returning
773 * as other subsystems depend on the notification subsystem
774 * (e.g. rotation thread).
775 */
776 wait_until_thread_is_ready(handle);
4a91420c 777 return thread;
c8a9de5a 778error:
4a91420c 779 return NULL;
c8a9de5a 780}
This page took 0.067474 seconds and 4 git commands to generate.