Add the sessiond notification-handling subsystem
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread.c
CommitLineData
ab0ee2ca
JG
1/*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _LGPL_SOURCE
19#include <lttng/trigger/trigger.h>
20#include <lttng/notification/channel-internal.h>
21#include <lttng/notification/notification-internal.h>
22#include <lttng/condition/condition-internal.h>
23#include <lttng/condition/buffer-usage-internal.h>
24#include <common/error.h>
25#include <common/config/session-config.h>
26#include <common/defaults.h>
27#include <common/utils.h>
28#include <common/futex.h>
29#include <common/align.h>
30#include <common/time.h>
31#include <sys/eventfd.h>
32#include <sys/stat.h>
33#include <time.h>
34#include <signal.h>
35
36#include "notification-thread.h"
37#include "notification-thread-events.h"
38#include "notification-thread-commands.h"
39#include "lttng-sessiond.h"
40#include "health-sessiond.h"
41
42#include <urcu.h>
43#include <urcu/list.h>
44#include <urcu/rculfhash.h>
45
46/**
47 * This thread maintains an internal state associating clients and triggers.
48 *
49 * In order to speed-up and simplify queries, hash tables providing the
50 * following associations are maintained:
51 *
52 * - client_socket_ht: associate a client's socket (fd) to its "struct client"
53 * This hash table owns the "struct client" which must thus be
54 * disposed-of on removal from the hash table.
55 *
56 * - channel_triggers_ht:
57 * associates a channel key to a list of
58 * struct lttng_trigger_list_nodes. The triggers in this list are
59 * those that have conditions that apply to this channel.
60 * This hash table owns the list, but not the triggers themselves.
61 *
62 * - channel_state_ht:
63 * associates a pair (channel key, channel domain) to its last
64 * sampled state received from the consumer daemon
65 * (struct channel_state).
66 * This previous sample is kept to implement edge-triggered
67 * conditions as we need to detect the state transitions.
68 * This hash table owns the channel state.
69 *
70 * - notification_trigger_clients_ht:
71 * associates notification-emitting triggers to clients
72 * (struct notification_client_ht_node) subscribed to those
73 * conditions.
74 * The condition's hash and match functions are used directly since
75 * all triggers in this hash table have the "notify" action.
76 * This hash table holds no ownership.
77 *
78 * - channels_ht:
79 * associates a channel_key to a struct channel_info. The hash table
80 * holds the ownership of the struct channel_info.
81 *
82 * - triggers_ht:
83 * associated a condition to a struct lttng_trigger_ht_element.
84 * The hash table holds the ownership of the
85 * lttng_trigger_ht_elements along with the triggers themselves.
86 *
87 * The thread reacts to the following internal events:
88 * 1) creation of a tracing channel,
89 * 2) destruction of a tracing channel,
90 * 3) registration of a trigger,
91 * 4) unregistration of a trigger,
92 * 5) reception of a channel monitor sample from the consumer daemon.
93 *
94 * Events specific to notification-emitting triggers:
95 * 6) connection of a notification client,
96 * 7) disconnection of a notification client,
97 * 8) subscription of a client to a conditions' notifications,
98 * 9) unsubscription of a client from a conditions' notifications,
99 *
100 *
101 * 1) Creation of a tracing channel
102 * - notification_trigger_clients_ht is traversed to identify
103 * triggers which apply to this new channel,
104 * - triggers identified are added to the channel_triggers_ht.
105 * - add channel to channels_ht
106 *
107 * 2) Destruction of a tracing channel
108 * - remove entry from channel_triggers_ht, releasing the list wrapper and
109 * elements,
110 * - remove entry from the channel_state_ht.
111 * - remove channel from channels_ht
112 *
113 * 3) Registration of a trigger
114 * - if the trigger's action is of type "notify",
115 * - traverse the list of conditions of every client to build a list of
116 * clients which have to be notified when this trigger's condition is met,
117 * - add list of clients (even if it is empty) to the
118 * notification_trigger_clients_ht,
119 * - add trigger to channel_triggers_ht (if applicable),
120 * - add trigger to triggers_ht
121 *
122 * 4) Unregistration of a trigger
123 * - if the trigger's action is of type "notify",
124 * - remove the trigger from the notification_trigger_clients_ht,
125 * - remove trigger from channel_triggers_ht (if applicable),
126 * - remove trigger from triggers_ht
127 *
128 * 5) Reception of a channel monitor sample from the consumer daemon
129 * - evaluate the conditions associated with the triggers found in
130 * the channel_triggers_ht,
131 * - if a condition evaluates to "true" and the condition is of type
132 * "notify", query the notification_trigger_clients_ht and send
133 * a notification to the clients.
134 *
135 * 6) Connection of a client
136 * - add client socket to the client_socket_ht.
137 *
138 * 7) Disconnection of a client
139 * - remove client socket from the client_socket_ht,
140 * - traverse all conditions to which the client is subscribed and remove
141 * the client from the notification_trigger_clients_ht.
142 *
143 * 8) Subscription of a client to a condition's notifications
144 * - Add the condition to the client's list of subscribed conditions,
145 * - Look-up notification_trigger_clients_ht and add the client to
146 * list of clients.
147 *
148 * 9) Unsubscription of a client to a condition's notifications
149 * - Remove the condition from the client's list of subscribed conditions,
150 * - Look-up notification_trigger_clients_ht and remove the client
151 * from the list of clients.
152 */
153
154/*
155 * Destroy the thread data previously created by the init function.
156 */
157void notification_thread_handle_destroy(
158 struct notification_thread_handle *handle)
159{
160 int ret;
161 struct notification_thread_command *cmd, *tmp;
162
163 if (!handle) {
164 goto end;
165 }
166
167 if (handle->cmd_queue.event_fd < 0) {
168 goto end;
169 }
170 ret = close(handle->cmd_queue.event_fd);
171 if (ret < 0) {
172 PERROR("close notification command queue event_fd");
173 }
174
175 pthread_mutex_lock(&handle->cmd_queue.lock);
176 /* Purge queue of in-flight commands and mark them as cancelled. */
177 cds_list_for_each_entry_safe(cmd, tmp, &handle->cmd_queue.list,
178 cmd_list_node) {
179 cds_list_del(&cmd->cmd_list_node);
180 cmd->reply_code = LTTNG_ERR_COMMAND_CANCELLED;
181 futex_nto1_wake(&cmd->reply_futex);
182 }
183 pthread_mutex_unlock(&handle->cmd_queue.lock);
184 pthread_mutex_destroy(&handle->cmd_queue.lock);
185
186 if (handle->channel_monitoring_pipes.ust32_consumer >= 0) {
187 ret = close(handle->channel_monitoring_pipes.ust32_consumer);
188 if (ret) {
189 PERROR("close 32-bit consumer channel monitoring pipe");
190 }
191 }
192 if (handle->channel_monitoring_pipes.ust64_consumer >= 0) {
193 ret = close(handle->channel_monitoring_pipes.ust64_consumer);
194 if (ret) {
195 PERROR("close 64-bit consumer channel monitoring pipe");
196 }
197 }
198 if (handle->channel_monitoring_pipes.kernel_consumer >= 0) {
199 ret = close(handle->channel_monitoring_pipes.kernel_consumer);
200 if (ret) {
201 PERROR("close kernel consumer channel monitoring pipe");
202 }
203 }
204end:
205 free(handle);
206}
207
208struct notification_thread_handle *notification_thread_handle_create(
209 struct lttng_pipe *ust32_channel_monitor_pipe,
210 struct lttng_pipe *ust64_channel_monitor_pipe,
211 struct lttng_pipe *kernel_channel_monitor_pipe)
212{
213 int ret;
214 struct notification_thread_handle *handle;
215
216 handle = zmalloc(sizeof(*handle));
217 if (!handle) {
218 goto end;
219 }
220
221 /* FIXME Replace eventfd by a pipe to support older kernels. */
222 handle->cmd_queue.event_fd = eventfd(0, EFD_CLOEXEC);
223 if (handle->cmd_queue.event_fd < 0) {
224 PERROR("eventfd notification command queue");
225 goto error;
226 }
227 CDS_INIT_LIST_HEAD(&handle->cmd_queue.list);
228 ret = pthread_mutex_init(&handle->cmd_queue.lock, NULL);
229 if (ret) {
230 goto error;
231 }
232
233 if (ust32_channel_monitor_pipe) {
234 handle->channel_monitoring_pipes.ust32_consumer =
235 lttng_pipe_release_readfd(
236 ust32_channel_monitor_pipe);
237 if (handle->channel_monitoring_pipes.ust32_consumer < 0) {
238 goto error;
239 }
240 } else {
241 handle->channel_monitoring_pipes.ust32_consumer = -1;
242 }
243 if (ust64_channel_monitor_pipe) {
244 handle->channel_monitoring_pipes.ust64_consumer =
245 lttng_pipe_release_readfd(
246 ust64_channel_monitor_pipe);
247 if (handle->channel_monitoring_pipes.ust64_consumer < 0) {
248 goto error;
249 }
250 } else {
251 handle->channel_monitoring_pipes.ust64_consumer = -1;
252 }
253 if (kernel_channel_monitor_pipe) {
254 handle->channel_monitoring_pipes.kernel_consumer =
255 lttng_pipe_release_readfd(
256 kernel_channel_monitor_pipe);
257 if (handle->channel_monitoring_pipes.kernel_consumer < 0) {
258 goto error;
259 }
260 } else {
261 handle->channel_monitoring_pipes.kernel_consumer = -1;
262 }
263end:
264 return handle;
265error:
266 notification_thread_handle_destroy(handle);
267 return NULL;
268}
269
270static
271char *get_notification_channel_sock_path(void)
272{
273 int ret;
274 bool is_root = !getuid();
275 char *sock_path;
276
277 sock_path = zmalloc(LTTNG_PATH_MAX);
278 if (!sock_path) {
279 goto error;
280 }
281
282 if (is_root) {
283 ret = snprintf(sock_path, LTTNG_PATH_MAX,
284 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK);
285 if (ret < 0) {
286 goto error;
287 }
288 } else {
289 char *home_path = utils_get_home_dir();
290
291 if (!home_path) {
292 ERR("Can't get HOME directory for socket creation");
293 goto error;
294 }
295
296 ret = snprintf(sock_path, LTTNG_PATH_MAX,
297 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK,
298 home_path);
299 if (ret < 0) {
300 goto error;
301 }
302 }
303
304 return sock_path;
305error:
306 free(sock_path);
307 return NULL;
308}
309
310static
311void notification_channel_socket_destroy(int fd)
312{
313 int ret;
314 char *sock_path = get_notification_channel_sock_path();
315
316 DBG("[notification-thread] Destroying notification channel socket");
317
318 if (sock_path) {
319 ret = unlink(sock_path);
320 free(sock_path);
321 if (ret < 0) {
322 PERROR("unlink notification channel socket");
323 }
324 }
325
326 ret = close(fd);
327 if (ret) {
328 PERROR("close notification channel socket");
329 }
330}
331
332static
333int notification_channel_socket_create(void)
334{
335 int fd = -1, ret;
336 char *sock_path = get_notification_channel_sock_path();
337
338 DBG("[notification-thread] Creating notification channel UNIX socket at %s",
339 sock_path);
340
341 ret = lttcomm_create_unix_sock(sock_path);
342 if (ret < 0) {
343 ERR("[notification-thread] Failed to create notification socket");
344 goto error;
345 }
346 fd = ret;
347
348 ret = chmod(sock_path, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
349 if (ret < 0) {
350 ERR("Set file permissions failed: %s", sock_path);
351 PERROR("chmod notification channel socket");
352 goto error;
353 }
354
355 if (getuid() == 0) {
356 ret = chown(sock_path, 0,
357 utils_get_group_id(tracing_group_name));
358 if (ret) {
359 ERR("Failed to set the notification channel socket's group");
360 ret = -1;
361 goto error;
362 }
363 }
364
365 DBG("[notification-thread] Notification channel UNIX socket created (fd = %i)",
366 fd);
367 free(sock_path);
368 return fd;
369error:
370 if (fd >= 0 && close(fd) < 0) {
371 PERROR("close notification channel socket");
372 }
373 free(sock_path);
374 return ret;
375}
376
377static
378int init_poll_set(struct lttng_poll_event *poll_set,
379 struct notification_thread_handle *handle,
380 int notification_channel_socket)
381{
382 int ret;
383
384 /*
385 * Create pollset with size 5:
386 * - notification channel socket (listen for new connections),
387 * - command queue event fd (internal sessiond commands),
388 * - consumerd (32-bit user space) channel monitor pipe,
389 * - consumerd (64-bit user space) channel monitor pipe,
390 * - consumerd (kernel) channel monitor pipe.
391 */
392 ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
393 if (ret < 0) {
394 goto end;
395 }
396
397 ret = lttng_poll_add(poll_set, notification_channel_socket,
398 LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP);
399 if (ret < 0) {
400 ERR("[notification-thread] Failed to add notification channel socket to pollset");
401 goto error;
402 }
403 ret = lttng_poll_add(poll_set, handle->cmd_queue.event_fd,
404 LPOLLIN | LPOLLERR);
405 if (ret < 0) {
406 ERR("[notification-thread] Failed to add notification command queue event fd to pollset");
407 goto error;
408 }
409 ret = lttng_poll_add(poll_set,
410 handle->channel_monitoring_pipes.ust32_consumer,
411 LPOLLIN | LPOLLERR);
412 if (ret < 0) {
413 ERR("[notification-thread] Failed to add ust-32 channel monitoring pipe fd to pollset");
414 goto error;
415 }
416 ret = lttng_poll_add(poll_set,
417 handle->channel_monitoring_pipes.ust64_consumer,
418 LPOLLIN | LPOLLERR);
419 if (ret < 0) {
420 ERR("[notification-thread] Failed to add ust-64 channel monitoring pipe fd to pollset");
421 goto error;
422 }
423 if (handle->channel_monitoring_pipes.kernel_consumer < 0) {
424 goto end;
425 }
426 ret = lttng_poll_add(poll_set,
427 handle->channel_monitoring_pipes.kernel_consumer,
428 LPOLLIN | LPOLLERR);
429 if (ret < 0) {
430 ERR("[notification-thread] Failed to add kernel channel monitoring pipe fd to pollset");
431 goto error;
432 }
433end:
434 return ret;
435error:
436 lttng_poll_clean(poll_set);
437 return ret;
438}
439
440static
441void fini_thread_state(struct notification_thread_state *state)
442{
443 int ret;
444
445 if (state->client_socket_ht) {
446 ret = handle_notification_thread_client_disconnect_all(state);
447 assert(!ret);
448 ret = cds_lfht_destroy(state->client_socket_ht, NULL);
449 assert(!ret);
450 }
451 if (state->triggers_ht) {
452 ret = handle_notification_thread_trigger_unregister_all(state);
453 assert(!ret);
454 ret = cds_lfht_destroy(state->triggers_ht, NULL);
455 assert(!ret);
456 }
457 if (state->channel_triggers_ht) {
458 ret = cds_lfht_destroy(state->channel_triggers_ht, NULL);
459 assert(!ret);
460 }
461 if (state->channel_state_ht) {
462 ret = cds_lfht_destroy(state->channel_state_ht, NULL);
463 assert(!ret);
464 }
465 if (state->notification_trigger_clients_ht) {
466 ret = cds_lfht_destroy(state->notification_trigger_clients_ht,
467 NULL);
468 assert(!ret);
469 }
470 if (state->channels_ht) {
471 ret = cds_lfht_destroy(state->channels_ht,
472 NULL);
473 assert(!ret);
474 }
475
476 if (state->notification_channel_socket >= 0) {
477 notification_channel_socket_destroy(
478 state->notification_channel_socket);
479 }
480 lttng_poll_clean(&state->events);
481}
482
483static
484int init_thread_state(struct notification_thread_handle *handle,
485 struct notification_thread_state *state)
486{
487 int ret;
488
489 memset(state, 0, sizeof(*state));
490 state->notification_channel_socket = -1;
491 lttng_poll_init(&state->events);
492
493 ret = notification_channel_socket_create();
494 if (ret < 0) {
495 goto end;
496 }
497 state->notification_channel_socket = ret;
498
499 ret = init_poll_set(&state->events, handle,
500 state->notification_channel_socket);
501 if (ret) {
502 goto end;
503 }
504
505 DBG("[notification-thread] Listening on notification channel socket");
506 ret = lttcomm_listen_unix_sock(state->notification_channel_socket);
507 if (ret < 0) {
508 ERR("[notification-thread] Listen failed on notification channel socket");
509 goto error;
510 }
511
512 state->client_socket_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
513 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
514 if (!state->client_socket_ht) {
515 goto error;
516 }
517
518 state->channel_triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
519 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
520 if (!state->channel_triggers_ht) {
521 goto error;
522 }
523
524 state->channel_state_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
525 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
526 if (!state->channel_state_ht) {
527 goto error;
528 }
529
530 state->notification_trigger_clients_ht = cds_lfht_new(DEFAULT_HT_SIZE,
531 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
532 if (!state->notification_trigger_clients_ht) {
533 goto error;
534 }
535
536 state->channels_ht = cds_lfht_new(DEFAULT_HT_SIZE,
537 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
538 if (!state->channels_ht) {
539 goto error;
540 }
541
542 state->triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE,
543 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
544 if (!state->triggers_ht) {
545 goto error;
546 }
547end:
548 return 0;
549error:
550 fini_thread_state(state);
551 return -1;
552}
553
554static
555int handle_channel_monitoring_pipe(int fd, uint32_t revents,
556 struct notification_thread_handle *handle,
557 struct notification_thread_state *state)
558{
559 int ret = 0;
560 enum lttng_domain_type domain;
561
562 if (fd == handle->channel_monitoring_pipes.ust32_consumer ||
563 fd == handle->channel_monitoring_pipes.ust64_consumer) {
564 domain = LTTNG_DOMAIN_UST;
565 } else if (fd == handle->channel_monitoring_pipes.kernel_consumer) {
566 domain = LTTNG_DOMAIN_KERNEL;
567 } else {
568 abort();
569 }
570
571 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
572 ret = lttng_poll_del(&state->events, fd);
573 if (ret) {
574 ERR("[notification-thread] Failed to remove consumer monitoring pipe from poll set");
575 }
576 goto end;
577 }
578
579 ret = handle_notification_thread_channel_sample(
580 state, fd, domain);
581 if (ret) {
582 ERR("[notification-thread] Consumer sample handling error occured");
583 ret = -1;
584 goto end;
585 }
586end:
587 return ret;
588}
589
590/*
591 * This thread services notification channel clients and commands received
592 * from various lttng-sessiond components over a command queue.
593 */
594void *thread_notification(void *data)
595{
596 int ret;
597 struct notification_thread_handle *handle = data;
598 struct notification_thread_state state;
599
600 DBG("[notification-thread] Started notification thread");
601
602 if (!handle) {
603 ERR("[notification-thread] Invalid thread context provided");
604 goto end;
605 }
606
607 rcu_register_thread();
608 rcu_thread_online();
609
610 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_NOTIFICATION);
611 health_code_update();
612
613 ret = init_thread_state(handle, &state);
614 if (ret) {
615 goto end;
616 }
617
618 /* Ready to handle client connections. */
619 sessiond_notify_ready();
620
621 while (true) {
622 int fd_count, i;
623
624 health_poll_entry();
625 DBG("[notification-thread] Entering poll wait");
626 ret = lttng_poll_wait(&state.events, -1);
627 DBG("[notification-thread] Poll wait returned (%i)", ret);
628 health_poll_exit();
629 if (ret < 0) {
630 /*
631 * Restart interrupted system call.
632 */
633 if (errno == EINTR) {
634 continue;
635 }
636 ERR("[notification-thread] Error encountered during lttng_poll_wait (%i)", ret);
637 goto error;
638 }
639
640 fd_count = ret;
641 for (i = 0; i < fd_count; i++) {
642 int fd = LTTNG_POLL_GETFD(&state.events, i);
643 uint32_t revents = LTTNG_POLL_GETEV(&state.events, i);
644
645 DBG("[notification-thread] Handling fd (%i) activity (%u)", fd, revents);
646
647 if (fd == state.notification_channel_socket) {
648 if (revents & LPOLLIN) {
649 ret = handle_notification_thread_client_connect(
650 &state);
651 if (ret < 0) {
652 goto error;
653 }
654 } else if (revents &
655 (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
656 ERR("[notification-thread] Notification socket poll error");
657 goto error;
658 } else {
659 ERR("[notification-thread] Unexpected poll events %u for notification socket %i", revents, fd);
660 goto error;
661 }
662 } else if (fd == handle->cmd_queue.event_fd) {
663 ret = handle_notification_thread_command(handle,
664 &state);
665 if (ret < 0) {
666 DBG("[notification-thread] Error encountered while servicing command queue");
667 goto error;
668 } else if (ret > 0) {
669 goto exit;
670 }
671 } else if (fd == handle->channel_monitoring_pipes.ust32_consumer ||
672 fd == handle->channel_monitoring_pipes.ust64_consumer ||
673 fd == handle->channel_monitoring_pipes.kernel_consumer) {
674 ret = handle_channel_monitoring_pipe(fd,
675 revents, handle, &state);
676 if (ret) {
677 goto error;
678 }
679 } else {
680 /* Activity on a client's socket. */
681 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
682 /*
683 * It doesn't matter if a command was
684 * pending on the client socket at this
685 * point since it now has no way to
686 * receive the notifications to which
687 * it was subscribing or unsubscribing.
688 */
689 ret = handle_notification_thread_client_disconnect(
690 fd, &state);
691 if (ret) {
692 goto error;
693 }
694 } else {
695 if (revents & LPOLLIN) {
696 ret = handle_notification_thread_client_in(
697 &state, fd);
698 if (ret) {
699 goto error;
700 }
701 }
702
703 if (revents & LPOLLOUT) {
704 ret = handle_notification_thread_client_out(
705 &state, fd);
706 if (ret) {
707 goto error;
708 }
709 }
710 }
711 }
712 }
713 }
714exit:
715error:
716 fini_thread_state(&state);
717 health_unregister(health_sessiond);
718 rcu_thread_offline();
719 rcu_unregister_thread();
720end:
721 return NULL;
722}
This page took 0.048208 seconds and 4 git commands to generate.