Tests: fix: leak of notification-client arguments
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread.c
CommitLineData
ab0ee2ca 1/*
ab5be9fa 2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
ab0ee2ca 3 *
ab5be9fa 4 * SPDX-License-Identifier: GPL-2.0-only
ab0ee2ca 5 *
ab0ee2ca
JG
6 */
7
8#define _LGPL_SOURCE
9#include <lttng/trigger/trigger.h>
10#include <lttng/notification/channel-internal.h>
11#include <lttng/notification/notification-internal.h>
12#include <lttng/condition/condition-internal.h>
13#include <lttng/condition/buffer-usage-internal.h>
14#include <common/error.h>
15#include <common/config/session-config.h>
16#include <common/defaults.h>
17#include <common/utils.h>
ab0ee2ca
JG
18#include <common/align.h>
19#include <common/time.h>
ab0ee2ca
JG
20#include <sys/stat.h>
21#include <time.h>
22#include <signal.h>
23
24#include "notification-thread.h"
25#include "notification-thread-events.h"
26#include "notification-thread-commands.h"
27#include "lttng-sessiond.h"
28#include "health-sessiond.h"
c8a9de5a 29#include "thread.h"
38eb8a68 30#include "testpoint.h"
ab0ee2ca 31
94078603
JR
32#include "kernel.h"
33#include <common/kernel-ctl/kernel-ctl.h>
34
ab0ee2ca
JG
35#include <urcu.h>
36#include <urcu/list.h>
37#include <urcu/rculfhash.h>
38
38eb8a68
FD
39
40int notifier_consumption_paused;
ab0ee2ca
JG
41/*
42 * Destroy the thread data previously created by the init function.
43 */
44void notification_thread_handle_destroy(
45 struct notification_thread_handle *handle)
46{
47 int ret;
ab0ee2ca
JG
48
49 if (!handle) {
50 goto end;
51 }
52
8ada111f 53 assert(cds_list_empty(&handle->cmd_queue.list));
ab0ee2ca 54 pthread_mutex_destroy(&handle->cmd_queue.lock);
c8a9de5a 55 sem_destroy(&handle->ready);
ab0ee2ca 56
814b4934
JR
57 if (handle->cmd_queue.event_pipe) {
58 lttng_pipe_destroy(handle->cmd_queue.event_pipe);
59 }
ab0ee2ca
JG
60 if (handle->channel_monitoring_pipes.ust32_consumer >= 0) {
61 ret = close(handle->channel_monitoring_pipes.ust32_consumer);
62 if (ret) {
63 PERROR("close 32-bit consumer channel monitoring pipe");
64 }
65 }
66 if (handle->channel_monitoring_pipes.ust64_consumer >= 0) {
67 ret = close(handle->channel_monitoring_pipes.ust64_consumer);
68 if (ret) {
69 PERROR("close 64-bit consumer channel monitoring pipe");
70 }
71 }
72 if (handle->channel_monitoring_pipes.kernel_consumer >= 0) {
73 ret = close(handle->channel_monitoring_pipes.kernel_consumer);
74 if (ret) {
75 PERROR("close kernel consumer channel monitoring pipe");
76 }
77 }
94078603 78
ab0ee2ca
JG
79end:
80 free(handle);
81}
82
83struct notification_thread_handle *notification_thread_handle_create(
84 struct lttng_pipe *ust32_channel_monitor_pipe,
85 struct lttng_pipe *ust64_channel_monitor_pipe,
c8a9de5a 86 struct lttng_pipe *kernel_channel_monitor_pipe)
ab0ee2ca
JG
87{
88 int ret;
89 struct notification_thread_handle *handle;
814b4934 90 struct lttng_pipe *event_pipe = NULL;
ab0ee2ca
JG
91
92 handle = zmalloc(sizeof(*handle));
93 if (!handle) {
94 goto end;
95 }
96
c8a9de5a
JG
97 sem_init(&handle->ready, 0, 0);
98
18d08850 99 event_pipe = lttng_pipe_open(FD_CLOEXEC);
814b4934
JR
100 if (!event_pipe) {
101 ERR("event_pipe creation");
ab0ee2ca
JG
102 goto error;
103 }
814b4934
JR
104
105 handle->cmd_queue.event_pipe = event_pipe;
106 event_pipe = NULL;
107
ab0ee2ca
JG
108 CDS_INIT_LIST_HEAD(&handle->cmd_queue.list);
109 ret = pthread_mutex_init(&handle->cmd_queue.lock, NULL);
110 if (ret) {
111 goto error;
112 }
113
114 if (ust32_channel_monitor_pipe) {
115 handle->channel_monitoring_pipes.ust32_consumer =
116 lttng_pipe_release_readfd(
117 ust32_channel_monitor_pipe);
118 if (handle->channel_monitoring_pipes.ust32_consumer < 0) {
119 goto error;
120 }
121 } else {
122 handle->channel_monitoring_pipes.ust32_consumer = -1;
123 }
124 if (ust64_channel_monitor_pipe) {
125 handle->channel_monitoring_pipes.ust64_consumer =
126 lttng_pipe_release_readfd(
127 ust64_channel_monitor_pipe);
128 if (handle->channel_monitoring_pipes.ust64_consumer < 0) {
129 goto error;
130 }
131 } else {
132 handle->channel_monitoring_pipes.ust64_consumer = -1;
133 }
134 if (kernel_channel_monitor_pipe) {
135 handle->channel_monitoring_pipes.kernel_consumer =
136 lttng_pipe_release_readfd(
137 kernel_channel_monitor_pipe);
138 if (handle->channel_monitoring_pipes.kernel_consumer < 0) {
139 goto error;
140 }
141 } else {
142 handle->channel_monitoring_pipes.kernel_consumer = -1;
143 }
d02d7404 144
ab0ee2ca
JG
145end:
146 return handle;
147error:
814b4934 148 lttng_pipe_destroy(event_pipe);
ab0ee2ca
JG
149 notification_thread_handle_destroy(handle);
150 return NULL;
151}
152
153static
154char *get_notification_channel_sock_path(void)
155{
156 int ret;
157 bool is_root = !getuid();
158 char *sock_path;
159
160 sock_path = zmalloc(LTTNG_PATH_MAX);
161 if (!sock_path) {
162 goto error;
163 }
164
165 if (is_root) {
166 ret = snprintf(sock_path, LTTNG_PATH_MAX,
167 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK);
168 if (ret < 0) {
169 goto error;
170 }
171 } else {
4f00620d 172 const char *home_path = utils_get_home_dir();
ab0ee2ca
JG
173
174 if (!home_path) {
175 ERR("Can't get HOME directory for socket creation");
176 goto error;
177 }
178
179 ret = snprintf(sock_path, LTTNG_PATH_MAX,
180 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK,
181 home_path);
182 if (ret < 0) {
183 goto error;
184 }
185 }
186
187 return sock_path;
188error:
189 free(sock_path);
190 return NULL;
191}
192
193static
194void notification_channel_socket_destroy(int fd)
195{
196 int ret;
197 char *sock_path = get_notification_channel_sock_path();
198
199 DBG("[notification-thread] Destroying notification channel socket");
200
201 if (sock_path) {
202 ret = unlink(sock_path);
203 free(sock_path);
204 if (ret < 0) {
205 PERROR("unlink notification channel socket");
206 }
207 }
208
209 ret = close(fd);
210 if (ret) {
211 PERROR("close notification channel socket");
212 }
213}
214
215static
216int notification_channel_socket_create(void)
217{
218 int fd = -1, ret;
219 char *sock_path = get_notification_channel_sock_path();
220
221 DBG("[notification-thread] Creating notification channel UNIX socket at %s",
222 sock_path);
223
224 ret = lttcomm_create_unix_sock(sock_path);
225 if (ret < 0) {
226 ERR("[notification-thread] Failed to create notification socket");
227 goto error;
228 }
229 fd = ret;
230
231 ret = chmod(sock_path, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
232 if (ret < 0) {
233 ERR("Set file permissions failed: %s", sock_path);
234 PERROR("chmod notification channel socket");
235 goto error;
236 }
237
238 if (getuid() == 0) {
28ab59d0
JR
239 gid_t gid;
240
412d7227
SM
241 ret = utils_get_group_id(the_config.tracing_group_name.value,
242 true, &gid);
28ab59d0
JR
243 if (ret) {
244 /* Default to root group. */
245 gid = 0;
246 }
247
248 ret = chown(sock_path, 0, gid);
ab0ee2ca
JG
249 if (ret) {
250 ERR("Failed to set the notification channel socket's group");
251 ret = -1;
252 goto error;
253 }
254 }
255
256 DBG("[notification-thread] Notification channel UNIX socket created (fd = %i)",
257 fd);
258 free(sock_path);
259 return fd;
260error:
261 if (fd >= 0 && close(fd) < 0) {
262 PERROR("close notification channel socket");
263 }
264 free(sock_path);
265 return ret;
266}
267
268static
269int init_poll_set(struct lttng_poll_event *poll_set,
270 struct notification_thread_handle *handle,
271 int notification_channel_socket)
272{
273 int ret;
274
275 /*
276 * Create pollset with size 5:
277 * - notification channel socket (listen for new connections),
278 * - command queue event fd (internal sessiond commands),
279 * - consumerd (32-bit user space) channel monitor pipe,
280 * - consumerd (64-bit user space) channel monitor pipe,
281 * - consumerd (kernel) channel monitor pipe.
282 */
283 ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
284 if (ret < 0) {
285 goto end;
286 }
287
288 ret = lttng_poll_add(poll_set, notification_channel_socket,
289 LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP);
290 if (ret < 0) {
291 ERR("[notification-thread] Failed to add notification channel socket to pollset");
292 goto error;
293 }
814b4934 294 ret = lttng_poll_add(poll_set, lttng_pipe_get_readfd(handle->cmd_queue.event_pipe),
ab0ee2ca
JG
295 LPOLLIN | LPOLLERR);
296 if (ret < 0) {
297 ERR("[notification-thread] Failed to add notification command queue event fd to pollset");
298 goto error;
299 }
300 ret = lttng_poll_add(poll_set,
301 handle->channel_monitoring_pipes.ust32_consumer,
302 LPOLLIN | LPOLLERR);
303 if (ret < 0) {
304 ERR("[notification-thread] Failed to add ust-32 channel monitoring pipe fd to pollset");
305 goto error;
306 }
307 ret = lttng_poll_add(poll_set,
308 handle->channel_monitoring_pipes.ust64_consumer,
309 LPOLLIN | LPOLLERR);
310 if (ret < 0) {
311 ERR("[notification-thread] Failed to add ust-64 channel monitoring pipe fd to pollset");
312 goto error;
313 }
314 if (handle->channel_monitoring_pipes.kernel_consumer < 0) {
315 goto end;
316 }
317 ret = lttng_poll_add(poll_set,
318 handle->channel_monitoring_pipes.kernel_consumer,
319 LPOLLIN | LPOLLERR);
320 if (ret < 0) {
321 ERR("[notification-thread] Failed to add kernel channel monitoring pipe fd to pollset");
322 goto error;
323 }
324end:
325 return ret;
326error:
327 lttng_poll_clean(poll_set);
328 return ret;
329}
330
331static
332void fini_thread_state(struct notification_thread_state *state)
333{
334 int ret;
335
336 if (state->client_socket_ht) {
337 ret = handle_notification_thread_client_disconnect_all(state);
338 assert(!ret);
339 ret = cds_lfht_destroy(state->client_socket_ht, NULL);
340 assert(!ret);
341 }
ac1889bf
JG
342 if (state->client_id_ht) {
343 ret = cds_lfht_destroy(state->client_id_ht, NULL);
344 assert(!ret);
345 }
ab0ee2ca
JG
346 if (state->triggers_ht) {
347 ret = handle_notification_thread_trigger_unregister_all(state);
348 assert(!ret);
349 ret = cds_lfht_destroy(state->triggers_ht, NULL);
350 assert(!ret);
351 }
352 if (state->channel_triggers_ht) {
353 ret = cds_lfht_destroy(state->channel_triggers_ht, NULL);
354 assert(!ret);
355 }
356 if (state->channel_state_ht) {
357 ret = cds_lfht_destroy(state->channel_state_ht, NULL);
358 assert(!ret);
359 }
360 if (state->notification_trigger_clients_ht) {
361 ret = cds_lfht_destroy(state->notification_trigger_clients_ht,
362 NULL);
363 assert(!ret);
364 }
365 if (state->channels_ht) {
8abe313a
JG
366 ret = cds_lfht_destroy(state->channels_ht, NULL);
367 assert(!ret);
368 }
369 if (state->sessions_ht) {
370 ret = cds_lfht_destroy(state->sessions_ht, NULL);
ab0ee2ca
JG
371 assert(!ret);
372 }
242388e4
JR
373 if (state->triggers_by_name_uid_ht) {
374 ret = cds_lfht_destroy(state->triggers_by_name_uid_ht, NULL);
375 assert(!ret);
376 }
e7c93cf9
JR
377 if (state->trigger_tokens_ht) {
378 ret = cds_lfht_destroy(state->trigger_tokens_ht, NULL);
379 assert(!ret);
380 }
ea9a44f0
JG
381 /*
382 * Must be destroyed after all channels have been destroyed.
383 * See comment in struct lttng_session_trigger_list.
384 */
385 if (state->session_triggers_ht) {
386 ret = cds_lfht_destroy(state->session_triggers_ht, NULL);
387 assert(!ret);
388 }
ab0ee2ca
JG
389 if (state->notification_channel_socket >= 0) {
390 notification_channel_socket_destroy(
391 state->notification_channel_socket);
392 }
d02d7404
JR
393
394 assert(cds_list_empty(&state->tracer_event_sources_list));
395
f2b3ef9f
JG
396 if (state->executor) {
397 action_executor_destroy(state->executor);
398 }
ab0ee2ca
JG
399 lttng_poll_clean(&state->events);
400}
401
c8a9de5a
JG
402static
403void mark_thread_as_ready(struct notification_thread_handle *handle)
404{
405 DBG("Marking notification thread as ready");
406 sem_post(&handle->ready);
407}
408
409static
410void wait_until_thread_is_ready(struct notification_thread_handle *handle)
411{
412 DBG("Waiting for notification thread to be ready");
413 sem_wait(&handle->ready);
414 DBG("Notification thread is ready");
415}
416
ab0ee2ca
JG
417static
418int init_thread_state(struct notification_thread_handle *handle,
419 struct notification_thread_state *state)
420{
421 int ret;
422
423 memset(state, 0, sizeof(*state));
424 state->notification_channel_socket = -1;
e6887944 425 state->trigger_id.next_tracer_token = 1;
ab0ee2ca
JG
426 lttng_poll_init(&state->events);
427
428 ret = notification_channel_socket_create();
429 if (ret < 0) {
430 goto end;
431 }
432 state->notification_channel_socket = ret;
433
434 ret = init_poll_set(&state->events, handle,
435 state->notification_channel_socket);
436 if (ret) {
437 goto end;
438 }
439
440 DBG("[notification-thread] Listening on notification channel socket");
441 ret = lttcomm_listen_unix_sock(state->notification_channel_socket);
442 if (ret < 0) {
443 ERR("[notification-thread] Listen failed on notification channel socket");
444 goto error;
445 }
446
447 state->client_socket_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
448 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
449 if (!state->client_socket_ht) {
450 goto error;
451 }
452
ac1889bf
JG
453 state->client_id_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
454 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
455 if (!state->client_id_ht) {
456 goto error;
457 }
458
ab0ee2ca
JG
459 state->channel_triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
460 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
461 if (!state->channel_triggers_ht) {
462 goto error;
463 }
464
ea9a44f0
JG
465 state->session_triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
466 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
467 if (!state->session_triggers_ht) {
468 goto error;
469 }
470
ab0ee2ca
JG
471 state->channel_state_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
472 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
473 if (!state->channel_state_ht) {
474 goto error;
475 }
476
477 state->notification_trigger_clients_ht = cds_lfht_new(DEFAULT_HT_SIZE,
478 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
479 if (!state->notification_trigger_clients_ht) {
480 goto error;
481 }
482
483 state->channels_ht = cds_lfht_new(DEFAULT_HT_SIZE,
484 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
485 if (!state->channels_ht) {
486 goto error;
487 }
8abe313a
JG
488 state->sessions_ht = cds_lfht_new(DEFAULT_HT_SIZE,
489 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
490 if (!state->sessions_ht) {
491 goto error;
492 }
ab0ee2ca
JG
493 state->triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE,
494 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
495 if (!state->triggers_ht) {
496 goto error;
f2b3ef9f 497 }
242388e4
JR
498 state->triggers_by_name_uid_ht = cds_lfht_new(DEFAULT_HT_SIZE,
499 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
500 if (!state->triggers_by_name_uid_ht) {
501 goto error;
502 }
f2b3ef9f 503
e7c93cf9
JR
504 state->trigger_tokens_ht = cds_lfht_new(DEFAULT_HT_SIZE,
505 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
506 if (!state->trigger_tokens_ht) {
507 goto error;
508 }
509
d02d7404
JR
510 CDS_INIT_LIST_HEAD(&state->tracer_event_sources_list);
511
f2b3ef9f
JG
512 state->executor = action_executor_create(handle);
513 if (!state->executor) {
514 goto error;
ab0ee2ca 515 }
8b524060
FD
516
517 state->restart_poll = false;
518
c8a9de5a 519 mark_thread_as_ready(handle);
ab0ee2ca
JG
520end:
521 return 0;
522error:
523 fini_thread_state(state);
524 return -1;
525}
526
527static
528int handle_channel_monitoring_pipe(int fd, uint32_t revents,
529 struct notification_thread_handle *handle,
530 struct notification_thread_state *state)
531{
532 int ret = 0;
533 enum lttng_domain_type domain;
534
535 if (fd == handle->channel_monitoring_pipes.ust32_consumer ||
536 fd == handle->channel_monitoring_pipes.ust64_consumer) {
537 domain = LTTNG_DOMAIN_UST;
538 } else if (fd == handle->channel_monitoring_pipes.kernel_consumer) {
539 domain = LTTNG_DOMAIN_KERNEL;
540 } else {
541 abort();
542 }
543
544 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
545 ret = lttng_poll_del(&state->events, fd);
546 if (ret) {
547 ERR("[notification-thread] Failed to remove consumer monitoring pipe from poll set");
548 }
549 goto end;
550 }
551
552 ret = handle_notification_thread_channel_sample(
553 state, fd, domain);
554 if (ret) {
4149ace8 555 ERR("[notification-thread] Consumer sample handling error occurred");
ab0ee2ca
JG
556 ret = -1;
557 goto end;
558 }
559end:
560 return ret;
561}
562
94078603
JR
563static int handle_event_notification_pipe(int event_source_fd,
564 enum lttng_domain_type domain,
565 uint32_t revents,
566 struct notification_thread_state *state)
567{
568 int ret = 0;
569
570 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
571 ret = handle_notification_thread_remove_tracer_event_source_no_result(
572 state, event_source_fd);
573 if (ret) {
574 ERR("[notification-thread] Failed to remove event notification pipe from poll set: fd = %d",
575 event_source_fd);
576 }
577 goto end;
578 }
579
38eb8a68
FD
580 if (testpoint(sessiond_handle_notifier_event_pipe)) {
581 ret = 0;
582 goto end;
583 }
584
585 if (caa_unlikely(notifier_consumption_paused)) {
586 DBG("Event notifier notification consumption paused, sleeping...");
587 sleep(1);
588 goto end;
589 }
590
94078603
JR
591 ret = handle_notification_thread_event_notification(
592 state, event_source_fd, domain);
593 if (ret) {
594 ERR("[notification-thread] Event notification handling error occurred for fd: %d",
595 event_source_fd);
596 ret = -1;
597 goto end;
598 }
38eb8a68 599
94078603
JR
600end:
601 return ret;
602}
603
604/*
605 * Return the event source domain type via parameter.
606 */
607static bool fd_is_event_notification_source(const struct notification_thread_state *state,
608 int fd,
609 enum lttng_domain_type *domain)
610{
611 struct notification_event_tracer_event_source_element *source_element;
612
613 assert(domain);
614
615 cds_list_for_each_entry(source_element,
616 &state->tracer_event_sources_list, node) {
617 if (source_element->fd != fd) {
618 continue;
619 }
620
621 *domain = source_element->domain;
622 return true;
623 }
624
625 return false;
626}
627
ab0ee2ca
JG
628/*
629 * This thread services notification channel clients and commands received
630 * from various lttng-sessiond components over a command queue.
631 */
c8a9de5a 632static
ab0ee2ca
JG
633void *thread_notification(void *data)
634{
635 int ret;
636 struct notification_thread_handle *handle = data;
637 struct notification_thread_state state;
94078603 638 enum lttng_domain_type domain;
ab0ee2ca
JG
639
640 DBG("[notification-thread] Started notification thread");
641
412d7227 642 health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_NOTIFICATION);
f620cc28
JG
643 rcu_register_thread();
644 rcu_thread_online();
645
ab0ee2ca
JG
646 if (!handle) {
647 ERR("[notification-thread] Invalid thread context provided");
648 goto end;
649 }
650
ab0ee2ca
JG
651 health_code_update();
652
653 ret = init_thread_state(handle, &state);
654 if (ret) {
655 goto end;
656 }
657
38eb8a68
FD
658 if (testpoint(sessiond_thread_notification)) {
659 goto end;
660 }
661
ab0ee2ca
JG
662 while (true) {
663 int fd_count, i;
664
665 health_poll_entry();
666 DBG("[notification-thread] Entering poll wait");
667 ret = lttng_poll_wait(&state.events, -1);
668 DBG("[notification-thread] Poll wait returned (%i)", ret);
669 health_poll_exit();
670 if (ret < 0) {
671 /*
672 * Restart interrupted system call.
673 */
674 if (errno == EINTR) {
675 continue;
676 }
677 ERR("[notification-thread] Error encountered during lttng_poll_wait (%i)", ret);
678 goto error;
679 }
680
8b524060
FD
681 /*
682 * Reset restart_poll flag so that calls below might turn it
683 * on.
684 */
685 state.restart_poll = false;
686
ab0ee2ca
JG
687 fd_count = ret;
688 for (i = 0; i < fd_count; i++) {
689 int fd = LTTNG_POLL_GETFD(&state.events, i);
690 uint32_t revents = LTTNG_POLL_GETEV(&state.events, i);
691
692 DBG("[notification-thread] Handling fd (%i) activity (%u)", fd, revents);
693
694 if (fd == state.notification_channel_socket) {
695 if (revents & LPOLLIN) {
696 ret = handle_notification_thread_client_connect(
697 &state);
698 if (ret < 0) {
699 goto error;
700 }
701 } else if (revents &
702 (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
703 ERR("[notification-thread] Notification socket poll error");
704 goto error;
705 } else {
706 ERR("[notification-thread] Unexpected poll events %u for notification socket %i", revents, fd);
707 goto error;
708 }
814b4934 709 } else if (fd == lttng_pipe_get_readfd(handle->cmd_queue.event_pipe)) {
ab0ee2ca
JG
710 ret = handle_notification_thread_command(handle,
711 &state);
712 if (ret < 0) {
713 DBG("[notification-thread] Error encountered while servicing command queue");
714 goto error;
715 } else if (ret > 0) {
716 goto exit;
717 }
718 } else if (fd == handle->channel_monitoring_pipes.ust32_consumer ||
719 fd == handle->channel_monitoring_pipes.ust64_consumer ||
720 fd == handle->channel_monitoring_pipes.kernel_consumer) {
721 ret = handle_channel_monitoring_pipe(fd,
722 revents, handle, &state);
723 if (ret) {
724 goto error;
725 }
94078603
JR
726 } else if (fd_is_event_notification_source(&state, fd, &domain)) {
727 ret = handle_event_notification_pipe(fd, domain, revents, &state);
728 if (ret) {
729 goto error;
730 }
ab0ee2ca
JG
731 } else {
732 /* Activity on a client's socket. */
733 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
734 /*
735 * It doesn't matter if a command was
736 * pending on the client socket at this
737 * point since it now has no way to
738 * receive the notifications to which
739 * it was subscribing or unsubscribing.
740 */
741 ret = handle_notification_thread_client_disconnect(
742 fd, &state);
743 if (ret) {
744 goto error;
745 }
746 } else {
747 if (revents & LPOLLIN) {
748 ret = handle_notification_thread_client_in(
749 &state, fd);
750 if (ret) {
751 goto error;
752 }
753 }
754
755 if (revents & LPOLLOUT) {
756 ret = handle_notification_thread_client_out(
757 &state, fd);
758 if (ret) {
759 goto error;
760 }
761 }
762 }
763 }
8b524060
FD
764
765 /*
766 * Calls above might have changed the state of the
767 * FDs in `state.events`. Call _poll_wait() again to
768 * ensure we have a consistent state.
769 */
770 if (state.restart_poll) {
771 break;
772 }
ab0ee2ca
JG
773 }
774 }
775exit:
776error:
777 fini_thread_state(&state);
f620cc28 778end:
ab0ee2ca
JG
779 rcu_thread_offline();
780 rcu_unregister_thread();
412d7227 781 health_unregister(the_health_sessiond);
ab0ee2ca
JG
782 return NULL;
783}
c8a9de5a
JG
784
785static
786bool shutdown_notification_thread(void *thread_data)
787{
788 struct notification_thread_handle *handle = thread_data;
789
790 notification_thread_command_quit(handle);
791 return true;
792}
793
4a91420c
JG
794struct lttng_thread *launch_notification_thread(
795 struct notification_thread_handle *handle)
c8a9de5a
JG
796{
797 struct lttng_thread *thread;
798
799 thread = lttng_thread_create("Notification",
800 thread_notification,
801 shutdown_notification_thread,
802 NULL,
803 handle);
804 if (!thread) {
805 goto error;
806 }
807
808 /*
809 * Wait for the thread to be marked as "ready" before returning
810 * as other subsystems depend on the notification subsystem
811 * (e.g. rotation thread).
812 */
813 wait_until_thread_is_ready(handle);
4a91420c 814 return thread;
c8a9de5a 815error:
4a91420c 816 return NULL;
c8a9de5a 817}
This page took 0.104797 seconds and 4 git commands to generate.