Remove fcntl wrapper
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread.cpp
CommitLineData
ab0ee2ca 1/*
ab5be9fa 2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
ab0ee2ca 3 *
ab5be9fa 4 * SPDX-License-Identifier: GPL-2.0-only
ab0ee2ca 5 *
ab0ee2ca
JG
6 */
7
8#define _LGPL_SOURCE
c9e313bc 9#include "health-sessiond.hpp"
28ab034a
JG
10#include "kernel.hpp"
11#include "lttng-sessiond.hpp"
12#include "notification-thread-commands.hpp"
13#include "notification-thread-events.hpp"
14#include "notification-thread.hpp"
c9e313bc 15#include "testpoint.hpp"
28ab034a 16#include "thread.hpp"
ab0ee2ca 17
28ab034a
JG
18#include <common/align.hpp>
19#include <common/config/session-config.hpp>
20#include <common/defaults.hpp>
21#include <common/error.hpp>
c9e313bc 22#include <common/kernel-ctl/kernel-ctl.hpp>
28ab034a
JG
23#include <common/time.hpp>
24#include <common/utils.hpp>
94078603 25
28ab034a
JG
26#include <lttng/condition/buffer-usage-internal.hpp>
27#include <lttng/condition/condition-internal.hpp>
28#include <lttng/notification/channel-internal.hpp>
29#include <lttng/notification/notification-internal.hpp>
30#include <lttng/trigger/trigger.h>
31
32#include <signal.h>
33#include <sys/eventfd.h>
34#include <sys/stat.h>
35#include <time.h>
ab0ee2ca
JG
36#include <urcu.h>
37#include <urcu/list.h>
38#include <urcu/rculfhash.h>
39
4bd69c5f
SM
40/*
41 * Flag used to temporarily pause data consumption from testpoints.
42 *
43 * This variable is dlsym-ed from a test, so needs to be exported.
44 */
45LTTNG_EXPORT int notifier_consumption_paused;
38eb8a68 46
ab0ee2ca
JG
47/*
48 * Destroy the thread data previously created by the init function.
49 */
28ab034a 50void notification_thread_handle_destroy(struct notification_thread_handle *handle)
ab0ee2ca
JG
51{
52 int ret;
ab0ee2ca
JG
53
54 if (!handle) {
55 goto end;
56 }
57
a0377dfe 58 LTTNG_ASSERT(cds_list_empty(&handle->cmd_queue.list));
ab0ee2ca 59 pthread_mutex_destroy(&handle->cmd_queue.lock);
c8a9de5a 60 sem_destroy(&handle->ready);
ab0ee2ca 61
f370852f
JR
62 if (handle->cmd_queue.event_fd >= 0) {
63 ret = close(handle->cmd_queue.event_fd);
64 if (ret < 0) {
65 PERROR("Failed to close notification command queue event fd");
66 }
814b4934 67 }
ab0ee2ca
JG
68 if (handle->channel_monitoring_pipes.ust32_consumer >= 0) {
69 ret = close(handle->channel_monitoring_pipes.ust32_consumer);
70 if (ret) {
71 PERROR("close 32-bit consumer channel monitoring pipe");
72 }
73 }
74 if (handle->channel_monitoring_pipes.ust64_consumer >= 0) {
75 ret = close(handle->channel_monitoring_pipes.ust64_consumer);
76 if (ret) {
77 PERROR("close 64-bit consumer channel monitoring pipe");
78 }
79 }
80 if (handle->channel_monitoring_pipes.kernel_consumer >= 0) {
81 ret = close(handle->channel_monitoring_pipes.kernel_consumer);
82 if (ret) {
83 PERROR("close kernel consumer channel monitoring pipe");
84 }
85 }
94078603 86
ab0ee2ca
JG
87end:
88 free(handle);
89}
90
28ab034a
JG
91struct notification_thread_handle *
92notification_thread_handle_create(struct lttng_pipe *ust32_channel_monitor_pipe,
93 struct lttng_pipe *ust64_channel_monitor_pipe,
94 struct lttng_pipe *kernel_channel_monitor_pipe)
ab0ee2ca
JG
95{
96 int ret;
97 struct notification_thread_handle *handle;
f370852f 98 int event_fd = -1;
ab0ee2ca 99
64803277 100 handle = zmalloc<notification_thread_handle>();
ab0ee2ca
JG
101 if (!handle) {
102 goto end;
103 }
104
c8a9de5a
JG
105 sem_init(&handle->ready, 0, 0);
106
28ab034a 107 event_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
f370852f
JR
108 if (event_fd < 0) {
109 PERROR("event_fd creation");
ab0ee2ca
JG
110 goto error;
111 }
814b4934 112
f370852f 113 handle->cmd_queue.event_fd = event_fd;
814b4934 114
ab0ee2ca 115 CDS_INIT_LIST_HEAD(&handle->cmd_queue.list);
cd9adb8b 116 ret = pthread_mutex_init(&handle->cmd_queue.lock, nullptr);
ab0ee2ca
JG
117 if (ret) {
118 goto error;
119 }
120
121 if (ust32_channel_monitor_pipe) {
122 handle->channel_monitoring_pipes.ust32_consumer =
28ab034a 123 lttng_pipe_release_readfd(ust32_channel_monitor_pipe);
ab0ee2ca
JG
124 if (handle->channel_monitoring_pipes.ust32_consumer < 0) {
125 goto error;
126 }
127 } else {
128 handle->channel_monitoring_pipes.ust32_consumer = -1;
129 }
130 if (ust64_channel_monitor_pipe) {
131 handle->channel_monitoring_pipes.ust64_consumer =
28ab034a 132 lttng_pipe_release_readfd(ust64_channel_monitor_pipe);
ab0ee2ca
JG
133 if (handle->channel_monitoring_pipes.ust64_consumer < 0) {
134 goto error;
135 }
136 } else {
137 handle->channel_monitoring_pipes.ust64_consumer = -1;
138 }
139 if (kernel_channel_monitor_pipe) {
140 handle->channel_monitoring_pipes.kernel_consumer =
28ab034a 141 lttng_pipe_release_readfd(kernel_channel_monitor_pipe);
ab0ee2ca
JG
142 if (handle->channel_monitoring_pipes.kernel_consumer < 0) {
143 goto error;
144 }
145 } else {
146 handle->channel_monitoring_pipes.kernel_consumer = -1;
147 }
d02d7404 148
ab0ee2ca
JG
149end:
150 return handle;
151error:
152 notification_thread_handle_destroy(handle);
cd9adb8b 153 return nullptr;
ab0ee2ca
JG
154}
155
cd9adb8b 156static char *get_notification_channel_sock_path()
ab0ee2ca
JG
157{
158 int ret;
159 bool is_root = !getuid();
160 char *sock_path;
161
64803277 162 sock_path = calloc<char>(LTTNG_PATH_MAX);
ab0ee2ca
JG
163 if (!sock_path) {
164 goto error;
165 }
166
167 if (is_root) {
28ab034a
JG
168 ret = snprintf(
169 sock_path, LTTNG_PATH_MAX, DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK);
ab0ee2ca
JG
170 if (ret < 0) {
171 goto error;
172 }
173 } else {
4f00620d 174 const char *home_path = utils_get_home_dir();
ab0ee2ca
JG
175
176 if (!home_path) {
177 ERR("Can't get HOME directory for socket creation");
178 goto error;
179 }
180
28ab034a
JG
181 ret = snprintf(sock_path,
182 LTTNG_PATH_MAX,
183 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK,
184 home_path);
ab0ee2ca
JG
185 if (ret < 0) {
186 goto error;
187 }
188 }
189
190 return sock_path;
191error:
192 free(sock_path);
cd9adb8b 193 return nullptr;
ab0ee2ca
JG
194}
195
28ab034a 196static void notification_channel_socket_destroy(int fd)
ab0ee2ca
JG
197{
198 int ret;
199 char *sock_path = get_notification_channel_sock_path();
200
bd0514a5 201 DBG("Destroying notification channel socket");
ab0ee2ca
JG
202
203 if (sock_path) {
204 ret = unlink(sock_path);
205 free(sock_path);
206 if (ret < 0) {
207 PERROR("unlink notification channel socket");
208 }
209 }
210
211 ret = close(fd);
212 if (ret) {
213 PERROR("close notification channel socket");
214 }
215}
216
cd9adb8b 217static int notification_channel_socket_create()
ab0ee2ca
JG
218{
219 int fd = -1, ret;
220 char *sock_path = get_notification_channel_sock_path();
221
28ab034a 222 DBG("Creating notification channel UNIX socket at %s", sock_path);
ab0ee2ca
JG
223
224 ret = lttcomm_create_unix_sock(sock_path);
225 if (ret < 0) {
bd0514a5 226 ERR("Failed to create notification socket");
ab0ee2ca
JG
227 goto error;
228 }
229 fd = ret;
230
231 ret = chmod(sock_path, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
232 if (ret < 0) {
233 ERR("Set file permissions failed: %s", sock_path);
234 PERROR("chmod notification channel socket");
235 goto error;
236 }
237
238 if (getuid() == 0) {
28ab59d0
JR
239 gid_t gid;
240
28ab034a 241 ret = utils_get_group_id(the_config.tracing_group_name.value, true, &gid);
28ab59d0
JR
242 if (ret) {
243 /* Default to root group. */
244 gid = 0;
245 }
246
247 ret = chown(sock_path, 0, gid);
ab0ee2ca
JG
248 if (ret) {
249 ERR("Failed to set the notification channel socket's group");
250 ret = -1;
251 goto error;
252 }
253 }
254
28ab034a 255 DBG("Notification channel UNIX socket created (fd = %i)", fd);
ab0ee2ca
JG
256 free(sock_path);
257 return fd;
258error:
259 if (fd >= 0 && close(fd) < 0) {
260 PERROR("close notification channel socket");
261 }
262 free(sock_path);
263 return ret;
264}
265
28ab034a
JG
266static int init_poll_set(struct lttng_poll_event *poll_set,
267 struct notification_thread_handle *handle,
268 int notification_channel_socket)
ab0ee2ca
JG
269{
270 int ret;
271
272 /*
273 * Create pollset with size 5:
274 * - notification channel socket (listen for new connections),
275 * - command queue event fd (internal sessiond commands),
276 * - consumerd (32-bit user space) channel monitor pipe,
277 * - consumerd (64-bit user space) channel monitor pipe,
278 * - consumerd (kernel) channel monitor pipe.
279 */
280 ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
281 if (ret < 0) {
282 goto end;
283 }
284
1524f98c 285 ret = lttng_poll_add(poll_set, notification_channel_socket, LPOLLIN | LPOLLRDHUP);
ab0ee2ca 286 if (ret < 0) {
bd0514a5 287 ERR("Failed to add notification channel socket to pollset");
ab0ee2ca
JG
288 goto error;
289 }
1524f98c 290 ret = lttng_poll_add(poll_set, handle->cmd_queue.event_fd, LPOLLIN);
ab0ee2ca 291 if (ret < 0) {
bd0514a5 292 ERR("Failed to add notification command queue event fd to pollset");
ab0ee2ca
JG
293 goto error;
294 }
28ab034a 295 ret = lttng_poll_add(poll_set, handle->channel_monitoring_pipes.ust32_consumer, LPOLLIN);
ab0ee2ca 296 if (ret < 0) {
bd0514a5 297 ERR("Failed to add ust-32 channel monitoring pipe fd to pollset");
ab0ee2ca
JG
298 goto error;
299 }
28ab034a 300 ret = lttng_poll_add(poll_set, handle->channel_monitoring_pipes.ust64_consumer, LPOLLIN);
ab0ee2ca 301 if (ret < 0) {
bd0514a5 302 ERR("Failed to add ust-64 channel monitoring pipe fd to pollset");
ab0ee2ca
JG
303 goto error;
304 }
305 if (handle->channel_monitoring_pipes.kernel_consumer < 0) {
306 goto end;
307 }
28ab034a 308 ret = lttng_poll_add(poll_set, handle->channel_monitoring_pipes.kernel_consumer, LPOLLIN);
ab0ee2ca 309 if (ret < 0) {
bd0514a5 310 ERR("Failed to add kernel channel monitoring pipe fd to pollset");
ab0ee2ca
JG
311 goto error;
312 }
313end:
314 return ret;
315error:
316 lttng_poll_clean(poll_set);
317 return ret;
318}
319
28ab034a 320static void fini_thread_state(struct notification_thread_state *state)
ab0ee2ca
JG
321{
322 int ret;
323
324 if (state->client_socket_ht) {
325 ret = handle_notification_thread_client_disconnect_all(state);
a0377dfe 326 LTTNG_ASSERT(!ret);
cd9adb8b 327 ret = cds_lfht_destroy(state->client_socket_ht, nullptr);
a0377dfe 328 LTTNG_ASSERT(!ret);
ab0ee2ca 329 }
ac1889bf 330 if (state->client_id_ht) {
cd9adb8b 331 ret = cds_lfht_destroy(state->client_id_ht, nullptr);
a0377dfe 332 LTTNG_ASSERT(!ret);
ac1889bf 333 }
ab0ee2ca
JG
334 if (state->triggers_ht) {
335 ret = handle_notification_thread_trigger_unregister_all(state);
a0377dfe 336 LTTNG_ASSERT(!ret);
cd9adb8b 337 ret = cds_lfht_destroy(state->triggers_ht, nullptr);
a0377dfe 338 LTTNG_ASSERT(!ret);
ab0ee2ca
JG
339 }
340 if (state->channel_triggers_ht) {
cd9adb8b 341 ret = cds_lfht_destroy(state->channel_triggers_ht, nullptr);
a0377dfe 342 LTTNG_ASSERT(!ret);
ab0ee2ca
JG
343 }
344 if (state->channel_state_ht) {
cd9adb8b 345 ret = cds_lfht_destroy(state->channel_state_ht, nullptr);
a0377dfe 346 LTTNG_ASSERT(!ret);
ab0ee2ca
JG
347 }
348 if (state->notification_trigger_clients_ht) {
cd9adb8b 349 ret = cds_lfht_destroy(state->notification_trigger_clients_ht, nullptr);
a0377dfe 350 LTTNG_ASSERT(!ret);
ab0ee2ca
JG
351 }
352 if (state->channels_ht) {
cd9adb8b 353 ret = cds_lfht_destroy(state->channels_ht, nullptr);
a0377dfe 354 LTTNG_ASSERT(!ret);
8abe313a
JG
355 }
356 if (state->sessions_ht) {
cd9adb8b 357 ret = cds_lfht_destroy(state->sessions_ht, nullptr);
a0377dfe 358 LTTNG_ASSERT(!ret);
ab0ee2ca 359 }
242388e4 360 if (state->triggers_by_name_uid_ht) {
cd9adb8b 361 ret = cds_lfht_destroy(state->triggers_by_name_uid_ht, nullptr);
a0377dfe 362 LTTNG_ASSERT(!ret);
242388e4 363 }
e7c93cf9 364 if (state->trigger_tokens_ht) {
cd9adb8b 365 ret = cds_lfht_destroy(state->trigger_tokens_ht, nullptr);
a0377dfe 366 LTTNG_ASSERT(!ret);
e7c93cf9 367 }
ea9a44f0
JG
368 /*
369 * Must be destroyed after all channels have been destroyed.
370 * See comment in struct lttng_session_trigger_list.
371 */
372 if (state->session_triggers_ht) {
cd9adb8b 373 ret = cds_lfht_destroy(state->session_triggers_ht, nullptr);
a0377dfe 374 LTTNG_ASSERT(!ret);
ea9a44f0 375 }
ab0ee2ca 376 if (state->notification_channel_socket >= 0) {
28ab034a 377 notification_channel_socket_destroy(state->notification_channel_socket);
ab0ee2ca 378 }
d02d7404 379
a0377dfe 380 LTTNG_ASSERT(cds_list_empty(&state->tracer_event_sources_list));
d02d7404 381
f2b3ef9f
JG
382 if (state->executor) {
383 action_executor_destroy(state->executor);
384 }
ab0ee2ca
JG
385 lttng_poll_clean(&state->events);
386}
387
28ab034a 388static void mark_thread_as_ready(struct notification_thread_handle *handle)
c8a9de5a
JG
389{
390 DBG("Marking notification thread as ready");
391 sem_post(&handle->ready);
392}
393
28ab034a 394static void wait_until_thread_is_ready(struct notification_thread_handle *handle)
c8a9de5a
JG
395{
396 DBG("Waiting for notification thread to be ready");
397 sem_wait(&handle->ready);
398 DBG("Notification thread is ready");
399}
400
28ab034a
JG
401static int init_thread_state(struct notification_thread_handle *handle,
402 struct notification_thread_state *state)
ab0ee2ca
JG
403{
404 int ret;
405
406 memset(state, 0, sizeof(*state));
407 state->notification_channel_socket = -1;
e6887944 408 state->trigger_id.next_tracer_token = 1;
ab0ee2ca
JG
409 lttng_poll_init(&state->events);
410
411 ret = notification_channel_socket_create();
412 if (ret < 0) {
413 goto end;
414 }
415 state->notification_channel_socket = ret;
416
28ab034a 417 ret = init_poll_set(&state->events, handle, state->notification_channel_socket);
ab0ee2ca
JG
418 if (ret) {
419 goto end;
420 }
421
bd0514a5 422 DBG("Listening on notification channel socket");
ab0ee2ca
JG
423 ret = lttcomm_listen_unix_sock(state->notification_channel_socket);
424 if (ret < 0) {
bd0514a5 425 ERR("Listen failed on notification channel socket");
ab0ee2ca
JG
426 goto error;
427 }
428
28ab034a 429 state->client_socket_ht = cds_lfht_new(
cd9adb8b 430 DEFAULT_HT_SIZE, 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, nullptr);
ab0ee2ca
JG
431 if (!state->client_socket_ht) {
432 goto error;
433 }
434
28ab034a 435 state->client_id_ht = cds_lfht_new(
cd9adb8b 436 DEFAULT_HT_SIZE, 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, nullptr);
ac1889bf
JG
437 if (!state->client_id_ht) {
438 goto error;
439 }
440
28ab034a 441 state->channel_triggers_ht = cds_lfht_new(
cd9adb8b 442 DEFAULT_HT_SIZE, 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, nullptr);
ab0ee2ca
JG
443 if (!state->channel_triggers_ht) {
444 goto error;
445 }
446
28ab034a 447 state->session_triggers_ht = cds_lfht_new(
cd9adb8b 448 DEFAULT_HT_SIZE, 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, nullptr);
ea9a44f0
JG
449 if (!state->session_triggers_ht) {
450 goto error;
451 }
452
28ab034a 453 state->channel_state_ht = cds_lfht_new(
cd9adb8b 454 DEFAULT_HT_SIZE, 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, nullptr);
ab0ee2ca
JG
455 if (!state->channel_state_ht) {
456 goto error;
457 }
458
28ab034a 459 state->notification_trigger_clients_ht = cds_lfht_new(
cd9adb8b 460 DEFAULT_HT_SIZE, 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, nullptr);
ab0ee2ca
JG
461 if (!state->notification_trigger_clients_ht) {
462 goto error;
463 }
464
28ab034a 465 state->channels_ht = cds_lfht_new(
cd9adb8b 466 DEFAULT_HT_SIZE, 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, nullptr);
ab0ee2ca
JG
467 if (!state->channels_ht) {
468 goto error;
469 }
28ab034a 470 state->sessions_ht = cds_lfht_new(
cd9adb8b 471 DEFAULT_HT_SIZE, 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, nullptr);
8abe313a
JG
472 if (!state->sessions_ht) {
473 goto error;
474 }
28ab034a 475 state->triggers_ht = cds_lfht_new(
cd9adb8b 476 DEFAULT_HT_SIZE, 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, nullptr);
ab0ee2ca
JG
477 if (!state->triggers_ht) {
478 goto error;
f2b3ef9f 479 }
28ab034a 480 state->triggers_by_name_uid_ht = cds_lfht_new(
cd9adb8b 481 DEFAULT_HT_SIZE, 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, nullptr);
242388e4
JR
482 if (!state->triggers_by_name_uid_ht) {
483 goto error;
484 }
f2b3ef9f 485
28ab034a 486 state->trigger_tokens_ht = cds_lfht_new(
cd9adb8b 487 DEFAULT_HT_SIZE, 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, nullptr);
e7c93cf9
JR
488 if (!state->trigger_tokens_ht) {
489 goto error;
490 }
491
d02d7404
JR
492 CDS_INIT_LIST_HEAD(&state->tracer_event_sources_list);
493
f2b3ef9f
JG
494 state->executor = action_executor_create(handle);
495 if (!state->executor) {
496 goto error;
ab0ee2ca 497 }
8b524060
FD
498
499 state->restart_poll = false;
500
c8a9de5a 501 mark_thread_as_ready(handle);
ab0ee2ca
JG
502end:
503 return 0;
504error:
505 fini_thread_state(state);
506 return -1;
507}
508
28ab034a
JG
509static int handle_channel_monitoring_pipe(int fd,
510 uint32_t revents,
511 struct notification_thread_handle *handle,
512 struct notification_thread_state *state)
ab0ee2ca
JG
513{
514 int ret = 0;
515 enum lttng_domain_type domain;
516
517 if (fd == handle->channel_monitoring_pipes.ust32_consumer ||
28ab034a 518 fd == handle->channel_monitoring_pipes.ust64_consumer) {
ab0ee2ca
JG
519 domain = LTTNG_DOMAIN_UST;
520 } else if (fd == handle->channel_monitoring_pipes.kernel_consumer) {
521 domain = LTTNG_DOMAIN_KERNEL;
522 } else {
523 abort();
524 }
525
526 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
527 ret = lttng_poll_del(&state->events, fd);
528 if (ret) {
bd0514a5 529 ERR("Failed to remove consumer monitoring pipe from poll set");
ab0ee2ca
JG
530 }
531 goto end;
532 }
533
28ab034a 534 ret = handle_notification_thread_channel_sample(state, fd, domain);
ab0ee2ca 535 if (ret) {
bd0514a5 536 ERR("Consumer sample handling error occurred");
ab0ee2ca
JG
537 ret = -1;
538 goto end;
539 }
540end:
541 return ret;
542}
543
94078603 544static int handle_event_notification_pipe(int event_source_fd,
28ab034a
JG
545 enum lttng_domain_type domain,
546 uint32_t revents,
547 struct notification_thread_state *state)
94078603
JR
548{
549 int ret = 0;
550
551 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
28ab034a 552 ret = handle_notification_thread_tracer_event_source_died(state, event_source_fd);
94078603 553 if (ret) {
bd0514a5 554 ERR("Failed to remove event notification pipe from poll set: fd = %d",
28ab034a 555 event_source_fd);
94078603
JR
556 }
557 goto end;
558 }
559
38eb8a68
FD
560 if (testpoint(sessiond_handle_notifier_event_pipe)) {
561 ret = 0;
562 goto end;
563 }
564
565 if (caa_unlikely(notifier_consumption_paused)) {
566 DBG("Event notifier notification consumption paused, sleeping...");
567 sleep(1);
568 goto end;
569 }
570
28ab034a 571 ret = handle_notification_thread_event_notification(state, event_source_fd, domain);
94078603 572 if (ret) {
28ab034a 573 ERR("Event notification handling error occurred for fd: %d", event_source_fd);
94078603
JR
574 ret = -1;
575 goto end;
576 }
38eb8a68 577
94078603
JR
578end:
579 return ret;
580}
581
582/*
583 * Return the event source domain type via parameter.
584 */
585static bool fd_is_event_notification_source(const struct notification_thread_state *state,
28ab034a
JG
586 int fd,
587 enum lttng_domain_type *domain)
94078603
JR
588{
589 struct notification_event_tracer_event_source_element *source_element;
590
a0377dfe 591 LTTNG_ASSERT(domain);
94078603 592
28ab034a 593 cds_list_for_each_entry (source_element, &state->tracer_event_sources_list, node) {
94078603
JR
594 if (source_element->fd != fd) {
595 continue;
596 }
597
598 *domain = source_element->domain;
599 return true;
600 }
601
602 return false;
603}
604
ab0ee2ca
JG
605/*
606 * This thread services notification channel clients and commands received
607 * from various lttng-sessiond components over a command queue.
608 */
28ab034a 609static void *thread_notification(void *data)
ab0ee2ca
JG
610{
611 int ret;
7966af57 612 struct notification_thread_handle *handle = (notification_thread_handle *) data;
ab0ee2ca 613 struct notification_thread_state state;
94078603 614 enum lttng_domain_type domain;
ab0ee2ca 615
bd0514a5 616 DBG("Started notification thread");
ab0ee2ca 617
412d7227 618 health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_NOTIFICATION);
f620cc28
JG
619 rcu_register_thread();
620 rcu_thread_online();
621
ab0ee2ca 622 if (!handle) {
bd0514a5 623 ERR("Invalid thread context provided");
ab0ee2ca
JG
624 goto end;
625 }
626
ab0ee2ca
JG
627 health_code_update();
628
629 ret = init_thread_state(handle, &state);
630 if (ret) {
631 goto end;
632 }
633
38eb8a68
FD
634 if (testpoint(sessiond_thread_notification)) {
635 goto end;
636 }
637
ab0ee2ca
JG
638 while (true) {
639 int fd_count, i;
640
641 health_poll_entry();
bd0514a5 642 DBG("Entering poll wait");
ab0ee2ca 643 ret = lttng_poll_wait(&state.events, -1);
bd0514a5 644 DBG("Poll wait returned (%i)", ret);
ab0ee2ca
JG
645 health_poll_exit();
646 if (ret < 0) {
647 /*
648 * Restart interrupted system call.
649 */
650 if (errno == EINTR) {
651 continue;
652 }
bd0514a5 653 ERR("Error encountered during lttng_poll_wait (%i)", ret);
ab0ee2ca
JG
654 goto error;
655 }
656
8b524060
FD
657 /*
658 * Reset restart_poll flag so that calls below might turn it
659 * on.
660 */
661 state.restart_poll = false;
662
ab0ee2ca
JG
663 fd_count = ret;
664 for (i = 0; i < fd_count; i++) {
665 int fd = LTTNG_POLL_GETFD(&state.events, i);
666 uint32_t revents = LTTNG_POLL_GETEV(&state.events, i);
667
bd0514a5 668 DBG("Handling fd (%i) activity (%u)", fd, revents);
ab0ee2ca
JG
669
670 if (fd == state.notification_channel_socket) {
671 if (revents & LPOLLIN) {
28ab034a 672 ret = handle_notification_thread_client_connect(&state);
ab0ee2ca
JG
673 if (ret < 0) {
674 goto error;
675 }
28ab034a 676 } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
bd0514a5 677 ERR("Notification socket poll error");
ab0ee2ca
JG
678 goto error;
679 } else {
28ab034a
JG
680 ERR("Unexpected poll events %u for notification socket %i",
681 revents,
682 fd);
ab0ee2ca
JG
683 goto error;
684 }
f370852f 685 } else if (fd == handle->cmd_queue.event_fd) {
28ab034a 686 ret = handle_notification_thread_command(handle, &state);
ab0ee2ca 687 if (ret < 0) {
bd0514a5 688 DBG("Error encountered while servicing command queue");
ab0ee2ca
JG
689 goto error;
690 } else if (ret > 0) {
691 goto exit;
692 }
693 } else if (fd == handle->channel_monitoring_pipes.ust32_consumer ||
28ab034a
JG
694 fd == handle->channel_monitoring_pipes.ust64_consumer ||
695 fd == handle->channel_monitoring_pipes.kernel_consumer) {
696 ret = handle_channel_monitoring_pipe(fd, revents, handle, &state);
ab0ee2ca
JG
697 if (ret) {
698 goto error;
699 }
94078603
JR
700 } else if (fd_is_event_notification_source(&state, fd, &domain)) {
701 ret = handle_event_notification_pipe(fd, domain, revents, &state);
702 if (ret) {
703 goto error;
704 }
ab0ee2ca
JG
705 } else {
706 /* Activity on a client's socket. */
707 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
708 /*
709 * It doesn't matter if a command was
710 * pending on the client socket at this
711 * point since it now has no way to
712 * receive the notifications to which
713 * it was subscribing or unsubscribing.
714 */
28ab034a
JG
715 ret = handle_notification_thread_client_disconnect(fd,
716 &state);
ab0ee2ca
JG
717 if (ret) {
718 goto error;
719 }
720 } else {
721 if (revents & LPOLLIN) {
28ab034a
JG
722 ret = handle_notification_thread_client_in(&state,
723 fd);
ab0ee2ca
JG
724 if (ret) {
725 goto error;
726 }
727 }
728
729 if (revents & LPOLLOUT) {
28ab034a
JG
730 ret = handle_notification_thread_client_out(&state,
731 fd);
ab0ee2ca
JG
732 if (ret) {
733 goto error;
734 }
735 }
736 }
737 }
8b524060
FD
738
739 /*
740 * Calls above might have changed the state of the
741 * FDs in `state.events`. Call _poll_wait() again to
742 * ensure we have a consistent state.
743 */
744 if (state.restart_poll) {
745 break;
746 }
ab0ee2ca
JG
747 }
748 }
749exit:
750error:
751 fini_thread_state(&state);
f620cc28 752end:
ab0ee2ca
JG
753 rcu_thread_offline();
754 rcu_unregister_thread();
412d7227 755 health_unregister(the_health_sessiond);
cd9adb8b 756 return nullptr;
ab0ee2ca 757}
c8a9de5a 758
28ab034a 759static bool shutdown_notification_thread(void *thread_data)
c8a9de5a 760{
7966af57 761 struct notification_thread_handle *handle = (notification_thread_handle *) thread_data;
c8a9de5a
JG
762
763 notification_thread_command_quit(handle);
764 return true;
765}
766
28ab034a 767struct lttng_thread *launch_notification_thread(struct notification_thread_handle *handle)
c8a9de5a
JG
768{
769 struct lttng_thread *thread;
770
28ab034a 771 thread = lttng_thread_create(
cd9adb8b 772 "Notification", thread_notification, shutdown_notification_thread, nullptr, handle);
c8a9de5a
JG
773 if (!thread) {
774 goto error;
775 }
776
777 /*
778 * Wait for the thread to be marked as "ready" before returning
779 * as other subsystems depend on the notification subsystem
780 * (e.g. rotation thread).
781 */
782 wait_until_thread_is_ready(handle);
4a91420c 783 return thread;
c8a9de5a 784error:
cd9adb8b 785 return nullptr;
c8a9de5a 786}
This page took 0.104523 seconds and 4 git commands to generate.