Fix: use lttng_waiter instead of futex in notification thread
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread.c
1 /*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <lttng/trigger/trigger.h>
20 #include <lttng/notification/channel-internal.h>
21 #include <lttng/notification/notification-internal.h>
22 #include <lttng/condition/condition-internal.h>
23 #include <lttng/condition/buffer-usage-internal.h>
24 #include <common/error.h>
25 #include <common/config/session-config.h>
26 #include <common/defaults.h>
27 #include <common/utils.h>
28 #include <common/align.h>
29 #include <common/time.h>
30 #include <sys/eventfd.h>
31 #include <sys/stat.h>
32 #include <time.h>
33 #include <signal.h>
34
35 #include "notification-thread.h"
36 #include "notification-thread-events.h"
37 #include "notification-thread-commands.h"
38 #include "lttng-sessiond.h"
39 #include "health-sessiond.h"
40
41 #include <urcu.h>
42 #include <urcu/list.h>
43 #include <urcu/rculfhash.h>
44
45 /**
46 * This thread maintains an internal state associating clients and triggers.
47 *
48 * In order to speed-up and simplify queries, hash tables providing the
49 * following associations are maintained:
50 *
51 * - client_socket_ht: associate a client's socket (fd) to its "struct client"
52 * This hash table owns the "struct client" which must thus be
53 * disposed-of on removal from the hash table.
54 *
55 * - channel_triggers_ht:
56 * associates a channel key to a list of
57 * struct lttng_trigger_list_nodes. The triggers in this list are
58 * those that have conditions that apply to this channel.
59 * This hash table owns the list, but not the triggers themselves.
60 *
61 * - channel_state_ht:
62 * associates a pair (channel key, channel domain) to its last
63 * sampled state received from the consumer daemon
64 * (struct channel_state).
65 * This previous sample is kept to implement edge-triggered
66 * conditions as we need to detect the state transitions.
67 * This hash table owns the channel state.
68 *
69 * - notification_trigger_clients_ht:
70 * associates notification-emitting triggers to clients
71 * (struct notification_client_ht_node) subscribed to those
72 * conditions.
73 * The condition's hash and match functions are used directly since
74 * all triggers in this hash table have the "notify" action.
75 * This hash table holds no ownership.
76 *
77 * - channels_ht:
78 * associates a channel_key to a struct channel_info. The hash table
79 * holds the ownership of the struct channel_info.
80 *
81 * - triggers_ht:
82 * associates a condition with a struct lttng_trigger_ht_element.
83 * The hash table holds the ownership of the
84 * lttng_trigger_ht_elements along with the triggers themselves.
85 *
86 * The thread reacts to the following internal events:
87 * 1) creation of a tracing channel,
88 * 2) destruction of a tracing channel,
89 * 3) registration of a trigger,
90 * 4) unregistration of a trigger,
91 * 5) reception of a channel monitor sample from the consumer daemon.
92 *
93 * Events specific to notification-emitting triggers:
94 * 6) connection of a notification client,
95 * 7) disconnection of a notification client,
96 * 8) subscription of a client to a condition's notifications,
97 * 9) unsubscription of a client from a condition's notifications,
98 *
99 *
100 * 1) Creation of a tracing channel
101 * - notification_trigger_clients_ht is traversed to identify
102 * triggers which apply to this new channel,
103 * - triggers identified are added to the channel_triggers_ht.
104 * - add channel to channels_ht
105 *
106 * 2) Destruction of a tracing channel
107 * - remove entry from channel_triggers_ht, releasing the list wrapper and
108 * elements,
109 * - remove entry from the channel_state_ht.
110 * - remove channel from channels_ht
111 *
112 * 3) Registration of a trigger
113 * - if the trigger's action is of type "notify",
114 * - traverse the list of conditions of every client to build a list of
115 * clients which have to be notified when this trigger's condition is met,
116 * - add list of clients (even if it is empty) to the
117 * notification_trigger_clients_ht,
118 * - add trigger to channel_triggers_ht (if applicable),
119 * - add trigger to triggers_ht
120 *
121 * 4) Unregistration of a trigger
122 * - if the trigger's action is of type "notify",
123 * - remove the trigger from the notification_trigger_clients_ht,
124 * - remove trigger from channel_triggers_ht (if applicable),
125 * - remove trigger from triggers_ht
126 *
127 * 5) Reception of a channel monitor sample from the consumer daemon
128 * - evaluate the conditions associated with the triggers found in
129 * the channel_triggers_ht,
130 * - if a condition evaluates to "true" and the condition is of type
131 * "notify", query the notification_trigger_clients_ht and send
132 * a notification to the clients.
133 *
134 * 6) Connection of a client
135 * - add client socket to the client_socket_ht.
136 *
137 * 7) Disconnection of a client
138 * - remove client socket from the client_socket_ht,
139 * - traverse all conditions to which the client is subscribed and remove
140 * the client from the notification_trigger_clients_ht.
141 *
142 * 8) Subscription of a client to a condition's notifications
143 * - Add the condition to the client's list of subscribed conditions,
144 * - Look-up notification_trigger_clients_ht and add the client to
145 * list of clients.
146 *
147 * 9) Unsubscription of a client from a condition's notifications
148 * - Remove the condition from the client's list of subscribed conditions,
149 * - Look-up notification_trigger_clients_ht and remove the client
150 * from the list of clients.
151 */
152
153 /*
154 * Destroy the thread data previously created by the init function.
155 */
156 void notification_thread_handle_destroy(
157 struct notification_thread_handle *handle)
158 {
159 int ret;
160
161 if (!handle) {
162 goto end;
163 }
164
165 if (handle->cmd_queue.event_fd < 0) {
166 goto end;
167 }
168 ret = close(handle->cmd_queue.event_fd);
169 if (ret < 0) {
170 PERROR("close notification command queue event_fd");
171 }
172
173 assert(cds_list_empty(&handle->cmd_queue.list));
174 pthread_mutex_destroy(&handle->cmd_queue.lock);
175
176 if (handle->channel_monitoring_pipes.ust32_consumer >= 0) {
177 ret = close(handle->channel_monitoring_pipes.ust32_consumer);
178 if (ret) {
179 PERROR("close 32-bit consumer channel monitoring pipe");
180 }
181 }
182 if (handle->channel_monitoring_pipes.ust64_consumer >= 0) {
183 ret = close(handle->channel_monitoring_pipes.ust64_consumer);
184 if (ret) {
185 PERROR("close 64-bit consumer channel monitoring pipe");
186 }
187 }
188 if (handle->channel_monitoring_pipes.kernel_consumer >= 0) {
189 ret = close(handle->channel_monitoring_pipes.kernel_consumer);
190 if (ret) {
191 PERROR("close kernel consumer channel monitoring pipe");
192 }
193 }
194 end:
195 free(handle);
196 }
197
198 struct notification_thread_handle *notification_thread_handle_create(
199 struct lttng_pipe *ust32_channel_monitor_pipe,
200 struct lttng_pipe *ust64_channel_monitor_pipe,
201 struct lttng_pipe *kernel_channel_monitor_pipe)
202 {
203 int ret;
204 struct notification_thread_handle *handle;
205
206 handle = zmalloc(sizeof(*handle));
207 if (!handle) {
208 goto end;
209 }
210
211 /* FIXME Replace eventfd by a pipe to support older kernels. */
212 handle->cmd_queue.event_fd = eventfd(0, EFD_CLOEXEC);
213 if (handle->cmd_queue.event_fd < 0) {
214 PERROR("eventfd notification command queue");
215 goto error;
216 }
217 CDS_INIT_LIST_HEAD(&handle->cmd_queue.list);
218 ret = pthread_mutex_init(&handle->cmd_queue.lock, NULL);
219 if (ret) {
220 goto error;
221 }
222
223 if (ust32_channel_monitor_pipe) {
224 handle->channel_monitoring_pipes.ust32_consumer =
225 lttng_pipe_release_readfd(
226 ust32_channel_monitor_pipe);
227 if (handle->channel_monitoring_pipes.ust32_consumer < 0) {
228 goto error;
229 }
230 } else {
231 handle->channel_monitoring_pipes.ust32_consumer = -1;
232 }
233 if (ust64_channel_monitor_pipe) {
234 handle->channel_monitoring_pipes.ust64_consumer =
235 lttng_pipe_release_readfd(
236 ust64_channel_monitor_pipe);
237 if (handle->channel_monitoring_pipes.ust64_consumer < 0) {
238 goto error;
239 }
240 } else {
241 handle->channel_monitoring_pipes.ust64_consumer = -1;
242 }
243 if (kernel_channel_monitor_pipe) {
244 handle->channel_monitoring_pipes.kernel_consumer =
245 lttng_pipe_release_readfd(
246 kernel_channel_monitor_pipe);
247 if (handle->channel_monitoring_pipes.kernel_consumer < 0) {
248 goto error;
249 }
250 } else {
251 handle->channel_monitoring_pipes.kernel_consumer = -1;
252 }
253 end:
254 return handle;
255 error:
256 notification_thread_handle_destroy(handle);
257 return NULL;
258 }
259
260 static
261 char *get_notification_channel_sock_path(void)
262 {
263 int ret;
264 bool is_root = !getuid();
265 char *sock_path;
266
267 sock_path = zmalloc(LTTNG_PATH_MAX);
268 if (!sock_path) {
269 goto error;
270 }
271
272 if (is_root) {
273 ret = snprintf(sock_path, LTTNG_PATH_MAX,
274 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK);
275 if (ret < 0) {
276 goto error;
277 }
278 } else {
279 char *home_path = utils_get_home_dir();
280
281 if (!home_path) {
282 ERR("Can't get HOME directory for socket creation");
283 goto error;
284 }
285
286 ret = snprintf(sock_path, LTTNG_PATH_MAX,
287 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK,
288 home_path);
289 if (ret < 0) {
290 goto error;
291 }
292 }
293
294 return sock_path;
295 error:
296 free(sock_path);
297 return NULL;
298 }
299
/*
 * Tear down the notification channel socket: unlink its path from the
 * file system (when the path can be computed) and close the descriptor.
 *
 * Failures are logged; the function always attempts both steps.
 */
static
void notification_channel_socket_destroy(int fd)
{
	int ret;
	char *sock_path = get_notification_channel_sock_path();

	DBG("[notification-thread] Destroying notification channel socket");

	if (sock_path) {
		ret = unlink(sock_path);
		if (ret < 0) {
			/*
			 * Report before calling free(): free() is allowed to
			 * modify errno on some C libraries, which would make
			 * PERROR print an unrelated error.
			 */
			PERROR("unlink notification channel socket");
		}
		free(sock_path);
	}

	ret = close(fd);
	if (ret) {
		PERROR("close notification channel socket");
	}
}
321
322 static
323 int notification_channel_socket_create(void)
324 {
325 int fd = -1, ret;
326 char *sock_path = get_notification_channel_sock_path();
327
328 DBG("[notification-thread] Creating notification channel UNIX socket at %s",
329 sock_path);
330
331 ret = lttcomm_create_unix_sock(sock_path);
332 if (ret < 0) {
333 ERR("[notification-thread] Failed to create notification socket");
334 goto error;
335 }
336 fd = ret;
337
338 ret = chmod(sock_path, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
339 if (ret < 0) {
340 ERR("Set file permissions failed: %s", sock_path);
341 PERROR("chmod notification channel socket");
342 goto error;
343 }
344
345 if (getuid() == 0) {
346 ret = chown(sock_path, 0,
347 utils_get_group_id(tracing_group_name));
348 if (ret) {
349 ERR("Failed to set the notification channel socket's group");
350 ret = -1;
351 goto error;
352 }
353 }
354
355 DBG("[notification-thread] Notification channel UNIX socket created (fd = %i)",
356 fd);
357 free(sock_path);
358 return fd;
359 error:
360 if (fd >= 0 && close(fd) < 0) {
361 PERROR("close notification channel socket");
362 }
363 free(sock_path);
364 return ret;
365 }
366
367 static
368 int init_poll_set(struct lttng_poll_event *poll_set,
369 struct notification_thread_handle *handle,
370 int notification_channel_socket)
371 {
372 int ret;
373
374 /*
375 * Create pollset with size 5:
376 * - notification channel socket (listen for new connections),
377 * - command queue event fd (internal sessiond commands),
378 * - consumerd (32-bit user space) channel monitor pipe,
379 * - consumerd (64-bit user space) channel monitor pipe,
380 * - consumerd (kernel) channel monitor pipe.
381 */
382 ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
383 if (ret < 0) {
384 goto end;
385 }
386
387 ret = lttng_poll_add(poll_set, notification_channel_socket,
388 LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP);
389 if (ret < 0) {
390 ERR("[notification-thread] Failed to add notification channel socket to pollset");
391 goto error;
392 }
393 ret = lttng_poll_add(poll_set, handle->cmd_queue.event_fd,
394 LPOLLIN | LPOLLERR);
395 if (ret < 0) {
396 ERR("[notification-thread] Failed to add notification command queue event fd to pollset");
397 goto error;
398 }
399 ret = lttng_poll_add(poll_set,
400 handle->channel_monitoring_pipes.ust32_consumer,
401 LPOLLIN | LPOLLERR);
402 if (ret < 0) {
403 ERR("[notification-thread] Failed to add ust-32 channel monitoring pipe fd to pollset");
404 goto error;
405 }
406 ret = lttng_poll_add(poll_set,
407 handle->channel_monitoring_pipes.ust64_consumer,
408 LPOLLIN | LPOLLERR);
409 if (ret < 0) {
410 ERR("[notification-thread] Failed to add ust-64 channel monitoring pipe fd to pollset");
411 goto error;
412 }
413 if (handle->channel_monitoring_pipes.kernel_consumer < 0) {
414 goto end;
415 }
416 ret = lttng_poll_add(poll_set,
417 handle->channel_monitoring_pipes.kernel_consumer,
418 LPOLLIN | LPOLLERR);
419 if (ret < 0) {
420 ERR("[notification-thread] Failed to add kernel channel monitoring pipe fd to pollset");
421 goto error;
422 }
423 end:
424 return ret;
425 error:
426 lttng_poll_clean(poll_set);
427 return ret;
428 }
429
430 static
431 void fini_thread_state(struct notification_thread_state *state)
432 {
433 int ret;
434
435 if (state->client_socket_ht) {
436 ret = handle_notification_thread_client_disconnect_all(state);
437 assert(!ret);
438 ret = cds_lfht_destroy(state->client_socket_ht, NULL);
439 assert(!ret);
440 }
441 if (state->triggers_ht) {
442 ret = handle_notification_thread_trigger_unregister_all(state);
443 assert(!ret);
444 ret = cds_lfht_destroy(state->triggers_ht, NULL);
445 assert(!ret);
446 }
447 if (state->channel_triggers_ht) {
448 ret = cds_lfht_destroy(state->channel_triggers_ht, NULL);
449 assert(!ret);
450 }
451 if (state->channel_state_ht) {
452 ret = cds_lfht_destroy(state->channel_state_ht, NULL);
453 assert(!ret);
454 }
455 if (state->notification_trigger_clients_ht) {
456 ret = cds_lfht_destroy(state->notification_trigger_clients_ht,
457 NULL);
458 assert(!ret);
459 }
460 if (state->channels_ht) {
461 ret = cds_lfht_destroy(state->channels_ht,
462 NULL);
463 assert(!ret);
464 }
465
466 if (state->notification_channel_socket >= 0) {
467 notification_channel_socket_destroy(
468 state->notification_channel_socket);
469 }
470 lttng_poll_clean(&state->events);
471 }
472
473 static
474 int init_thread_state(struct notification_thread_handle *handle,
475 struct notification_thread_state *state)
476 {
477 int ret;
478
479 memset(state, 0, sizeof(*state));
480 state->notification_channel_socket = -1;
481 lttng_poll_init(&state->events);
482
483 ret = notification_channel_socket_create();
484 if (ret < 0) {
485 goto end;
486 }
487 state->notification_channel_socket = ret;
488
489 ret = init_poll_set(&state->events, handle,
490 state->notification_channel_socket);
491 if (ret) {
492 goto end;
493 }
494
495 DBG("[notification-thread] Listening on notification channel socket");
496 ret = lttcomm_listen_unix_sock(state->notification_channel_socket);
497 if (ret < 0) {
498 ERR("[notification-thread] Listen failed on notification channel socket");
499 goto error;
500 }
501
502 state->client_socket_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
503 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
504 if (!state->client_socket_ht) {
505 goto error;
506 }
507
508 state->channel_triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
509 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
510 if (!state->channel_triggers_ht) {
511 goto error;
512 }
513
514 state->channel_state_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
515 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
516 if (!state->channel_state_ht) {
517 goto error;
518 }
519
520 state->notification_trigger_clients_ht = cds_lfht_new(DEFAULT_HT_SIZE,
521 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
522 if (!state->notification_trigger_clients_ht) {
523 goto error;
524 }
525
526 state->channels_ht = cds_lfht_new(DEFAULT_HT_SIZE,
527 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
528 if (!state->channels_ht) {
529 goto error;
530 }
531
532 state->triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE,
533 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
534 if (!state->triggers_ht) {
535 goto error;
536 }
537 end:
538 return 0;
539 error:
540 fini_thread_state(state);
541 return -1;
542 }
543
544 static
545 int handle_channel_monitoring_pipe(int fd, uint32_t revents,
546 struct notification_thread_handle *handle,
547 struct notification_thread_state *state)
548 {
549 int ret = 0;
550 enum lttng_domain_type domain;
551
552 if (fd == handle->channel_monitoring_pipes.ust32_consumer ||
553 fd == handle->channel_monitoring_pipes.ust64_consumer) {
554 domain = LTTNG_DOMAIN_UST;
555 } else if (fd == handle->channel_monitoring_pipes.kernel_consumer) {
556 domain = LTTNG_DOMAIN_KERNEL;
557 } else {
558 abort();
559 }
560
561 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
562 ret = lttng_poll_del(&state->events, fd);
563 if (ret) {
564 ERR("[notification-thread] Failed to remove consumer monitoring pipe from poll set");
565 }
566 goto end;
567 }
568
569 ret = handle_notification_thread_channel_sample(
570 state, fd, domain);
571 if (ret) {
572 ERR("[notification-thread] Consumer sample handling error occured");
573 ret = -1;
574 goto end;
575 }
576 end:
577 return ret;
578 }
579
580 /*
581 * This thread services notification channel clients and commands received
582 * from various lttng-sessiond components over a command queue.
583 */
584 void *thread_notification(void *data)
585 {
586 int ret;
587 struct notification_thread_handle *handle = data;
588 struct notification_thread_state state;
589
590 DBG("[notification-thread] Started notification thread");
591
592 if (!handle) {
593 ERR("[notification-thread] Invalid thread context provided");
594 goto end;
595 }
596
597 rcu_register_thread();
598 rcu_thread_online();
599
600 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_NOTIFICATION);
601 health_code_update();
602
603 ret = init_thread_state(handle, &state);
604 if (ret) {
605 goto end;
606 }
607
608 /* Ready to handle client connections. */
609 sessiond_notify_ready();
610
611 while (true) {
612 int fd_count, i;
613
614 health_poll_entry();
615 DBG("[notification-thread] Entering poll wait");
616 ret = lttng_poll_wait(&state.events, -1);
617 DBG("[notification-thread] Poll wait returned (%i)", ret);
618 health_poll_exit();
619 if (ret < 0) {
620 /*
621 * Restart interrupted system call.
622 */
623 if (errno == EINTR) {
624 continue;
625 }
626 ERR("[notification-thread] Error encountered during lttng_poll_wait (%i)", ret);
627 goto error;
628 }
629
630 fd_count = ret;
631 for (i = 0; i < fd_count; i++) {
632 int fd = LTTNG_POLL_GETFD(&state.events, i);
633 uint32_t revents = LTTNG_POLL_GETEV(&state.events, i);
634
635 if (!revents) {
636 continue;
637 }
638 DBG("[notification-thread] Handling fd (%i) activity (%u)", fd, revents);
639
640 if (fd == state.notification_channel_socket) {
641 if (revents & LPOLLIN) {
642 ret = handle_notification_thread_client_connect(
643 &state);
644 if (ret < 0) {
645 goto error;
646 }
647 } else if (revents &
648 (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
649 ERR("[notification-thread] Notification socket poll error");
650 goto error;
651 } else {
652 ERR("[notification-thread] Unexpected poll events %u for notification socket %i", revents, fd);
653 goto error;
654 }
655 } else if (fd == handle->cmd_queue.event_fd) {
656 ret = handle_notification_thread_command(handle,
657 &state);
658 if (ret < 0) {
659 DBG("[notification-thread] Error encountered while servicing command queue");
660 goto error;
661 } else if (ret > 0) {
662 goto exit;
663 }
664 } else if (fd == handle->channel_monitoring_pipes.ust32_consumer ||
665 fd == handle->channel_monitoring_pipes.ust64_consumer ||
666 fd == handle->channel_monitoring_pipes.kernel_consumer) {
667 ret = handle_channel_monitoring_pipe(fd,
668 revents, handle, &state);
669 if (ret) {
670 goto error;
671 }
672 } else {
673 /* Activity on a client's socket. */
674 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
675 /*
676 * It doesn't matter if a command was
677 * pending on the client socket at this
678 * point since it now has no way to
679 * receive the notifications to which
680 * it was subscribing or unsubscribing.
681 */
682 ret = handle_notification_thread_client_disconnect(
683 fd, &state);
684 if (ret) {
685 goto error;
686 }
687 } else {
688 if (revents & LPOLLIN) {
689 ret = handle_notification_thread_client_in(
690 &state, fd);
691 if (ret) {
692 goto error;
693 }
694 }
695
696 if (revents & LPOLLOUT) {
697 ret = handle_notification_thread_client_out(
698 &state, fd);
699 if (ret) {
700 goto error;
701 }
702 }
703 }
704 }
705 }
706 }
707 exit:
708 error:
709 fini_thread_state(&state);
710 health_unregister(health_sessiond);
711 rcu_thread_offline();
712 rcu_unregister_thread();
713 end:
714 return NULL;
715 }
This page took 0.043846 seconds and 4 git commands to generate.