Make captured field values available to event rule cond. evaluation
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
1 /*
2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #include "lttng/action/action.h"
9 #include "lttng/trigger/trigger-internal.h"
10 #define _LGPL_SOURCE
11 #include <urcu.h>
12 #include <urcu/rculfhash.h>
13
14 #include <common/defaults.h>
15 #include <common/error.h>
16 #include <common/futex.h>
17 #include <common/unix.h>
18 #include <common/dynamic-buffer.h>
19 #include <common/hashtable/utils.h>
20 #include <common/sessiond-comm/sessiond-comm.h>
21 #include <common/macros.h>
22 #include <lttng/condition/condition.h>
23 #include <lttng/action/action-internal.h>
24 #include <lttng/notification/notification-internal.h>
25 #include <lttng/condition/condition-internal.h>
26 #include <lttng/condition/buffer-usage-internal.h>
27 #include <lttng/condition/session-consumed-size-internal.h>
28 #include <lttng/condition/session-rotation-internal.h>
29 #include <lttng/condition/event-rule-internal.h>
30 #include <lttng/domain-internal.h>
31 #include <lttng/notification/channel-internal.h>
32 #include <lttng/trigger/trigger-internal.h>
33 #include <lttng/event-rule/event-rule-internal.h>
34
35 #include <time.h>
36 #include <unistd.h>
37 #include <assert.h>
38 #include <inttypes.h>
39 #include <fcntl.h>
40
41 #include "condition-internal.h"
42 #include "notification-thread.h"
43 #include "notification-thread-events.h"
44 #include "notification-thread-commands.h"
45 #include "lttng-sessiond.h"
46 #include "kernel.h"
47
/*
 * Poll event mask combining input readiness with the error and hang-up
 * conditions (LPOLLIN, LPOLLERR, LPOLLHUP, LPOLLRDHUP).
 */
#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
/* Same events as CLIENT_POLL_MASK_IN, plus output readiness (LPOLLOUT). */
#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)

/* The tracers currently limit the capture size to PIPE_BUF (4kb on linux). */
#define MAX_CAPTURE_SIZE (PIPE_BUF)
53
/*
 * Type of object to which a condition is bound, as reported by
 * get_condition_binding_object().
 */
enum lttng_object_type {
	/* The condition type is not recognized. */
	LTTNG_OBJECT_TYPE_UNKNOWN,
	/* The condition is not bound to any object (event rule hit). */
	LTTNG_OBJECT_TYPE_NONE,
	/* Buffer usage and session consumed size conditions. */
	LTTNG_OBJECT_TYPE_CHANNEL,
	/* Session rotation (ongoing/completed) conditions. */
	LTTNG_OBJECT_TYPE_SESSION,
};
60
/* Element of a list of triggers (channel or session trigger list). */
struct lttng_trigger_list_element {
	/* No ownership of the trigger object is assumed. */
	struct lttng_trigger *trigger;
	/* Linkage in the owning list. */
	struct cds_list_head node;
};
66
/* List of triggers applying to a given channel, identified by its key. */
struct lttng_channel_trigger_list {
	/* Compared on `key` and `domain` by match_channel_trigger_list(). */
	struct channel_key channel_key;
	/* List of struct lttng_trigger_list_element. */
	struct cds_list_head list;
	/* Node in the channel_triggers_ht */
	struct cds_lfht_node channel_triggers_ht_node;
	/* call_rcu delayed reclaim. */
	struct rcu_head rcu_node;
};
76
/*
 * List of triggers applying to a given session.
 *
 * See:
 *   - lttng_session_trigger_list_create()
 *   - lttng_session_trigger_list_build()
 *   - lttng_session_trigger_list_destroy()
 *   - lttng_session_trigger_list_add()
 */
struct lttng_session_trigger_list {
	/*
	 * Not owned by this; points to the session_info structure's
	 * session name.
	 */
	const char *session_name;
	/* List of struct lttng_trigger_list_element. */
	struct cds_list_head list;
	/* Node in the session_triggers_ht */
	struct cds_lfht_node session_triggers_ht_node;
	/*
	 * Weak reference to the notification system's session triggers
	 * hashtable.
	 *
	 * The session trigger list structure is owned by
	 * the session's session_info.
	 *
	 * The session_info is kept alive by the channel_infos holding a
	 * reference to it (reference counting). When those channels are
	 * destroyed (at runtime or on teardown), the references they hold
	 * to the session_info are released. On destruction of session_info,
	 * session_info_destroy() will remove the list of triggers applying
	 * to this session from the notification system's state.
	 *
	 * This implies that the session_triggers_ht must be destroyed
	 * after the channels.
	 */
	struct cds_lfht *session_triggers_ht;
	/* Used for delayed RCU reclaim. */
	struct rcu_head rcu_node;
};
117
/* Hash table element wrapping a trigger; it carries two hash table nodes. */
struct lttng_trigger_ht_element {
	struct lttng_trigger *trigger;
	/* Node matched by match_trigger() (whole-trigger equality). */
	struct cds_lfht_node node;
	/* Node matched by match_trigger_by_name_uid() (name and owner uid). */
	struct cds_lfht_node node_by_name_uid;
	/* call_rcu delayed reclaim. */
	struct rcu_head rcu_node;
};
125
/* Element of a list of conditions, e.g. a client's `condition_list`. */
struct lttng_condition_list_element {
	struct lttng_condition *condition;
	/* Linkage in the owning list. */
	struct cds_list_head node;
};
130
/* State sample of a channel, stored in a hash table keyed by channel key. */
struct channel_state_sample {
	/* Compared on `key` and `domain` by match_channel_state_sample(). */
	struct channel_key key;
	struct cds_lfht_node channel_state_ht_node;
	/* NOTE(review): units of the usage/consumed fields are not visible here; presumably bytes. */
	uint64_t highest_usage;
	uint64_t lowest_usage;
	uint64_t channel_total_consumed;
	/* call_rcu delayed reclaim. */
	struct rcu_head rcu_node;
};
140
/* Forward declarations of file-local helpers defined further down. */
static unsigned long hash_channel_key(struct channel_key *key);
static int evaluate_buffer_condition(const struct lttng_condition *condition,
		struct lttng_evaluation **evaluation,
		const struct notification_thread_state *state,
		const struct channel_state_sample *previous_sample,
		const struct channel_state_sample *latest_sample,
		uint64_t previous_session_consumed_total,
		uint64_t latest_session_consumed_total,
		struct channel_info *channel_info);
/* The evaluation is taken as const: the caller keeps ownership of it. */
static
int send_evaluation_to_clients(const struct lttng_trigger *trigger,
		const struct lttng_evaluation *evaluation,
		struct notification_client_list *client_list,
		struct notification_thread_state *state,
		uid_t channel_uid, gid_t channel_gid);


/* session_info API */
static
void session_info_destroy(void *_data);
static
void session_info_get(struct session_info *session_info);
static
void session_info_put(struct session_info *session_info);
static
struct session_info *session_info_create(const char *name,
		uid_t uid, gid_t gid,
		struct lttng_session_trigger_list *trigger_list,
		struct cds_lfht *sessions_ht);
static
void session_info_add_channel(struct session_info *session_info,
		struct channel_info *channel_info);
static
void session_info_remove_channel(struct session_info *session_info,
		struct channel_info *channel_info);

/* lttng_session_trigger_list API */
static
struct lttng_session_trigger_list *lttng_session_trigger_list_create(
		const char *session_name,
		struct cds_lfht *session_triggers_ht);
static
struct lttng_session_trigger_list *lttng_session_trigger_list_build(
		const struct notification_thread_state *state,
		const char *session_name);
static
void lttng_session_trigger_list_destroy(
		struct lttng_session_trigger_list *list);
static
int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
		struct lttng_trigger *trigger);

/* Client communication helpers. */
static
int client_handle_transmission_status(
		struct notification_client *client,
		enum client_transmission_status transmission_status,
		struct notification_thread_state *state);

static
int handle_one_event_notifier_notification(
		struct notification_thread_state *state,
		int pipe, enum lttng_domain_type domain);

/* call_rcu callback for struct lttng_trigger_ht_element. */
static
void free_lttng_trigger_ht_element_rcu(struct rcu_head *node);
206
207 static
208 int match_client_socket(struct cds_lfht_node *node, const void *key)
209 {
210 /* This double-cast is intended to supress pointer-to-cast warning. */
211 const int socket = (int) (intptr_t) key;
212 const struct notification_client *client = caa_container_of(node,
213 struct notification_client, client_socket_ht_node);
214
215 return client->socket == socket;
216 }
217
218 static
219 int match_client_id(struct cds_lfht_node *node, const void *key)
220 {
221 /* This double-cast is intended to supress pointer-to-cast warning. */
222 const notification_client_id id = *((notification_client_id *) key);
223 const struct notification_client *client = caa_container_of(
224 node, struct notification_client, client_id_ht_node);
225
226 return client->id == id;
227 }
228
229 static
230 int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
231 {
232 struct channel_key *channel_key = (struct channel_key *) key;
233 struct lttng_channel_trigger_list *trigger_list;
234
235 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
236 channel_triggers_ht_node);
237
238 return !!((channel_key->key == trigger_list->channel_key.key) &&
239 (channel_key->domain == trigger_list->channel_key.domain));
240 }
241
242 static
243 int match_session_trigger_list(struct cds_lfht_node *node, const void *key)
244 {
245 const char *session_name = (const char *) key;
246 struct lttng_session_trigger_list *trigger_list;
247
248 trigger_list = caa_container_of(node, struct lttng_session_trigger_list,
249 session_triggers_ht_node);
250
251 return !!(strcmp(trigger_list->session_name, session_name) == 0);
252 }
253
254 static
255 int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
256 {
257 struct channel_key *channel_key = (struct channel_key *) key;
258 struct channel_state_sample *sample;
259
260 sample = caa_container_of(node, struct channel_state_sample,
261 channel_state_ht_node);
262
263 return !!((channel_key->key == sample->key.key) &&
264 (channel_key->domain == sample->key.domain));
265 }
266
267 static
268 int match_channel_info(struct cds_lfht_node *node, const void *key)
269 {
270 struct channel_key *channel_key = (struct channel_key *) key;
271 struct channel_info *channel_info;
272
273 channel_info = caa_container_of(node, struct channel_info,
274 channels_ht_node);
275
276 return !!((channel_key->key == channel_info->key.key) &&
277 (channel_key->domain == channel_info->key.domain));
278 }
279
280 static
281 int match_trigger(struct cds_lfht_node *node, const void *key)
282 {
283 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
284 struct lttng_trigger_ht_element *trigger_ht_element;
285
286 trigger_ht_element = caa_container_of(node, struct lttng_trigger_ht_element,
287 node);
288
289 return !!lttng_trigger_is_equal(trigger_key, trigger_ht_element->trigger);
290 }
291
292 static
293 int match_trigger_token(struct cds_lfht_node *node, const void *key)
294 {
295 const uint64_t *_key = key;
296 struct notification_trigger_tokens_ht_element *element;
297
298 element = caa_container_of(node,
299 struct notification_trigger_tokens_ht_element, node);
300 return *_key == element->token;
301 }
302
303 static
304 int match_client_list_condition(struct cds_lfht_node *node, const void *key)
305 {
306 struct lttng_condition *condition_key = (struct lttng_condition *) key;
307 struct notification_client_list *client_list;
308 const struct lttng_condition *condition;
309
310 assert(condition_key);
311
312 client_list = caa_container_of(node, struct notification_client_list,
313 notification_trigger_clients_ht_node);
314 condition = lttng_trigger_get_const_condition(client_list->trigger);
315
316 return !!lttng_condition_is_equal(condition_key, condition);
317 }
318
319 static
320 int match_session(struct cds_lfht_node *node, const void *key)
321 {
322 const char *name = key;
323 struct session_info *session_info = caa_container_of(
324 node, struct session_info, sessions_ht_node);
325
326 return !strcmp(session_info->name, name);
327 }
328
329 static
330 const char *notification_command_type_str(
331 enum notification_thread_command_type type)
332 {
333 switch (type) {
334 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
335 return "REGISTER_TRIGGER";
336 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
337 return "UNREGISTER_TRIGGER";
338 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
339 return "ADD_CHANNEL";
340 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
341 return "REMOVE_CHANNEL";
342 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
343 return "SESSION_ROTATION_ONGOING";
344 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
345 return "SESSION_ROTATION_COMPLETED";
346 case NOTIFICATION_COMMAND_TYPE_ADD_TRACER_EVENT_SOURCE:
347 return "ADD_TRACER_EVENT_SOURCE";
348 case NOTIFICATION_COMMAND_TYPE_REMOVE_TRACER_EVENT_SOURCE:
349 return "REMOVE_TRACER_EVENT_SOURCE";
350 case NOTIFICATION_COMMAND_TYPE_LIST_TRIGGERS:
351 return "LIST_TRIGGERS";
352 case NOTIFICATION_COMMAND_TYPE_QUIT:
353 return "QUIT";
354 case NOTIFICATION_COMMAND_TYPE_CLIENT_COMMUNICATION_UPDATE:
355 return "CLIENT_COMMUNICATION_UPDATE";
356 default:
357 abort();
358 }
359 }
360
361 /*
362 * Match trigger based on name and credentials only.
363 * Name duplication is NOT allowed for the same uid.
364 */
365 static
366 int match_trigger_by_name_uid(struct cds_lfht_node *node,
367 const void *key)
368 {
369 bool match = false;
370 const char *name;
371 const char *key_name;
372 enum lttng_trigger_status status;
373 const struct lttng_credentials *key_creds;
374 const struct lttng_credentials *node_creds;
375 const struct lttng_trigger *trigger_key =
376 (const struct lttng_trigger *) key;
377 const struct lttng_trigger_ht_element *trigger_ht_element =
378 caa_container_of(node,
379 struct lttng_trigger_ht_element,
380 node_by_name_uid);
381
382 status = lttng_trigger_get_name(trigger_ht_element->trigger, &name);
383 assert(status == LTTNG_TRIGGER_STATUS_OK);
384
385 status = lttng_trigger_get_name(trigger_key, &key_name);
386 assert(status == LTTNG_TRIGGER_STATUS_OK);
387
388 /* Compare the names. */
389 if (strcmp(name, key_name) != 0) {
390 goto end;
391 }
392
393 /* Compare the owners' UIDs. */
394 key_creds = lttng_trigger_get_credentials(trigger_key);
395 node_creds = lttng_trigger_get_credentials(trigger_ht_element->trigger);
396
397 match = lttng_credentials_is_equal_uid(key_creds, node_creds);
398
399 end:
400 return match;
401 }
402
403 /*
404 * Hash trigger based on name and credentials only.
405 */
406 static
407 unsigned long hash_trigger_by_name_uid(const struct lttng_trigger *trigger)
408 {
409 unsigned long hash = 0;
410 const struct lttng_credentials *trigger_creds;
411 const char *trigger_name;
412 enum lttng_trigger_status status;
413
414 status = lttng_trigger_get_name(trigger, &trigger_name);
415 if (status == LTTNG_TRIGGER_STATUS_OK) {
416 hash = hash_key_str(trigger_name, lttng_ht_seed);
417 }
418
419 trigger_creds = lttng_trigger_get_credentials(trigger);
420 hash ^= hash_key_ulong((void *) (unsigned long) LTTNG_OPTIONAL_GET(trigger_creds->uid),
421 lttng_ht_seed);
422
423 return hash;
424 }
425
426 static
427 unsigned long hash_channel_key(struct channel_key *key)
428 {
429 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
430 unsigned long domain_hash = hash_key_ulong(
431 (void *) (unsigned long) key->domain, lttng_ht_seed);
432
433 return key_hash ^ domain_hash;
434 }
435
436 static
437 unsigned long hash_client_socket(int socket)
438 {
439 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
440 }
441
442 static
443 unsigned long hash_client_id(notification_client_id id)
444 {
445 return hash_key_u64(&id, lttng_ht_seed);
446 }
447
448 /*
449 * Get the type of object to which a given condition applies. Bindings let
450 * the notification system evaluate a trigger's condition when a given
451 * object's state is updated.
452 *
453 * For instance, a condition bound to a channel will be evaluated everytime
454 * the channel's state is changed by a channel monitoring sample.
455 */
456 static
457 enum lttng_object_type get_condition_binding_object(
458 const struct lttng_condition *condition)
459 {
460 switch (lttng_condition_get_type(condition)) {
461 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
462 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
463 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
464 return LTTNG_OBJECT_TYPE_CHANNEL;
465 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
466 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
467 return LTTNG_OBJECT_TYPE_SESSION;
468 case LTTNG_CONDITION_TYPE_EVENT_RULE_HIT:
469 return LTTNG_OBJECT_TYPE_NONE;
470 default:
471 return LTTNG_OBJECT_TYPE_UNKNOWN;
472 }
473 }
474
475 static
476 void free_channel_info_rcu(struct rcu_head *node)
477 {
478 free(caa_container_of(node, struct channel_info, rcu_node));
479 }
480
481 static
482 void channel_info_destroy(struct channel_info *channel_info)
483 {
484 if (!channel_info) {
485 return;
486 }
487
488 if (channel_info->session_info) {
489 session_info_remove_channel(channel_info->session_info,
490 channel_info);
491 session_info_put(channel_info->session_info);
492 }
493 if (channel_info->name) {
494 free(channel_info->name);
495 }
496 call_rcu(&channel_info->rcu_node, free_channel_info_rcu);
497 }
498
499 static
500 void free_session_info_rcu(struct rcu_head *node)
501 {
502 free(caa_container_of(node, struct session_info, rcu_node));
503 }
504
505 /* Don't call directly, use the ref-counting mechanism. */
506 static
507 void session_info_destroy(void *_data)
508 {
509 struct session_info *session_info = _data;
510 int ret;
511
512 assert(session_info);
513 if (session_info->channel_infos_ht) {
514 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
515 if (ret) {
516 ERR("[notification-thread] Failed to destroy channel information hash table");
517 }
518 }
519 lttng_session_trigger_list_destroy(session_info->trigger_list);
520
521 rcu_read_lock();
522 cds_lfht_del(session_info->sessions_ht,
523 &session_info->sessions_ht_node);
524 rcu_read_unlock();
525 free(session_info->name);
526 call_rcu(&session_info->rcu_node, free_session_info_rcu);
527 }
528
529 static
530 void session_info_get(struct session_info *session_info)
531 {
532 if (!session_info) {
533 return;
534 }
535 lttng_ref_get(&session_info->ref);
536 }
537
538 static
539 void session_info_put(struct session_info *session_info)
540 {
541 if (!session_info) {
542 return;
543 }
544 lttng_ref_put(&session_info->ref);
545 }
546
547 static
548 struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
549 struct lttng_session_trigger_list *trigger_list,
550 struct cds_lfht *sessions_ht)
551 {
552 struct session_info *session_info;
553
554 assert(name);
555
556 session_info = zmalloc(sizeof(*session_info));
557 if (!session_info) {
558 goto end;
559 }
560 lttng_ref_init(&session_info->ref, session_info_destroy);
561
562 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
563 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
564 if (!session_info->channel_infos_ht) {
565 goto error;
566 }
567
568 cds_lfht_node_init(&session_info->sessions_ht_node);
569 session_info->name = strdup(name);
570 if (!session_info->name) {
571 goto error;
572 }
573 session_info->uid = uid;
574 session_info->gid = gid;
575 session_info->trigger_list = trigger_list;
576 session_info->sessions_ht = sessions_ht;
577 end:
578 return session_info;
579 error:
580 session_info_put(session_info);
581 return NULL;
582 }
583
584 static
585 void session_info_add_channel(struct session_info *session_info,
586 struct channel_info *channel_info)
587 {
588 rcu_read_lock();
589 cds_lfht_add(session_info->channel_infos_ht,
590 hash_channel_key(&channel_info->key),
591 &channel_info->session_info_channels_ht_node);
592 rcu_read_unlock();
593 }
594
595 static
596 void session_info_remove_channel(struct session_info *session_info,
597 struct channel_info *channel_info)
598 {
599 rcu_read_lock();
600 cds_lfht_del(session_info->channel_infos_ht,
601 &channel_info->session_info_channels_ht_node);
602 rcu_read_unlock();
603 }
604
605 static
606 struct channel_info *channel_info_create(const char *channel_name,
607 struct channel_key *channel_key, uint64_t channel_capacity,
608 struct session_info *session_info)
609 {
610 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
611
612 if (!channel_info) {
613 goto end;
614 }
615
616 cds_lfht_node_init(&channel_info->channels_ht_node);
617 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
618 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
619 channel_info->capacity = channel_capacity;
620
621 channel_info->name = strdup(channel_name);
622 if (!channel_info->name) {
623 goto error;
624 }
625
626 /*
627 * Set the references between session and channel infos:
628 * - channel_info holds a strong reference to session_info
629 * - session_info holds a weak reference to channel_info
630 */
631 session_info_get(session_info);
632 session_info_add_channel(session_info, channel_info);
633 channel_info->session_info = session_info;
634 end:
635 return channel_info;
636 error:
637 channel_info_destroy(channel_info);
638 return NULL;
639 }
640
641 LTTNG_HIDDEN
642 bool notification_client_list_get(struct notification_client_list *list)
643 {
644 return urcu_ref_get_unless_zero(&list->ref);
645 }
646
647 static
648 void free_notification_client_list_rcu(struct rcu_head *node)
649 {
650 free(caa_container_of(node, struct notification_client_list,
651 rcu_node));
652 }
653
654 static
655 void notification_client_list_release(struct urcu_ref *list_ref)
656 {
657 struct notification_client_list *list =
658 container_of(list_ref, typeof(*list), ref);
659 struct notification_client_list_element *client_list_element, *tmp;
660
661 if (list->notification_trigger_clients_ht) {
662 rcu_read_lock();
663 cds_lfht_del(list->notification_trigger_clients_ht,
664 &list->notification_trigger_clients_ht_node);
665 rcu_read_unlock();
666 list->notification_trigger_clients_ht = NULL;
667 }
668 cds_list_for_each_entry_safe(client_list_element, tmp,
669 &list->list, node) {
670 free(client_list_element);
671 }
672 pthread_mutex_destroy(&list->lock);
673 call_rcu(&list->rcu_node, free_notification_client_list_rcu);
674 }
675
676 static
677 struct notification_client_list *notification_client_list_create(
678 const struct lttng_trigger *trigger)
679 {
680 struct notification_client_list *client_list =
681 zmalloc(sizeof(*client_list));
682
683 if (!client_list) {
684 goto error;
685 }
686 pthread_mutex_init(&client_list->lock, NULL);
687 urcu_ref_init(&client_list->ref);
688 cds_lfht_node_init(&client_list->notification_trigger_clients_ht_node);
689 CDS_INIT_LIST_HEAD(&client_list->list);
690 client_list->trigger = trigger;
691 error:
692 return client_list;
693 }
694
695 static
696 void publish_notification_client_list(
697 struct notification_thread_state *state,
698 struct notification_client_list *list)
699 {
700 const struct lttng_condition *condition =
701 lttng_trigger_get_const_condition(list->trigger);
702
703 assert(!list->notification_trigger_clients_ht);
704 notification_client_list_get(list);
705
706 list->notification_trigger_clients_ht =
707 state->notification_trigger_clients_ht;
708
709 rcu_read_lock();
710 cds_lfht_add(state->notification_trigger_clients_ht,
711 lttng_condition_hash(condition),
712 &list->notification_trigger_clients_ht_node);
713 rcu_read_unlock();
714 }
715
716 LTTNG_HIDDEN
717 void notification_client_list_put(struct notification_client_list *list)
718 {
719 if (!list) {
720 return;
721 }
722 return urcu_ref_put(&list->ref, notification_client_list_release);
723 }
724
725 /* Provides a reference to the returned list. */
726 static
727 struct notification_client_list *get_client_list_from_condition(
728 struct notification_thread_state *state,
729 const struct lttng_condition *condition)
730 {
731 struct cds_lfht_node *node;
732 struct cds_lfht_iter iter;
733 struct notification_client_list *list = NULL;
734
735 rcu_read_lock();
736 cds_lfht_lookup(state->notification_trigger_clients_ht,
737 lttng_condition_hash(condition),
738 match_client_list_condition,
739 condition,
740 &iter);
741 node = cds_lfht_iter_get_node(&iter);
742 if (node) {
743 list = container_of(node, struct notification_client_list,
744 notification_trigger_clients_ht_node);
745 list = notification_client_list_get(list) ? list : NULL;
746 }
747
748 rcu_read_unlock();
749 return list;
750 }
751
752 static
753 int evaluate_channel_condition_for_client(
754 const struct lttng_condition *condition,
755 struct notification_thread_state *state,
756 struct lttng_evaluation **evaluation,
757 uid_t *session_uid, gid_t *session_gid)
758 {
759 int ret;
760 struct cds_lfht_iter iter;
761 struct cds_lfht_node *node;
762 struct channel_info *channel_info = NULL;
763 struct channel_key *channel_key = NULL;
764 struct channel_state_sample *last_sample = NULL;
765 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
766
767 rcu_read_lock();
768
769 /* Find the channel associated with the condition. */
770 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
771 channel_trigger_list, channel_triggers_ht_node) {
772 struct lttng_trigger_list_element *element;
773
774 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
775 const struct lttng_condition *current_condition =
776 lttng_trigger_get_const_condition(
777 element->trigger);
778
779 assert(current_condition);
780 if (!lttng_condition_is_equal(condition,
781 current_condition)) {
782 continue;
783 }
784
785 /* Found the trigger, save the channel key. */
786 channel_key = &channel_trigger_list->channel_key;
787 break;
788 }
789 if (channel_key) {
790 /* The channel key was found stop iteration. */
791 break;
792 }
793 }
794
795 if (!channel_key){
796 /* No channel found; normal exit. */
797 DBG("[notification-thread] No known channel associated with newly subscribed-to condition");
798 ret = 0;
799 goto end;
800 }
801
802 /* Fetch channel info for the matching channel. */
803 cds_lfht_lookup(state->channels_ht,
804 hash_channel_key(channel_key),
805 match_channel_info,
806 channel_key,
807 &iter);
808 node = cds_lfht_iter_get_node(&iter);
809 assert(node);
810 channel_info = caa_container_of(node, struct channel_info,
811 channels_ht_node);
812
813 /* Retrieve the channel's last sample, if it exists. */
814 cds_lfht_lookup(state->channel_state_ht,
815 hash_channel_key(channel_key),
816 match_channel_state_sample,
817 channel_key,
818 &iter);
819 node = cds_lfht_iter_get_node(&iter);
820 if (node) {
821 last_sample = caa_container_of(node,
822 struct channel_state_sample,
823 channel_state_ht_node);
824 } else {
825 /* Nothing to evaluate, no sample was ever taken. Normal exit */
826 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
827 ret = 0;
828 goto end;
829 }
830
831 ret = evaluate_buffer_condition(condition, evaluation, state,
832 NULL, last_sample,
833 0, channel_info->session_info->consumed_data_size,
834 channel_info);
835 if (ret) {
836 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
837 goto end;
838 }
839
840 *session_uid = channel_info->session_info->uid;
841 *session_gid = channel_info->session_info->gid;
842 end:
843 rcu_read_unlock();
844 return ret;
845 }
846
847 static
848 const char *get_condition_session_name(const struct lttng_condition *condition)
849 {
850 const char *session_name = NULL;
851 enum lttng_condition_status status;
852
853 switch (lttng_condition_get_type(condition)) {
854 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
855 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
856 status = lttng_condition_buffer_usage_get_session_name(
857 condition, &session_name);
858 break;
859 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
860 status = lttng_condition_session_consumed_size_get_session_name(
861 condition, &session_name);
862 break;
863 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
864 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
865 status = lttng_condition_session_rotation_get_session_name(
866 condition, &session_name);
867 break;
868 default:
869 abort();
870 }
871 if (status != LTTNG_CONDITION_STATUS_OK) {
872 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
873 goto end;
874 }
875 end:
876 return session_name;
877 }
878
879 static
880 int evaluate_session_condition_for_client(
881 const struct lttng_condition *condition,
882 struct notification_thread_state *state,
883 struct lttng_evaluation **evaluation,
884 uid_t *session_uid, gid_t *session_gid)
885 {
886 int ret;
887 struct cds_lfht_iter iter;
888 struct cds_lfht_node *node;
889 const char *session_name;
890 struct session_info *session_info = NULL;
891
892 rcu_read_lock();
893 session_name = get_condition_session_name(condition);
894
895 /* Find the session associated with the trigger. */
896 cds_lfht_lookup(state->sessions_ht,
897 hash_key_str(session_name, lttng_ht_seed),
898 match_session,
899 session_name,
900 &iter);
901 node = cds_lfht_iter_get_node(&iter);
902 if (!node) {
903 DBG("[notification-thread] No known session matching name \"%s\"",
904 session_name);
905 ret = 0;
906 goto end;
907 }
908
909 session_info = caa_container_of(node, struct session_info,
910 sessions_ht_node);
911 session_info_get(session_info);
912
913 /*
914 * Evaluation is performed in-line here since only one type of
915 * session-bound condition is handled for the moment.
916 */
917 switch (lttng_condition_get_type(condition)) {
918 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
919 if (!session_info->rotation.ongoing) {
920 ret = 0;
921 goto end_session_put;
922 }
923
924 *evaluation = lttng_evaluation_session_rotation_ongoing_create(
925 session_info->rotation.id);
926 if (!*evaluation) {
927 /* Fatal error. */
928 ERR("[notification-thread] Failed to create session rotation ongoing evaluation for session \"%s\"",
929 session_info->name);
930 ret = -1;
931 goto end_session_put;
932 }
933 ret = 0;
934 break;
935 default:
936 ret = 0;
937 goto end_session_put;
938 }
939
940 *session_uid = session_info->uid;
941 *session_gid = session_info->gid;
942
943 end_session_put:
944 session_info_put(session_info);
945 end:
946 rcu_read_unlock();
947 return ret;
948 }
949
950 static
951 int evaluate_condition_for_client(const struct lttng_trigger *trigger,
952 const struct lttng_condition *condition,
953 struct notification_client *client,
954 struct notification_thread_state *state)
955 {
956 int ret;
957 struct lttng_evaluation *evaluation = NULL;
958 struct notification_client_list client_list = {
959 .lock = PTHREAD_MUTEX_INITIALIZER,
960 };
961 struct notification_client_list_element client_list_element = { 0 };
962 uid_t object_uid = 0;
963 gid_t object_gid = 0;
964
965 assert(trigger);
966 assert(condition);
967 assert(client);
968 assert(state);
969
970 switch (get_condition_binding_object(condition)) {
971 case LTTNG_OBJECT_TYPE_SESSION:
972 ret = evaluate_session_condition_for_client(condition, state,
973 &evaluation, &object_uid, &object_gid);
974 break;
975 case LTTNG_OBJECT_TYPE_CHANNEL:
976 ret = evaluate_channel_condition_for_client(condition, state,
977 &evaluation, &object_uid, &object_gid);
978 break;
979 case LTTNG_OBJECT_TYPE_NONE:
980 DBG("[notification-thread] Newly subscribed-to condition not bound to object, nothing to evaluate");
981 ret = 0;
982 goto end;
983 case LTTNG_OBJECT_TYPE_UNKNOWN:
984 default:
985 ret = -1;
986 goto end;
987 }
988 if (ret) {
989 /* Fatal error. */
990 goto end;
991 }
992 if (!evaluation) {
993 /* Evaluation yielded nothing. Normal exit. */
994 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
995 ret = 0;
996 goto end;
997 }
998
999 /*
1000 * Create a temporary client list with the client currently
1001 * subscribing.
1002 */
1003 cds_lfht_node_init(&client_list.notification_trigger_clients_ht_node);
1004 CDS_INIT_LIST_HEAD(&client_list.list);
1005 client_list.trigger = trigger;
1006
1007 CDS_INIT_LIST_HEAD(&client_list_element.node);
1008 client_list_element.client = client;
1009 cds_list_add(&client_list_element.node, &client_list.list);
1010
1011 /* Send evaluation result to the newly-subscribed client. */
1012 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
1013 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
1014 state, object_uid, object_gid);
1015
1016 end:
1017 return ret;
1018 }
1019
1020 static
1021 int notification_thread_client_subscribe(struct notification_client *client,
1022 struct lttng_condition *condition,
1023 struct notification_thread_state *state,
1024 enum lttng_notification_channel_status *_status)
1025 {
1026 int ret = 0;
1027 struct notification_client_list *client_list = NULL;
1028 struct lttng_condition_list_element *condition_list_element = NULL;
1029 struct notification_client_list_element *client_list_element = NULL;
1030 enum lttng_notification_channel_status status =
1031 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1032
1033 /*
1034 * Ensure that the client has not already subscribed to this condition
1035 * before.
1036 */
1037 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
1038 if (lttng_condition_is_equal(condition_list_element->condition,
1039 condition)) {
1040 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
1041 goto end;
1042 }
1043 }
1044
1045 condition_list_element = zmalloc(sizeof(*condition_list_element));
1046 if (!condition_list_element) {
1047 ret = -1;
1048 goto error;
1049 }
1050 client_list_element = zmalloc(sizeof(*client_list_element));
1051 if (!client_list_element) {
1052 ret = -1;
1053 goto error;
1054 }
1055
1056 /*
1057 * Add the newly-subscribed condition to the client's subscription list.
1058 */
1059 CDS_INIT_LIST_HEAD(&condition_list_element->node);
1060 condition_list_element->condition = condition;
1061 cds_list_add(&condition_list_element->node, &client->condition_list);
1062
1063 client_list = get_client_list_from_condition(state, condition);
1064 if (!client_list) {
1065 /*
1066 * No notification-emiting trigger registered with this
1067 * condition. We don't evaluate the condition right away
1068 * since this trigger is not registered yet.
1069 */
1070 free(client_list_element);
1071 goto end;
1072 }
1073
1074 /*
1075 * The condition to which the client just subscribed is evaluated
1076 * at this point so that conditions that are already TRUE result
1077 * in a notification being sent out.
1078 *
1079 * The client_list's trigger is used without locking the list itself.
1080 * This is correct since the list doesn't own the trigger and the
1081 * object is immutable.
1082 */
1083 if (evaluate_condition_for_client(client_list->trigger, condition,
1084 client, state)) {
1085 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
1086 ret = -1;
1087 free(client_list_element);
1088 goto end;
1089 }
1090
1091 /*
1092 * Add the client to the list of clients interested in a given trigger
1093 * if a "notification" trigger with a corresponding condition was
1094 * added prior.
1095 */
1096 client_list_element->client = client;
1097 CDS_INIT_LIST_HEAD(&client_list_element->node);
1098
1099 pthread_mutex_lock(&client_list->lock);
1100 cds_list_add(&client_list_element->node, &client_list->list);
1101 pthread_mutex_unlock(&client_list->lock);
1102 end:
1103 if (_status) {
1104 *_status = status;
1105 }
1106 if (client_list) {
1107 notification_client_list_put(client_list);
1108 }
1109 return ret;
1110 error:
1111 free(condition_list_element);
1112 free(client_list_element);
1113 return ret;
1114 }
1115
1116 static
1117 int notification_thread_client_unsubscribe(
1118 struct notification_client *client,
1119 struct lttng_condition *condition,
1120 struct notification_thread_state *state,
1121 enum lttng_notification_channel_status *_status)
1122 {
1123 struct notification_client_list *client_list;
1124 struct lttng_condition_list_element *condition_list_element,
1125 *condition_tmp;
1126 struct notification_client_list_element *client_list_element,
1127 *client_tmp;
1128 bool condition_found = false;
1129 enum lttng_notification_channel_status status =
1130 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1131
1132 /* Remove the condition from the client's condition list. */
1133 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
1134 &client->condition_list, node) {
1135 if (!lttng_condition_is_equal(condition_list_element->condition,
1136 condition)) {
1137 continue;
1138 }
1139
1140 cds_list_del(&condition_list_element->node);
1141 /*
1142 * The caller may be iterating on the client's conditions to
1143 * tear down a client's connection. In this case, the condition
1144 * will be destroyed at the end.
1145 */
1146 if (condition != condition_list_element->condition) {
1147 lttng_condition_destroy(
1148 condition_list_element->condition);
1149 }
1150 free(condition_list_element);
1151 condition_found = true;
1152 break;
1153 }
1154
1155 if (!condition_found) {
1156 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
1157 goto end;
1158 }
1159
1160 /*
1161 * Remove the client from the list of clients interested the trigger
1162 * matching the condition.
1163 */
1164 client_list = get_client_list_from_condition(state, condition);
1165 if (!client_list) {
1166 goto end;
1167 }
1168
1169 pthread_mutex_lock(&client_list->lock);
1170 cds_list_for_each_entry_safe(client_list_element, client_tmp,
1171 &client_list->list, node) {
1172 if (client_list_element->client->id != client->id) {
1173 continue;
1174 }
1175 cds_list_del(&client_list_element->node);
1176 free(client_list_element);
1177 break;
1178 }
1179 pthread_mutex_unlock(&client_list->lock);
1180 notification_client_list_put(client_list);
1181 client_list = NULL;
1182 end:
1183 lttng_condition_destroy(condition);
1184 if (_status) {
1185 *_status = status;
1186 }
1187 return 0;
1188 }
1189
1190 static
1191 void free_notification_client_rcu(struct rcu_head *node)
1192 {
1193 free(caa_container_of(node, struct notification_client, rcu_node));
1194 }
1195
1196 static
1197 void notification_client_destroy(struct notification_client *client,
1198 struct notification_thread_state *state)
1199 {
1200 if (!client) {
1201 return;
1202 }
1203
1204 /*
1205 * The client object is not reachable by other threads, no need to lock
1206 * the client here.
1207 */
1208 if (client->socket >= 0) {
1209 (void) lttcomm_close_unix_sock(client->socket);
1210 client->socket = -1;
1211 }
1212 client->communication.active = false;
1213 lttng_payload_reset(&client->communication.inbound.payload);
1214 lttng_payload_reset(&client->communication.outbound.payload);
1215 pthread_mutex_destroy(&client->lock);
1216 call_rcu(&client->rcu_node, free_notification_client_rcu);
1217 }
1218
1219 /*
1220 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1221 * client pointer).
1222 */
1223 static
1224 struct notification_client *get_client_from_socket(int socket,
1225 struct notification_thread_state *state)
1226 {
1227 struct cds_lfht_iter iter;
1228 struct cds_lfht_node *node;
1229 struct notification_client *client = NULL;
1230
1231 cds_lfht_lookup(state->client_socket_ht,
1232 hash_client_socket(socket),
1233 match_client_socket,
1234 (void *) (unsigned long) socket,
1235 &iter);
1236 node = cds_lfht_iter_get_node(&iter);
1237 if (!node) {
1238 goto end;
1239 }
1240
1241 client = caa_container_of(node, struct notification_client,
1242 client_socket_ht_node);
1243 end:
1244 return client;
1245 }
1246
1247 /*
1248 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1249 * client pointer).
1250 */
1251 static
1252 struct notification_client *get_client_from_id(notification_client_id id,
1253 struct notification_thread_state *state)
1254 {
1255 struct cds_lfht_iter iter;
1256 struct cds_lfht_node *node;
1257 struct notification_client *client = NULL;
1258
1259 cds_lfht_lookup(state->client_id_ht,
1260 hash_client_id(id),
1261 match_client_id,
1262 &id,
1263 &iter);
1264 node = cds_lfht_iter_get_node(&iter);
1265 if (!node) {
1266 goto end;
1267 }
1268
1269 client = caa_container_of(node, struct notification_client,
1270 client_id_ht_node);
1271 end:
1272 return client;
1273 }
1274
1275 static
1276 bool buffer_usage_condition_applies_to_channel(
1277 const struct lttng_condition *condition,
1278 const struct channel_info *channel_info)
1279 {
1280 enum lttng_condition_status status;
1281 enum lttng_domain_type condition_domain;
1282 const char *condition_session_name = NULL;
1283 const char *condition_channel_name = NULL;
1284
1285 status = lttng_condition_buffer_usage_get_domain_type(condition,
1286 &condition_domain);
1287 assert(status == LTTNG_CONDITION_STATUS_OK);
1288 if (channel_info->key.domain != condition_domain) {
1289 goto fail;
1290 }
1291
1292 status = lttng_condition_buffer_usage_get_session_name(
1293 condition, &condition_session_name);
1294 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1295
1296 status = lttng_condition_buffer_usage_get_channel_name(
1297 condition, &condition_channel_name);
1298 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
1299
1300 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1301 goto fail;
1302 }
1303 if (strcmp(channel_info->name, condition_channel_name)) {
1304 goto fail;
1305 }
1306
1307 return true;
1308 fail:
1309 return false;
1310 }
1311
1312 static
1313 bool session_consumed_size_condition_applies_to_channel(
1314 const struct lttng_condition *condition,
1315 const struct channel_info *channel_info)
1316 {
1317 enum lttng_condition_status status;
1318 const char *condition_session_name = NULL;
1319
1320 status = lttng_condition_session_consumed_size_get_session_name(
1321 condition, &condition_session_name);
1322 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1323
1324 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1325 goto fail;
1326 }
1327
1328 return true;
1329 fail:
1330 return false;
1331 }
1332
1333 static
1334 bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
1335 const struct channel_info *channel_info)
1336 {
1337 const struct lttng_condition *condition;
1338 bool trigger_applies;
1339
1340 condition = lttng_trigger_get_const_condition(trigger);
1341 if (!condition) {
1342 goto fail;
1343 }
1344
1345 switch (lttng_condition_get_type(condition)) {
1346 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1347 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1348 trigger_applies = buffer_usage_condition_applies_to_channel(
1349 condition, channel_info);
1350 break;
1351 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1352 trigger_applies = session_consumed_size_condition_applies_to_channel(
1353 condition, channel_info);
1354 break;
1355 default:
1356 goto fail;
1357 }
1358
1359 return trigger_applies;
1360 fail:
1361 return false;
1362 }
1363
1364 static
1365 bool trigger_applies_to_client(struct lttng_trigger *trigger,
1366 struct notification_client *client)
1367 {
1368 bool applies = false;
1369 struct lttng_condition_list_element *condition_list_element;
1370
1371 cds_list_for_each_entry(condition_list_element, &client->condition_list,
1372 node) {
1373 applies = lttng_condition_is_equal(
1374 condition_list_element->condition,
1375 lttng_trigger_get_condition(trigger));
1376 if (applies) {
1377 break;
1378 }
1379 }
1380 return applies;
1381 }
1382
1383 /* Must be called with RCU read lock held. */
1384 static
1385 struct lttng_session_trigger_list *get_session_trigger_list(
1386 struct notification_thread_state *state,
1387 const char *session_name)
1388 {
1389 struct lttng_session_trigger_list *list = NULL;
1390 struct cds_lfht_node *node;
1391 struct cds_lfht_iter iter;
1392
1393 cds_lfht_lookup(state->session_triggers_ht,
1394 hash_key_str(session_name, lttng_ht_seed),
1395 match_session_trigger_list,
1396 session_name,
1397 &iter);
1398 node = cds_lfht_iter_get_node(&iter);
1399 if (!node) {
1400 /*
1401 * Not an error, the list of triggers applying to that session
1402 * will be initialized when the session is created.
1403 */
1404 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1405 session_name);
1406 goto end;
1407 }
1408
1409 list = caa_container_of(node,
1410 struct lttng_session_trigger_list,
1411 session_triggers_ht_node);
1412 end:
1413 return list;
1414 }
1415
1416 /*
1417 * Allocate an empty lttng_session_trigger_list for the session named
1418 * 'session_name'.
1419 *
1420 * No ownership of 'session_name' is assumed by the session trigger list.
1421 * It is the caller's responsability to ensure the session name is alive
1422 * for as long as this list is.
1423 */
1424 static
1425 struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1426 const char *session_name,
1427 struct cds_lfht *session_triggers_ht)
1428 {
1429 struct lttng_session_trigger_list *list;
1430
1431 list = zmalloc(sizeof(*list));
1432 if (!list) {
1433 goto end;
1434 }
1435 list->session_name = session_name;
1436 CDS_INIT_LIST_HEAD(&list->list);
1437 cds_lfht_node_init(&list->session_triggers_ht_node);
1438 list->session_triggers_ht = session_triggers_ht;
1439
1440 rcu_read_lock();
1441 /* Publish the list through the session_triggers_ht. */
1442 cds_lfht_add(session_triggers_ht,
1443 hash_key_str(session_name, lttng_ht_seed),
1444 &list->session_triggers_ht_node);
1445 rcu_read_unlock();
1446 end:
1447 return list;
1448 }
1449
1450 static
1451 void free_session_trigger_list_rcu(struct rcu_head *node)
1452 {
1453 free(caa_container_of(node, struct lttng_session_trigger_list,
1454 rcu_node));
1455 }
1456
1457 static
1458 void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1459 {
1460 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1461
1462 /* Empty the list element by element, and then free the list itself. */
1463 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1464 &list->list, node) {
1465 cds_list_del(&trigger_list_element->node);
1466 free(trigger_list_element);
1467 }
1468 rcu_read_lock();
1469 /* Unpublish the list from the session_triggers_ht. */
1470 cds_lfht_del(list->session_triggers_ht,
1471 &list->session_triggers_ht_node);
1472 rcu_read_unlock();
1473 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1474 }
1475
1476 static
1477 int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
1478 struct lttng_trigger *trigger)
1479 {
1480 int ret = 0;
1481 struct lttng_trigger_list_element *new_element =
1482 zmalloc(sizeof(*new_element));
1483
1484 if (!new_element) {
1485 ret = -1;
1486 goto end;
1487 }
1488 CDS_INIT_LIST_HEAD(&new_element->node);
1489 new_element->trigger = trigger;
1490 cds_list_add(&new_element->node, &list->list);
1491 end:
1492 return ret;
1493 }
1494
1495 static
1496 bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1497 const char *session_name)
1498 {
1499 bool applies = false;
1500 const struct lttng_condition *condition;
1501
1502 condition = lttng_trigger_get_const_condition(trigger);
1503 switch (lttng_condition_get_type(condition)) {
1504 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1505 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1506 {
1507 enum lttng_condition_status condition_status;
1508 const char *condition_session_name;
1509
1510 condition_status = lttng_condition_session_rotation_get_session_name(
1511 condition, &condition_session_name);
1512 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1513 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1514 goto end;
1515 }
1516
1517 assert(condition_session_name);
1518 applies = !strcmp(condition_session_name, session_name);
1519 break;
1520 }
1521 default:
1522 goto end;
1523 }
1524 end:
1525 return applies;
1526 }
1527
1528 /*
1529 * Allocate and initialize an lttng_session_trigger_list which contains
1530 * all triggers that apply to the session named 'session_name'.
1531 *
1532 * No ownership of 'session_name' is assumed by the session trigger list.
1533 * It is the caller's responsability to ensure the session name is alive
1534 * for as long as this list is.
1535 */
1536 static
1537 struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1538 const struct notification_thread_state *state,
1539 const char *session_name)
1540 {
1541 int trigger_count = 0;
1542 struct lttng_session_trigger_list *session_trigger_list = NULL;
1543 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1544 struct cds_lfht_iter iter;
1545
1546 session_trigger_list = lttng_session_trigger_list_create(session_name,
1547 state->session_triggers_ht);
1548
1549 /* Add all triggers applying to the session named 'session_name'. */
1550 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1551 node) {
1552 int ret;
1553
1554 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1555 session_name)) {
1556 continue;
1557 }
1558
1559 ret = lttng_session_trigger_list_add(session_trigger_list,
1560 trigger_ht_element->trigger);
1561 if (ret) {
1562 goto error;
1563 }
1564
1565 trigger_count++;
1566 }
1567
1568 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1569 trigger_count);
1570 return session_trigger_list;
1571 error:
1572 lttng_session_trigger_list_destroy(session_trigger_list);
1573 return NULL;
1574 }
1575
1576 static
1577 struct session_info *find_or_create_session_info(
1578 struct notification_thread_state *state,
1579 const char *name, uid_t uid, gid_t gid)
1580 {
1581 struct session_info *session = NULL;
1582 struct cds_lfht_node *node;
1583 struct cds_lfht_iter iter;
1584 struct lttng_session_trigger_list *trigger_list;
1585
1586 rcu_read_lock();
1587 cds_lfht_lookup(state->sessions_ht,
1588 hash_key_str(name, lttng_ht_seed),
1589 match_session,
1590 name,
1591 &iter);
1592 node = cds_lfht_iter_get_node(&iter);
1593 if (node) {
1594 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1595 name, uid, gid);
1596 session = caa_container_of(node, struct session_info,
1597 sessions_ht_node);
1598 assert(session->uid == uid);
1599 assert(session->gid == gid);
1600 session_info_get(session);
1601 goto end;
1602 }
1603
1604 trigger_list = lttng_session_trigger_list_build(state, name);
1605 if (!trigger_list) {
1606 goto error;
1607 }
1608
1609 session = session_info_create(name, uid, gid, trigger_list,
1610 state->sessions_ht);
1611 if (!session) {
1612 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1613 name, uid, gid);
1614 lttng_session_trigger_list_destroy(trigger_list);
1615 goto error;
1616 }
1617 trigger_list = NULL;
1618
1619 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
1620 &session->sessions_ht_node);
1621 end:
1622 rcu_read_unlock();
1623 return session;
1624 error:
1625 rcu_read_unlock();
1626 session_info_put(session);
1627 return NULL;
1628 }
1629
1630 static
1631 int handle_notification_thread_command_add_channel(
1632 struct notification_thread_state *state,
1633 const char *session_name, uid_t session_uid, gid_t session_gid,
1634 const char *channel_name, enum lttng_domain_type channel_domain,
1635 uint64_t channel_key_int, uint64_t channel_capacity,
1636 enum lttng_error_code *cmd_result)
1637 {
1638 struct cds_list_head trigger_list;
1639 struct channel_info *new_channel_info = NULL;
1640 struct channel_key channel_key = {
1641 .key = channel_key_int,
1642 .domain = channel_domain,
1643 };
1644 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1645 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1646 int trigger_count = 0;
1647 struct cds_lfht_iter iter;
1648 struct session_info *session_info = NULL;
1649
1650 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
1651 channel_name, session_name, channel_key_int,
1652 lttng_domain_type_str(channel_domain));
1653
1654 CDS_INIT_LIST_HEAD(&trigger_list);
1655
1656 session_info = find_or_create_session_info(state, session_name,
1657 session_uid, session_gid);
1658 if (!session_info) {
1659 /* Allocation error or an internal error occurred. */
1660 goto error;
1661 }
1662
1663 new_channel_info = channel_info_create(channel_name, &channel_key,
1664 channel_capacity, session_info);
1665 if (!new_channel_info) {
1666 goto error;
1667 }
1668
1669 rcu_read_lock();
1670 /* Build a list of all triggers applying to the new channel. */
1671 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1672 node) {
1673 struct lttng_trigger_list_element *new_element;
1674
1675 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
1676 new_channel_info)) {
1677 continue;
1678 }
1679
1680 new_element = zmalloc(sizeof(*new_element));
1681 if (!new_element) {
1682 rcu_read_unlock();
1683 goto error;
1684 }
1685 CDS_INIT_LIST_HEAD(&new_element->node);
1686 new_element->trigger = trigger_ht_element->trigger;
1687 cds_list_add(&new_element->node, &trigger_list);
1688 trigger_count++;
1689 }
1690 rcu_read_unlock();
1691
1692 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1693 trigger_count);
1694 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1695 if (!channel_trigger_list) {
1696 goto error;
1697 }
1698 channel_trigger_list->channel_key = new_channel_info->key;
1699 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1700 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1701 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1702
1703 rcu_read_lock();
1704 /* Add channel to the channel_ht which owns the channel_infos. */
1705 cds_lfht_add(state->channels_ht,
1706 hash_channel_key(&new_channel_info->key),
1707 &new_channel_info->channels_ht_node);
1708 /*
1709 * Add the list of triggers associated with this channel to the
1710 * channel_triggers_ht.
1711 */
1712 cds_lfht_add(state->channel_triggers_ht,
1713 hash_channel_key(&new_channel_info->key),
1714 &channel_trigger_list->channel_triggers_ht_node);
1715 rcu_read_unlock();
1716 session_info_put(session_info);
1717 *cmd_result = LTTNG_OK;
1718 return 0;
1719 error:
1720 channel_info_destroy(new_channel_info);
1721 session_info_put(session_info);
1722 return 1;
1723 }
1724
1725 static
1726 void free_channel_trigger_list_rcu(struct rcu_head *node)
1727 {
1728 free(caa_container_of(node, struct lttng_channel_trigger_list,
1729 rcu_node));
1730 }
1731
1732 static
1733 void free_channel_state_sample_rcu(struct rcu_head *node)
1734 {
1735 free(caa_container_of(node, struct channel_state_sample,
1736 rcu_node));
1737 }
1738
1739 static
1740 int handle_notification_thread_command_remove_channel(
1741 struct notification_thread_state *state,
1742 uint64_t channel_key, enum lttng_domain_type domain,
1743 enum lttng_error_code *cmd_result)
1744 {
1745 struct cds_lfht_node *node;
1746 struct cds_lfht_iter iter;
1747 struct lttng_channel_trigger_list *trigger_list;
1748 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1749 struct channel_key key = { .key = channel_key, .domain = domain };
1750 struct channel_info *channel_info;
1751
1752 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
1753 channel_key, lttng_domain_type_str(domain));
1754
1755 rcu_read_lock();
1756
1757 cds_lfht_lookup(state->channel_triggers_ht,
1758 hash_channel_key(&key),
1759 match_channel_trigger_list,
1760 &key,
1761 &iter);
1762 node = cds_lfht_iter_get_node(&iter);
1763 /*
1764 * There is a severe internal error if we are being asked to remove a
1765 * channel that doesn't exist.
1766 */
1767 if (!node) {
1768 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1769 goto end;
1770 }
1771
1772 /* Free the list of triggers associated with this channel. */
1773 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1774 channel_triggers_ht_node);
1775 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1776 &trigger_list->list, node) {
1777 cds_list_del(&trigger_list_element->node);
1778 free(trigger_list_element);
1779 }
1780 cds_lfht_del(state->channel_triggers_ht, node);
1781 call_rcu(&trigger_list->rcu_node, free_channel_trigger_list_rcu);
1782
1783 /* Free sampled channel state. */
1784 cds_lfht_lookup(state->channel_state_ht,
1785 hash_channel_key(&key),
1786 match_channel_state_sample,
1787 &key,
1788 &iter);
1789 node = cds_lfht_iter_get_node(&iter);
1790 /*
1791 * This is expected to be NULL if the channel is destroyed before we
1792 * received a sample.
1793 */
1794 if (node) {
1795 struct channel_state_sample *sample = caa_container_of(node,
1796 struct channel_state_sample,
1797 channel_state_ht_node);
1798
1799 cds_lfht_del(state->channel_state_ht, node);
1800 call_rcu(&sample->rcu_node, free_channel_state_sample_rcu);
1801 }
1802
1803 /* Remove the channel from the channels_ht and free it. */
1804 cds_lfht_lookup(state->channels_ht,
1805 hash_channel_key(&key),
1806 match_channel_info,
1807 &key,
1808 &iter);
1809 node = cds_lfht_iter_get_node(&iter);
1810 assert(node);
1811 channel_info = caa_container_of(node, struct channel_info,
1812 channels_ht_node);
1813 cds_lfht_del(state->channels_ht, node);
1814 channel_info_destroy(channel_info);
1815 end:
1816 rcu_read_unlock();
1817 *cmd_result = LTTNG_OK;
1818 return 0;
1819 }
1820
1821 static
1822 int handle_notification_thread_command_session_rotation(
1823 struct notification_thread_state *state,
1824 enum notification_thread_command_type cmd_type,
1825 const char *session_name, uid_t session_uid, gid_t session_gid,
1826 uint64_t trace_archive_chunk_id,
1827 struct lttng_trace_archive_location *location,
1828 enum lttng_error_code *_cmd_result)
1829 {
1830 int ret = 0;
1831 enum lttng_error_code cmd_result = LTTNG_OK;
1832 struct lttng_session_trigger_list *trigger_list;
1833 struct lttng_trigger_list_element *trigger_list_element;
1834 struct session_info *session_info;
1835 const struct lttng_credentials session_creds = {
1836 .uid = LTTNG_OPTIONAL_INIT_VALUE(session_uid),
1837 .gid = LTTNG_OPTIONAL_INIT_VALUE(session_gid),
1838 };
1839
1840 rcu_read_lock();
1841
1842 session_info = find_or_create_session_info(state, session_name,
1843 session_uid, session_gid);
1844 if (!session_info) {
1845 /* Allocation error or an internal error occurred. */
1846 ret = -1;
1847 cmd_result = LTTNG_ERR_NOMEM;
1848 goto end;
1849 }
1850
1851 session_info->rotation.ongoing =
1852 cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
1853 session_info->rotation.id = trace_archive_chunk_id;
1854 trigger_list = get_session_trigger_list(state, session_name);
1855 if (!trigger_list) {
1856 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1857 session_name);
1858 goto end;
1859 }
1860
1861 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
1862 node) {
1863 const struct lttng_condition *condition;
1864 struct lttng_trigger *trigger;
1865 struct notification_client_list *client_list;
1866 struct lttng_evaluation *evaluation = NULL;
1867 enum lttng_condition_type condition_type;
1868 enum action_executor_status executor_status;
1869
1870 trigger = trigger_list_element->trigger;
1871 condition = lttng_trigger_get_const_condition(trigger);
1872 assert(condition);
1873 condition_type = lttng_condition_get_type(condition);
1874
1875 if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING &&
1876 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1877 continue;
1878 } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED &&
1879 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) {
1880 continue;
1881 }
1882
1883 client_list = get_client_list_from_condition(state, condition);
1884 if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1885 evaluation = lttng_evaluation_session_rotation_ongoing_create(
1886 trace_archive_chunk_id);
1887 } else {
1888 evaluation = lttng_evaluation_session_rotation_completed_create(
1889 trace_archive_chunk_id, location);
1890 }
1891
1892 if (!evaluation) {
1893 /* Internal error */
1894 ret = -1;
1895 cmd_result = LTTNG_ERR_UNK;
1896 goto put_list;
1897 }
1898
1899 /*
1900 * Ownership of `evaluation` transferred to the action executor
1901 * no matter the result.
1902 */
1903 executor_status = action_executor_enqueue(state->executor,
1904 trigger, evaluation, &session_creds,
1905 client_list);
1906 evaluation = NULL;
1907 switch (executor_status) {
1908 case ACTION_EXECUTOR_STATUS_OK:
1909 break;
1910 case ACTION_EXECUTOR_STATUS_ERROR:
1911 case ACTION_EXECUTOR_STATUS_INVALID:
1912 /*
1913 * TODO Add trigger identification (name/id) when
1914 * it is added to the API.
1915 */
1916 ERR("Fatal error occurred while enqueuing action associated with session rotation trigger");
1917 ret = -1;
1918 goto put_list;
1919 case ACTION_EXECUTOR_STATUS_OVERFLOW:
1920 /*
1921 * TODO Add trigger identification (name/id) when
1922 * it is added to the API.
1923 *
1924 * Not a fatal error.
1925 */
1926 WARN("No space left when enqueuing action associated with session rotation trigger");
1927 ret = 0;
1928 goto put_list;
1929 default:
1930 abort();
1931 }
1932
1933 put_list:
1934 notification_client_list_put(client_list);
1935 if (caa_unlikely(ret)) {
1936 break;
1937 }
1938 }
1939 end:
1940 session_info_put(session_info);
1941 *_cmd_result = cmd_result;
1942 rcu_read_unlock();
1943 return ret;
1944 }
1945
1946 static
1947 int handle_notification_thread_command_add_tracer_event_source(
1948 struct notification_thread_state *state,
1949 int tracer_event_source_fd,
1950 enum lttng_domain_type domain_type,
1951 enum lttng_error_code *_cmd_result)
1952 {
1953 int ret = 0;
1954 enum lttng_error_code cmd_result = LTTNG_OK;
1955 struct notification_event_tracer_event_source_element *element = NULL;
1956
1957 element = zmalloc(sizeof(*element));
1958 if (!element) {
1959 cmd_result = LTTNG_ERR_NOMEM;
1960 ret = -1;
1961 goto end;
1962 }
1963
1964 element->fd = tracer_event_source_fd;
1965 element->domain = domain_type;
1966
1967 cds_list_add(&element->node, &state->tracer_event_sources_list);
1968
1969 DBG3("[notification-thread] Adding tracer event source fd to poll set: tracer_event_source_fd = %d, domain = '%s'",
1970 tracer_event_source_fd,
1971 lttng_domain_type_str(domain_type));
1972
1973 /* Adding the read side pipe to the event poll. */
1974 ret = lttng_poll_add(&state->events, tracer_event_source_fd, LPOLLIN | LPOLLERR);
1975 if (ret < 0) {
1976 ERR("[notification-thread] Failed to add tracer event source to poll set: tracer_event_source_fd = %d, domain = '%s'",
1977 tracer_event_source_fd,
1978 lttng_domain_type_str(element->domain));
1979 cds_list_del(&element->node);
1980 free(element);
1981 goto end;
1982 }
1983
1984 element->is_fd_in_poll_set = true;
1985
1986 end:
1987 *_cmd_result = cmd_result;
1988 return ret;
1989 }
1990
1991 static
1992 int drain_event_notifier_notification_pipe(
1993 struct notification_thread_state *state,
1994 int pipe, enum lttng_domain_type domain)
1995 {
1996 struct lttng_poll_event events = {0};
1997 int ret;
1998
1999 ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
2000 if (ret < 0) {
2001 ERR("[notification-thread] Error creating lttng_poll_event");
2002 goto end;
2003 }
2004
2005 ret = lttng_poll_add(&events, pipe, LPOLLIN);
2006 if (ret < 0) {
2007 ERR("[notification-thread] Error adding fd event notifier notification pipe to lttng_poll_event: fd = %d",
2008 pipe);
2009 goto end;
2010 }
2011
2012 while (true) {
2013 /*
2014 * Continue to consume notifications as long as there are new
2015 * ones coming in. The tracer has been asked to stop producing
2016 * them.
2017 *
2018 * LPOLLIN is explicitly checked since LPOLLHUP is implicitly
2019 * monitored (on Linux, at least) and will be returned when
2020 * the pipe is closed but empty.
2021 */
2022 ret = lttng_poll_wait_interruptible(&events, 0);
2023 if (ret == 0 || (LTTNG_POLL_GETEV(&events, 0) & LPOLLIN) == 0) {
2024 /* No more notification to be read on this pipe. */
2025 ret = 0;
2026 goto end;
2027 } else if (ret < 0) {
2028 PERROR("Failed on lttng_poll_wait_interruptible() call");
2029 ret = -1;
2030 goto end;
2031 }
2032
2033 ret = handle_one_event_notifier_notification(state, pipe, domain);
2034 if (ret) {
2035 ERR("[notification-thread] Error consuming an event notifier notification from pipe: fd = %d",
2036 pipe);
2037 }
2038 }
2039 end:
2040 lttng_poll_clean(&events);
2041 return ret;
2042 }
2043
2044 static
2045 int handle_notification_thread_command_remove_tracer_event_source(
2046 struct notification_thread_state *state,
2047 int tracer_event_source_fd,
2048 enum lttng_error_code *_cmd_result)
2049 {
2050 int ret = 0;
2051 bool found = false;
2052 enum lttng_error_code cmd_result = LTTNG_OK;
2053 struct notification_event_tracer_event_source_element *source_element = NULL, *tmp;
2054
2055 cds_list_for_each_entry_safe(source_element, tmp,
2056 &state->tracer_event_sources_list, node) {
2057 if (source_element->fd != tracer_event_source_fd) {
2058 continue;
2059 }
2060
2061 DBG("[notification-thread] Removed tracer event source from poll set: tracer_event_source_fd = %d, domain = '%s'",
2062 tracer_event_source_fd,
2063 lttng_domain_type_str(source_element->domain));
2064 cds_list_del(&source_element->node);
2065 found = true;
2066 break;
2067 }
2068
2069 if (!found) {
2070 /*
2071 * This is temporarily allowed since the poll activity set is
2072 * not properly cleaned-up for the moment. This is adressed in
2073 * an upcoming fix.
2074 */
2075 source_element = NULL;
2076 goto end;
2077 }
2078
2079 if (!source_element->is_fd_in_poll_set) {
2080 /* Skip the poll set removal. */
2081 goto end;
2082 }
2083
2084 DBG3("[notification-thread] Removing tracer event source from poll set: tracer_event_source_fd = %d, domain = '%s'",
2085 tracer_event_source_fd,
2086 lttng_domain_type_str(source_element->domain));
2087
2088 /* Removing the fd from the event poll set. */
2089 ret = lttng_poll_del(&state->events, tracer_event_source_fd);
2090 if (ret < 0) {
2091 ERR("[notification-thread] Failed to remove tracer event source from poll set: tracer_event_source_fd = %d, domain = '%s'",
2092 tracer_event_source_fd,
2093 lttng_domain_type_str(source_element->domain));
2094 cmd_result = LTTNG_ERR_FATAL;
2095 goto end;
2096 }
2097
2098 source_element->is_fd_in_poll_set = false;
2099
2100 ret = drain_event_notifier_notification_pipe(state, tracer_event_source_fd,
2101 source_element->domain);
2102 if (ret) {
2103 ERR("[notification-thread] Error draining event notifier notification: tracer_event_source_fd = %d, domain = %s",
2104 tracer_event_source_fd,
2105 lttng_domain_type_str(source_element->domain));
2106 cmd_result = LTTNG_ERR_FATAL;
2107 goto end;
2108 }
2109
2110 /*
2111 * The drain_event_notifier_notification_pipe() call might have read
2112 * data from an fd that we received in event in the latest _poll_wait()
2113 * call. Make sure the thread call poll_wait() again to ensure we have
2114 * a clean state.
2115 */
2116 state->restart_poll = true;
2117
2118 end:
2119 free(source_element);
2120 *_cmd_result = cmd_result;
2121 return ret;
2122 }
2123
2124 int handle_notification_thread_remove_tracer_event_source_no_result(
2125 struct notification_thread_state *state,
2126 int tracer_event_source_fd)
2127 {
2128 int ret;
2129 enum lttng_error_code cmd_result;
2130
2131 ret = handle_notification_thread_command_remove_tracer_event_source(
2132 state, tracer_event_source_fd, &cmd_result);
2133 (void) cmd_result;
2134 return ret;
2135 }
2136
2137 static int handle_notification_thread_command_list_triggers(
2138 struct notification_thread_handle *handle,
2139 struct notification_thread_state *state,
2140 uid_t client_uid,
2141 struct lttng_triggers **triggers,
2142 enum lttng_error_code *_cmd_result)
2143 {
2144 int ret = 0;
2145 enum lttng_error_code cmd_result = LTTNG_OK;
2146 struct cds_lfht_iter iter;
2147 struct lttng_trigger_ht_element *trigger_ht_element;
2148 struct lttng_triggers *local_triggers = NULL;
2149 const struct lttng_credentials *creds;
2150
2151 rcu_read_lock();
2152
2153 local_triggers = lttng_triggers_create();
2154 if (!local_triggers) {
2155 /* Not a fatal error. */
2156 cmd_result = LTTNG_ERR_NOMEM;
2157 goto end;
2158 }
2159
2160 cds_lfht_for_each_entry(state->triggers_ht, &iter,
2161 trigger_ht_element, node) {
2162 /*
2163 * Only return the triggers to which the client has access.
2164 * The root user has visibility over all triggers.
2165 */
2166 creds = lttng_trigger_get_credentials(trigger_ht_element->trigger);
2167 if (client_uid != lttng_credentials_get_uid(creds) && client_uid != 0) {
2168 continue;
2169 }
2170
2171 ret = lttng_triggers_add(local_triggers,
2172 trigger_ht_element->trigger);
2173 if (ret < 0) {
2174 /* Not a fatal error. */
2175 ret = 0;
2176 cmd_result = LTTNG_ERR_NOMEM;
2177 goto end;
2178 }
2179 }
2180
2181 /* Transferring ownership to the caller. */
2182 *triggers = local_triggers;
2183 local_triggers = NULL;
2184
2185 end:
2186 rcu_read_unlock();
2187 lttng_triggers_destroy(local_triggers);
2188 *_cmd_result = cmd_result;
2189 return ret;
2190 }
2191
2192 static
2193 bool condition_is_supported(struct lttng_condition *condition)
2194 {
2195 bool is_supported;
2196
2197 switch (lttng_condition_get_type(condition)) {
2198 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
2199 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
2200 {
2201 int ret;
2202 enum lttng_domain_type domain;
2203
2204 ret = lttng_condition_buffer_usage_get_domain_type(condition,
2205 &domain);
2206 assert(ret == 0);
2207
2208 if (domain != LTTNG_DOMAIN_KERNEL) {
2209 is_supported = true;
2210 goto end;
2211 }
2212
2213 /*
2214 * Older kernel tracers don't expose the API to monitor their
2215 * buffers. Therefore, we reject triggers that require that
2216 * mechanism to be available to be evaluated.
2217 *
2218 * Assume unsupported on error.
2219 */
2220 is_supported = kernel_supports_ring_buffer_snapshot_sample_positions() == 1;
2221 break;
2222 }
2223 case LTTNG_CONDITION_TYPE_EVENT_RULE_HIT:
2224 {
2225 const struct lttng_event_rule *event_rule;
2226 enum lttng_domain_type domain;
2227 const enum lttng_condition_status status =
2228 lttng_condition_event_rule_get_rule(
2229 condition, &event_rule);
2230
2231 assert(status == LTTNG_CONDITION_STATUS_OK);
2232
2233 domain = lttng_event_rule_get_domain_type(event_rule);
2234 if (domain != LTTNG_DOMAIN_KERNEL) {
2235 is_supported = true;
2236 goto end;
2237 }
2238
2239 /*
2240 * Older kernel tracers can't emit notification. Therefore, we
2241 * reject triggers that require that mechanism to be available
2242 * to be evaluated.
2243 *
2244 * Assume unsupported on error.
2245 */
2246 is_supported = kernel_supports_event_notifiers() == 1;
2247 break;
2248 }
2249 default:
2250 is_supported = true;
2251 }
2252 end:
2253 return is_supported;
2254 }
2255
2256 /* Must be called with RCU read lock held. */
2257 static
2258 int bind_trigger_to_matching_session(struct lttng_trigger *trigger,
2259 struct notification_thread_state *state)
2260 {
2261 int ret = 0;
2262 const struct lttng_condition *condition;
2263 const char *session_name;
2264 struct lttng_session_trigger_list *trigger_list;
2265
2266 condition = lttng_trigger_get_const_condition(trigger);
2267 switch (lttng_condition_get_type(condition)) {
2268 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
2269 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
2270 {
2271 enum lttng_condition_status status;
2272
2273 status = lttng_condition_session_rotation_get_session_name(
2274 condition, &session_name);
2275 if (status != LTTNG_CONDITION_STATUS_OK) {
2276 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
2277 ret = -1;
2278 goto end;
2279 }
2280 break;
2281 }
2282 default:
2283 ret = -1;
2284 goto end;
2285 }
2286
2287 trigger_list = get_session_trigger_list(state, session_name);
2288 if (!trigger_list) {
2289 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
2290 session_name);
2291 goto end;
2292
2293 }
2294
2295 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
2296 session_name);
2297 ret = lttng_session_trigger_list_add(trigger_list, trigger);
2298 end:
2299 return ret;
2300 }
2301
2302 /* Must be called with RCU read lock held. */
2303 static
2304 int bind_trigger_to_matching_channels(struct lttng_trigger *trigger,
2305 struct notification_thread_state *state)
2306 {
2307 int ret = 0;
2308 struct cds_lfht_node *node;
2309 struct cds_lfht_iter iter;
2310 struct channel_info *channel;
2311
2312 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
2313 channels_ht_node) {
2314 struct lttng_trigger_list_element *trigger_list_element;
2315 struct lttng_channel_trigger_list *trigger_list;
2316 struct cds_lfht_iter lookup_iter;
2317
2318 if (!trigger_applies_to_channel(trigger, channel)) {
2319 continue;
2320 }
2321
2322 cds_lfht_lookup(state->channel_triggers_ht,
2323 hash_channel_key(&channel->key),
2324 match_channel_trigger_list,
2325 &channel->key,
2326 &lookup_iter);
2327 node = cds_lfht_iter_get_node(&lookup_iter);
2328 assert(node);
2329 trigger_list = caa_container_of(node,
2330 struct lttng_channel_trigger_list,
2331 channel_triggers_ht_node);
2332
2333 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
2334 if (!trigger_list_element) {
2335 ret = -1;
2336 goto end;
2337 }
2338 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
2339 trigger_list_element->trigger = trigger;
2340 cds_list_add(&trigger_list_element->node, &trigger_list->list);
2341 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
2342 channel->name);
2343 }
2344 end:
2345 return ret;
2346 }
2347
2348 static
2349 bool is_trigger_action_notify(const struct lttng_trigger *trigger)
2350 {
2351 bool is_notify = false;
2352 unsigned int i, count;
2353 enum lttng_action_status action_status;
2354 const struct lttng_action *action =
2355 lttng_trigger_get_const_action(trigger);
2356 enum lttng_action_type action_type;
2357
2358 assert(action);
2359 action_type = lttng_action_get_type(action);
2360 if (action_type == LTTNG_ACTION_TYPE_NOTIFY) {
2361 is_notify = true;
2362 goto end;
2363 } else if (action_type != LTTNG_ACTION_TYPE_GROUP) {
2364 goto end;
2365 }
2366
2367 action_status = lttng_action_group_get_count(action, &count);
2368 assert(action_status == LTTNG_ACTION_STATUS_OK);
2369
2370 for (i = 0; i < count; i++) {
2371 const struct lttng_action *inner_action =
2372 lttng_action_group_get_at_index(
2373 action, i);
2374
2375 action_type = lttng_action_get_type(inner_action);
2376 if (action_type == LTTNG_ACTION_TYPE_NOTIFY) {
2377 is_notify = true;
2378 goto end;
2379 }
2380 }
2381
2382 end:
2383 return is_notify;
2384 }
2385
2386 static bool trigger_name_taken(struct notification_thread_state *state,
2387 const struct lttng_trigger *trigger)
2388 {
2389 struct cds_lfht_iter iter;
2390
2391 /*
2392 * No duplicata is allowed in the triggers_by_name_uid_ht.
2393 * The match is done against the trigger name and uid.
2394 */
2395 cds_lfht_lookup(state->triggers_by_name_uid_ht,
2396 hash_trigger_by_name_uid(trigger),
2397 match_trigger_by_name_uid,
2398 trigger,
2399 &iter);
2400 return !!cds_lfht_iter_get_node(&iter);
2401 }
2402
2403 static
2404 enum lttng_error_code generate_trigger_name(
2405 struct notification_thread_state *state,
2406 struct lttng_trigger *trigger, const char **name)
2407 {
2408 enum lttng_error_code ret_code = LTTNG_OK;
2409 bool taken = false;
2410 enum lttng_trigger_status status;
2411
2412 do {
2413 const int ret = lttng_trigger_generate_name(trigger,
2414 state->trigger_id.name_offset++);
2415 if (ret) {
2416 /* The only reason this can fail right now. */
2417 ret_code = LTTNG_ERR_NOMEM;
2418 break;
2419 }
2420
2421 status = lttng_trigger_get_name(trigger, name);
2422 assert(status == LTTNG_TRIGGER_STATUS_OK);
2423
2424 taken = trigger_name_taken(state, trigger);
2425 } while (taken || state->trigger_id.name_offset == UINT64_MAX);
2426
2427 return ret_code;
2428 }
2429
2430 /*
2431 * FIXME A client's credentials are not checked when registering a trigger.
2432 *
2433 * The effects of this are benign since:
2434 * - The client will succeed in registering the trigger, as it is valid,
2435 * - The trigger will, internally, be bound to the channel/session,
2436 * - The notifications will not be sent since the client's credentials
2437 * are checked against the channel at that moment.
2438 *
2439 * If this function returns a non-zero value, it means something is
2440 * fundamentally broken and the whole subsystem/thread will be torn down.
2441 *
2442 * If a non-fatal error occurs, just set the cmd_result to the appropriate
2443 * error code.
2444 */
2445 static
2446 int handle_notification_thread_command_register_trigger(
2447 struct notification_thread_state *state,
2448 struct lttng_trigger *trigger,
2449 enum lttng_error_code *cmd_result)
2450 {
2451 int ret = 0;
2452 struct lttng_condition *condition;
2453 struct notification_client *client;
2454 struct notification_client_list *client_list = NULL;
2455 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2456 struct notification_client_list_element *client_list_element;
2457 struct notification_trigger_tokens_ht_element *trigger_tokens_ht_element = NULL;
2458 struct cds_lfht_node *node;
2459 struct cds_lfht_iter iter;
2460 const char* trigger_name;
2461 bool free_trigger = true;
2462 struct lttng_evaluation *evaluation = NULL;
2463 struct lttng_credentials object_creds;
2464 uid_t object_uid;
2465 gid_t object_gid;
2466 enum action_executor_status executor_status;
2467 const uint64_t trigger_tracer_token =
2468 state->trigger_id.next_tracer_token++;
2469
2470 rcu_read_lock();
2471
2472 /* Set the trigger's tracer token. */
2473 lttng_trigger_set_tracer_token(trigger, trigger_tracer_token);
2474
2475 if (lttng_trigger_get_name(trigger, &trigger_name) ==
2476 LTTNG_TRIGGER_STATUS_UNSET) {
2477 const enum lttng_error_code ret_code = generate_trigger_name(
2478 state, trigger, &trigger_name);
2479
2480 if (ret_code != LTTNG_OK) {
2481 /* Fatal error. */
2482 ret = -1;
2483 *cmd_result = ret_code;
2484 goto error;
2485 }
2486 } else if (trigger_name_taken(state, trigger)) {
2487 /* Not a fatal error. */
2488 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2489 ret = 0;
2490 goto error;
2491 }
2492
2493 condition = lttng_trigger_get_condition(trigger);
2494 assert(condition);
2495
2496 /* Some conditions require tracers to implement a minimal ABI version. */
2497 if (!condition_is_supported(condition)) {
2498 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
2499 goto error;
2500 }
2501
2502 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
2503 if (!trigger_ht_element) {
2504 ret = -1;
2505 goto error;
2506 }
2507
2508 /* Add trigger to the trigger_ht. */
2509 cds_lfht_node_init(&trigger_ht_element->node);
2510 cds_lfht_node_init(&trigger_ht_element->node_by_name_uid);
2511 trigger_ht_element->trigger = trigger;
2512
2513 node = cds_lfht_add_unique(state->triggers_ht,
2514 lttng_condition_hash(condition),
2515 match_trigger,
2516 trigger,
2517 &trigger_ht_element->node);
2518 if (node != &trigger_ht_element->node) {
2519 /* Not a fatal error, simply report it to the client. */
2520 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2521 goto error_free_ht_element;
2522 }
2523
2524 node = cds_lfht_add_unique(state->triggers_by_name_uid_ht,
2525 hash_trigger_by_name_uid(trigger),
2526 match_trigger_by_name_uid,
2527 trigger,
2528 &trigger_ht_element->node_by_name_uid);
2529 if (node != &trigger_ht_element->node_by_name_uid) {
2530 /* Not a fatal error, simply report it to the client. */
2531 cds_lfht_del(state->triggers_ht, &trigger_ht_element->node);
2532 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2533 goto error_free_ht_element;
2534 }
2535
2536 if (lttng_condition_get_type(condition) == LTTNG_CONDITION_TYPE_EVENT_RULE_HIT) {
2537 trigger_tokens_ht_element = zmalloc(sizeof(*trigger_tokens_ht_element));
2538 if (!trigger_tokens_ht_element) {
2539 /* Fatal error. */
2540 ret = -1;
2541 cds_lfht_del(state->triggers_ht,
2542 &trigger_ht_element->node);
2543 cds_lfht_del(state->triggers_by_name_uid_ht,
2544 &trigger_ht_element->node_by_name_uid);
2545 goto error_free_ht_element;
2546 }
2547
2548 /* Add trigger token to the trigger_tokens_ht. */
2549 cds_lfht_node_init(&trigger_tokens_ht_element->node);
2550 trigger_tokens_ht_element->token =
2551 LTTNG_OPTIONAL_GET(trigger->tracer_token);
2552 trigger_tokens_ht_element->trigger = trigger;
2553
2554 node = cds_lfht_add_unique(state->trigger_tokens_ht,
2555 hash_key_u64(&trigger_tokens_ht_element->token,
2556 lttng_ht_seed),
2557 match_trigger_token,
2558 &trigger_tokens_ht_element->token,
2559 &trigger_tokens_ht_element->node);
2560 if (node != &trigger_tokens_ht_element->node) {
2561 /* Internal corruption, fatal error. */
2562 ret = -1;
2563 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2564 cds_lfht_del(state->triggers_ht,
2565 &trigger_ht_element->node);
2566 cds_lfht_del(state->triggers_by_name_uid_ht,
2567 &trigger_ht_element->node_by_name_uid);
2568 goto error_free_ht_element;
2569 }
2570 }
2571
2572 /*
2573 * Ownership of the trigger and of its wrapper was transfered to
2574 * the triggers_ht. Same for token ht element if necessary.
2575 */
2576 trigger_tokens_ht_element = NULL;
2577 trigger_ht_element = NULL;
2578 free_trigger = false;
2579
2580 /*
2581 * The rest only applies to triggers that have a "notify" action.
2582 * It is not skipped as this is the only action type currently
2583 * supported.
2584 */
2585 if (is_trigger_action_notify(trigger)) {
2586 client_list = notification_client_list_create(trigger);
2587 if (!client_list) {
2588 ret = -1;
2589 goto error_free_ht_element;
2590 }
2591
2592 /* Build a list of clients to which this new trigger applies. */
2593 cds_lfht_for_each_entry (state->client_socket_ht, &iter, client,
2594 client_socket_ht_node) {
2595 if (!trigger_applies_to_client(trigger, client)) {
2596 continue;
2597 }
2598
2599 client_list_element =
2600 zmalloc(sizeof(*client_list_element));
2601 if (!client_list_element) {
2602 ret = -1;
2603 goto error_put_client_list;
2604 }
2605
2606 CDS_INIT_LIST_HEAD(&client_list_element->node);
2607 client_list_element->client = client;
2608 cds_list_add(&client_list_element->node,
2609 &client_list->list);
2610 }
2611
2612 /*
2613 * Client list ownership transferred to the
2614 * notification_trigger_clients_ht.
2615 */
2616 publish_notification_client_list(state, client_list);
2617 }
2618
2619 switch (get_condition_binding_object(condition)) {
2620 case LTTNG_OBJECT_TYPE_SESSION:
2621 /* Add the trigger to the list if it matches a known session. */
2622 ret = bind_trigger_to_matching_session(trigger, state);
2623 if (ret) {
2624 goto error_put_client_list;
2625 }
2626 break;
2627 case LTTNG_OBJECT_TYPE_CHANNEL:
2628 /*
2629 * Add the trigger to list of triggers bound to the channels
2630 * currently known.
2631 */
2632 ret = bind_trigger_to_matching_channels(trigger, state);
2633 if (ret) {
2634 goto error_put_client_list;
2635 }
2636 break;
2637 case LTTNG_OBJECT_TYPE_NONE:
2638 break;
2639 default:
2640 ERR("Unknown object type on which to bind a newly registered trigger was encountered");
2641 ret = -1;
2642 goto error_put_client_list;
2643 }
2644
2645 /*
2646 * The new trigger's condition must be evaluated against the current
2647 * state.
2648 *
2649 * In the case of `notify` action, nothing preventing clients from
2650 * subscribing to a condition before the corresponding trigger is
2651 * registered, we have to evaluate this new condition right away.
2652 *
2653 * At some point, we were waiting for the next "evaluation" (e.g. on
2654 * reception of a channel sample) to evaluate this new condition, but
2655 * that was broken.
2656 *
2657 * The reason it was broken is that waiting for the next sample
2658 * does not allow us to properly handle transitions for edge-triggered
2659 * conditions.
2660 *
2661 * Consider this example: when we handle a new channel sample, we
2662 * evaluate each conditions twice: once with the previous state, and
2663 * again with the newest state. We then use those two results to
2664 * determine whether a state change happened: a condition was false and
2665 * became true. If a state change happened, we have to notify clients.
2666 *
2667 * Now, if a client subscribes to a given notification and registers
2668 * a trigger *after* that subscription, we have to make sure the
2669 * condition is evaluated at this point while considering only the
2670 * current state. Otherwise, the next evaluation cycle may only see
2671 * that the evaluations remain the same (true for samples n-1 and n) and
2672 * the client will never know that the condition has been met.
2673 */
2674 switch (get_condition_binding_object(condition)) {
2675 case LTTNG_OBJECT_TYPE_SESSION:
2676 ret = evaluate_session_condition_for_client(condition, state,
2677 &evaluation, &object_uid,
2678 &object_gid);
2679 LTTNG_OPTIONAL_SET(&object_creds.uid, object_uid);
2680 LTTNG_OPTIONAL_SET(&object_creds.gid, object_gid);
2681 break;
2682 case LTTNG_OBJECT_TYPE_CHANNEL:
2683 ret = evaluate_channel_condition_for_client(condition, state,
2684 &evaluation, &object_uid,
2685 &object_gid);
2686 LTTNG_OPTIONAL_SET(&object_creds.uid, object_uid);
2687 LTTNG_OPTIONAL_SET(&object_creds.gid, object_gid);
2688 break;
2689 case LTTNG_OBJECT_TYPE_NONE:
2690 ret = 0;
2691 break;
2692 case LTTNG_OBJECT_TYPE_UNKNOWN:
2693 default:
2694 ret = -1;
2695 break;
2696 }
2697
2698 if (ret) {
2699 /* Fatal error. */
2700 goto error_put_client_list;
2701 }
2702
2703 DBG("Newly registered trigger's condition evaluated to %s",
2704 evaluation ? "true" : "false");
2705 if (!evaluation) {
2706 /* Evaluation yielded nothing. Normal exit. */
2707 ret = 0;
2708 goto end;
2709 }
2710
2711 /*
2712 * Ownership of `evaluation` transferred to the action executor
2713 * no matter the result.
2714 */
2715 executor_status = action_executor_enqueue(state->executor, trigger,
2716 evaluation, &object_creds, client_list);
2717 evaluation = NULL;
2718 switch (executor_status) {
2719 case ACTION_EXECUTOR_STATUS_OK:
2720 break;
2721 case ACTION_EXECUTOR_STATUS_ERROR:
2722 case ACTION_EXECUTOR_STATUS_INVALID:
2723 /*
2724 * TODO Add trigger identification (name/id) when
2725 * it is added to the API.
2726 */
2727 ERR("Fatal error occurred while enqueuing action associated to newly registered trigger");
2728 ret = -1;
2729 goto error_put_client_list;
2730 case ACTION_EXECUTOR_STATUS_OVERFLOW:
2731 /*
2732 * TODO Add trigger identification (name/id) when
2733 * it is added to the API.
2734 *
2735 * Not a fatal error.
2736 */
2737 WARN("No space left when enqueuing action associated to newly registered trigger");
2738 ret = 0;
2739 goto end;
2740 default:
2741 abort();
2742 }
2743
2744 end:
2745 *cmd_result = LTTNG_OK;
2746 DBG("Registered trigger: name = `%s`, tracer token = %" PRIu64,
2747 trigger_name, trigger_tracer_token);
2748
2749 error_put_client_list:
2750 notification_client_list_put(client_list);
2751
2752 error_free_ht_element:
2753 if (trigger_ht_element) {
2754 /* Delayed removal due to RCU constraint on delete. */
2755 call_rcu(&trigger_ht_element->rcu_node,
2756 free_lttng_trigger_ht_element_rcu);
2757 }
2758
2759 free(trigger_tokens_ht_element);
2760 error:
2761 if (free_trigger) {
2762 lttng_trigger_destroy(trigger);
2763 }
2764 rcu_read_unlock();
2765 return ret;
2766 }
2767
2768 static
2769 void free_lttng_trigger_ht_element_rcu(struct rcu_head *node)
2770 {
2771 free(caa_container_of(node, struct lttng_trigger_ht_element,
2772 rcu_node));
2773 }
2774
2775 static
2776 void free_notification_trigger_tokens_ht_element_rcu(struct rcu_head *node)
2777 {
2778 free(caa_container_of(node, struct notification_trigger_tokens_ht_element,
2779 rcu_node));
2780 }
2781
2782 static
2783 int handle_notification_thread_command_unregister_trigger(
2784 struct notification_thread_state *state,
2785 const struct lttng_trigger *trigger,
2786 enum lttng_error_code *_cmd_reply)
2787 {
2788 struct cds_lfht_iter iter;
2789 struct cds_lfht_node *triggers_ht_node;
2790 struct lttng_channel_trigger_list *trigger_list;
2791 struct notification_client_list *client_list;
2792 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2793 const struct lttng_condition *condition = lttng_trigger_get_const_condition(
2794 trigger);
2795 enum lttng_error_code cmd_reply;
2796
2797 rcu_read_lock();
2798
2799 cds_lfht_lookup(state->triggers_ht,
2800 lttng_condition_hash(condition),
2801 match_trigger,
2802 trigger,
2803 &iter);
2804 triggers_ht_node = cds_lfht_iter_get_node(&iter);
2805 if (!triggers_ht_node) {
2806 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
2807 goto end;
2808 } else {
2809 cmd_reply = LTTNG_OK;
2810 }
2811
2812 /* Remove trigger from channel_triggers_ht. */
2813 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
2814 channel_triggers_ht_node) {
2815 struct lttng_trigger_list_element *trigger_element, *tmp;
2816
2817 cds_list_for_each_entry_safe(trigger_element, tmp,
2818 &trigger_list->list, node) {
2819 if (!lttng_trigger_is_equal(trigger, trigger_element->trigger)) {
2820 continue;
2821 }
2822
2823 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2824 cds_list_del(&trigger_element->node);
2825 /* A trigger can only appear once per channel */
2826 break;
2827 }
2828 }
2829
2830 if (lttng_condition_get_type(condition) ==
2831 LTTNG_CONDITION_TYPE_EVENT_RULE_HIT) {
2832 struct notification_trigger_tokens_ht_element
2833 *trigger_tokens_ht_element;
2834
2835 cds_lfht_for_each_entry (state->trigger_tokens_ht, &iter,
2836 trigger_tokens_ht_element, node) {
2837 if (!lttng_trigger_is_equal(trigger,
2838 trigger_tokens_ht_element->trigger)) {
2839 continue;
2840 }
2841
2842 DBG("[notification-thread] Removed trigger from tokens_ht");
2843 cds_lfht_del(state->trigger_tokens_ht,
2844 &trigger_tokens_ht_element->node);
2845 call_rcu(&trigger_tokens_ht_element->rcu_node,
2846 free_notification_trigger_tokens_ht_element_rcu);
2847
2848 break;
2849 }
2850 }
2851
2852 if (is_trigger_action_notify(trigger)) {
2853 /*
2854 * Remove and release the client list from
2855 * notification_trigger_clients_ht.
2856 */
2857 client_list = get_client_list_from_condition(state, condition);
2858 assert(client_list);
2859
2860 /* Put new reference and the hashtable's reference. */
2861 notification_client_list_put(client_list);
2862 notification_client_list_put(client_list);
2863 client_list = NULL;
2864 }
2865
2866 /* Remove trigger from triggers_ht. */
2867 trigger_ht_element = caa_container_of(triggers_ht_node,
2868 struct lttng_trigger_ht_element, node);
2869 cds_lfht_del(state->triggers_by_name_uid_ht, &trigger_ht_element->node_by_name_uid);
2870 cds_lfht_del(state->triggers_ht, triggers_ht_node);
2871
2872 /* Release the ownership of the trigger. */
2873 lttng_trigger_destroy(trigger_ht_element->trigger);
2874 call_rcu(&trigger_ht_element->rcu_node, free_lttng_trigger_ht_element_rcu);
2875 end:
2876 rcu_read_unlock();
2877 if (_cmd_reply) {
2878 *_cmd_reply = cmd_reply;
2879 }
2880 return 0;
2881 }
2882
2883 /* Returns 0 on success, 1 on exit requested, negative value on error. */
2884 int handle_notification_thread_command(
2885 struct notification_thread_handle *handle,
2886 struct notification_thread_state *state)
2887 {
2888 int ret;
2889 uint64_t counter;
2890 struct notification_thread_command *cmd;
2891
2892 /* Read the event pipe to put it back into a quiescent state. */
2893 ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
2894 sizeof(counter));
2895 if (ret != sizeof(counter)) {
2896 goto error;
2897 }
2898
2899 pthread_mutex_lock(&handle->cmd_queue.lock);
2900 cmd = cds_list_first_entry(&handle->cmd_queue.list,
2901 struct notification_thread_command, cmd_list_node);
2902 cds_list_del(&cmd->cmd_list_node);
2903 pthread_mutex_unlock(&handle->cmd_queue.lock);
2904
2905 DBG("[notification-thread] Received `%s` command",
2906 notification_command_type_str(cmd->type));
2907 switch (cmd->type) {
2908 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
2909 ret = handle_notification_thread_command_register_trigger(state,
2910 cmd->parameters.register_trigger.trigger,
2911 &cmd->reply_code);
2912 break;
2913 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
2914 ret = handle_notification_thread_command_unregister_trigger(
2915 state,
2916 cmd->parameters.unregister_trigger.trigger,
2917 &cmd->reply_code);
2918 break;
2919 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
2920 ret = handle_notification_thread_command_add_channel(
2921 state,
2922 cmd->parameters.add_channel.session.name,
2923 cmd->parameters.add_channel.session.uid,
2924 cmd->parameters.add_channel.session.gid,
2925 cmd->parameters.add_channel.channel.name,
2926 cmd->parameters.add_channel.channel.domain,
2927 cmd->parameters.add_channel.channel.key,
2928 cmd->parameters.add_channel.channel.capacity,
2929 &cmd->reply_code);
2930 break;
2931 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
2932 ret = handle_notification_thread_command_remove_channel(
2933 state, cmd->parameters.remove_channel.key,
2934 cmd->parameters.remove_channel.domain,
2935 &cmd->reply_code);
2936 break;
2937 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
2938 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
2939 ret = handle_notification_thread_command_session_rotation(
2940 state,
2941 cmd->type,
2942 cmd->parameters.session_rotation.session_name,
2943 cmd->parameters.session_rotation.uid,
2944 cmd->parameters.session_rotation.gid,
2945 cmd->parameters.session_rotation.trace_archive_chunk_id,
2946 cmd->parameters.session_rotation.location,
2947 &cmd->reply_code);
2948 break;
2949 case NOTIFICATION_COMMAND_TYPE_ADD_TRACER_EVENT_SOURCE:
2950 ret = handle_notification_thread_command_add_tracer_event_source(
2951 state,
2952 cmd->parameters.tracer_event_source.tracer_event_source_fd,
2953 cmd->parameters.tracer_event_source.domain,
2954 &cmd->reply_code);
2955 break;
2956 case NOTIFICATION_COMMAND_TYPE_REMOVE_TRACER_EVENT_SOURCE:
2957 ret = handle_notification_thread_command_remove_tracer_event_source(
2958 state,
2959 cmd->parameters.tracer_event_source.tracer_event_source_fd,
2960 &cmd->reply_code);
2961 break;
2962 case NOTIFICATION_COMMAND_TYPE_LIST_TRIGGERS:
2963 {
2964 struct lttng_triggers *triggers = NULL;
2965
2966 ret = handle_notification_thread_command_list_triggers(
2967 handle,
2968 state,
2969 cmd->parameters.list_triggers.uid,
2970 &triggers,
2971 &cmd->reply_code);
2972 cmd->reply.list_triggers.triggers = triggers;
2973 ret = 0;
2974 break;
2975 }
2976 case NOTIFICATION_COMMAND_TYPE_QUIT:
2977 cmd->reply_code = LTTNG_OK;
2978 ret = 1;
2979 goto end;
2980 case NOTIFICATION_COMMAND_TYPE_CLIENT_COMMUNICATION_UPDATE:
2981 {
2982 const enum client_transmission_status client_status =
2983 cmd->parameters.client_communication_update
2984 .status;
2985 const notification_client_id client_id =
2986 cmd->parameters.client_communication_update.id;
2987 struct notification_client *client;
2988
2989 rcu_read_lock();
2990 client = get_client_from_id(client_id, state);
2991
2992 if (!client) {
2993 /*
2994 * Client error was probably already picked-up by the
2995 * notification thread or it has disconnected
2996 * gracefully while this command was queued.
2997 */
2998 DBG("Failed to find notification client to update communication status, client id = %" PRIu64,
2999 client_id);
3000 ret = 0;
3001 } else {
3002 ret = client_handle_transmission_status(
3003 client, client_status, state);
3004 }
3005 rcu_read_unlock();
3006 break;
3007 }
3008 default:
3009 ERR("[notification-thread] Unknown internal command received");
3010 goto error_unlock;
3011 }
3012
3013 if (ret) {
3014 goto error_unlock;
3015 }
3016 end:
3017 if (cmd->is_async) {
3018 free(cmd);
3019 cmd = NULL;
3020 } else {
3021 lttng_waiter_wake_up(&cmd->reply_waiter);
3022 }
3023 return ret;
3024 error_unlock:
3025 /* Wake-up and return a fatal error to the calling thread. */
3026 lttng_waiter_wake_up(&cmd->reply_waiter);
3027 cmd->reply_code = LTTNG_ERR_FATAL;
3028 error:
3029 /* Indicate a fatal error to the caller. */
3030 return -1;
3031 }
3032
/*
 * Add O_NONBLOCK to the file status flags of a client socket.
 *
 * The current flags are read first so that they are preserved.
 *
 * Returns -1 if either fcntl() call fails; otherwise the value returned by
 * the F_SETFL call.
 */
static
int socket_set_non_blocking(int socket)
{
	int ret;
	int current_flags;

	/* Sample the current flags of the socket. */
	current_flags = fcntl(socket, F_GETFL, 0);
	if (current_flags == -1) {
		PERROR("fcntl get socket flags");
		return -1;
	}

	ret = fcntl(socket, F_SETFL, current_flags | O_NONBLOCK);
	if (ret == -1) {
		PERROR("fcntl set O_NONBLOCK socket flag");
		return ret;
	}

	DBG("Client socket (fd = %i) set as non-blocking", socket);
	return ret;
}
3055
3056 static
3057 int client_reset_inbound_state(struct notification_client *client)
3058 {
3059 int ret;
3060
3061
3062 lttng_payload_clear(&client->communication.inbound.payload);
3063
3064 client->communication.inbound.bytes_to_receive =
3065 sizeof(struct lttng_notification_channel_message);
3066 client->communication.inbound.msg_type =
3067 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
3068 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
3069 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
3070 ret = lttng_dynamic_buffer_set_size(
3071 &client->communication.inbound.payload.buffer,
3072 client->communication.inbound.bytes_to_receive);
3073
3074 return ret;
3075 }
3076
3077 int handle_notification_thread_client_connect(
3078 struct notification_thread_state *state)
3079 {
3080 int ret;
3081 struct notification_client *client;
3082
3083 DBG("[notification-thread] Handling new notification channel client connection");
3084
3085 client = zmalloc(sizeof(*client));
3086 if (!client) {
3087 /* Fatal error. */
3088 ret = -1;
3089 goto error;
3090 }
3091
3092 pthread_mutex_init(&client->lock, NULL);
3093 client->id = state->next_notification_client_id++;
3094 CDS_INIT_LIST_HEAD(&client->condition_list);
3095 lttng_payload_init(&client->communication.inbound.payload);
3096 lttng_payload_init(&client->communication.outbound.payload);
3097 client->communication.inbound.expect_creds = true;
3098
3099 ret = client_reset_inbound_state(client);
3100 if (ret) {
3101 ERR("[notification-thread] Failed to reset client communication's inbound state");
3102 ret = 0;
3103 goto error;
3104 }
3105
3106 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
3107 if (ret < 0) {
3108 ERR("[notification-thread] Failed to accept new notification channel client connection");
3109 ret = 0;
3110 goto error;
3111 }
3112
3113 client->socket = ret;
3114
3115 ret = socket_set_non_blocking(client->socket);
3116 if (ret) {
3117 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
3118 goto error;
3119 }
3120
3121 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
3122 if (ret < 0) {
3123 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
3124 ret = 0;
3125 goto error;
3126 }
3127
3128 ret = lttng_poll_add(&state->events, client->socket,
3129 LPOLLIN | LPOLLERR |
3130 LPOLLHUP | LPOLLRDHUP);
3131 if (ret < 0) {
3132 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
3133 ret = 0;
3134 goto error;
3135 }
3136 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
3137 client->socket);
3138
3139 rcu_read_lock();
3140 cds_lfht_add(state->client_socket_ht,
3141 hash_client_socket(client->socket),
3142 &client->client_socket_ht_node);
3143 cds_lfht_add(state->client_id_ht,
3144 hash_client_id(client->id),
3145 &client->client_id_ht_node);
3146 rcu_read_unlock();
3147
3148 return ret;
3149
3150 error:
3151 notification_client_destroy(client, state);
3152 return ret;
3153 }
3154
3155 /*
3156 * RCU read-lock must be held by the caller.
3157 * Client lock must _not_ be held by the caller.
3158 */
3159 static
3160 int notification_thread_client_disconnect(
3161 struct notification_client *client,
3162 struct notification_thread_state *state)
3163 {
3164 int ret;
3165 struct lttng_condition_list_element *condition_list_element, *tmp;
3166
3167 /* Acquire the client lock to disable its communication atomically. */
3168 pthread_mutex_lock(&client->lock);
3169 client->communication.active = false;
3170 cds_lfht_del(state->client_socket_ht, &client->client_socket_ht_node);
3171 cds_lfht_del(state->client_id_ht, &client->client_id_ht_node);
3172 pthread_mutex_unlock(&client->lock);
3173
3174 ret = lttng_poll_del(&state->events, client->socket);
3175 if (ret) {
3176 ERR("[notification-thread] Failed to remove client socket %d from poll set",
3177 client->socket);
3178 }
3179
3180 /* Release all conditions to which the client was subscribed. */
3181 cds_list_for_each_entry_safe(condition_list_element, tmp,
3182 &client->condition_list, node) {
3183 (void) notification_thread_client_unsubscribe(client,
3184 condition_list_element->condition, state, NULL);
3185 }
3186
3187 /*
3188 * Client no longer accessible to other threads (through the
3189 * client lists).
3190 */
3191 notification_client_destroy(client, state);
3192 return ret;
3193 }
3194
/*
 * Disconnect the client associated with `client_socket`.
 *
 * Returns -1 if no client is registered for that socket (internal state
 * corruption); otherwise the result of disconnecting the client.
 */
int handle_notification_thread_client_disconnect(
		int client_socket, struct notification_thread_state *state)
{
	int ret;
	struct notification_client *client;

	rcu_read_lock();
	DBG("[notification-thread] Closing client connection (socket fd = %i)",
			client_socket);

	client = get_client_from_socket(client_socket, state);
	if (client) {
		ret = notification_thread_client_disconnect(client, state);
	} else {
		/* Internal state corruption, fatal error. */
		ERR("[notification-thread] Unable to find client (socket fd = %i)",
				client_socket);
		ret = -1;
	}

	rcu_read_unlock();
	return ret;
}
3218
3219 int handle_notification_thread_client_disconnect_all(
3220 struct notification_thread_state *state)
3221 {
3222 struct cds_lfht_iter iter;
3223 struct notification_client *client;
3224 bool error_encoutered = false;
3225
3226 rcu_read_lock();
3227 DBG("[notification-thread] Closing all client connections");
3228 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
3229 client_socket_ht_node) {
3230 int ret;
3231
3232 ret = notification_thread_client_disconnect(
3233 client, state);
3234 if (ret) {
3235 error_encoutered = true;
3236 }
3237 }
3238 rcu_read_unlock();
3239 return error_encoutered ? 1 : 0;
3240 }
3241
3242 int handle_notification_thread_trigger_unregister_all(
3243 struct notification_thread_state *state)
3244 {
3245 bool error_occurred = false;
3246 struct cds_lfht_iter iter;
3247 struct lttng_trigger_ht_element *trigger_ht_element;
3248
3249 rcu_read_lock();
3250 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
3251 node) {
3252 int ret = handle_notification_thread_command_unregister_trigger(
3253 state, trigger_ht_element->trigger, NULL);
3254 if (ret) {
3255 error_occurred = true;
3256 }
3257 }
3258 rcu_read_unlock();
3259 return error_occurred ? -1 : 0;
3260 }
3261
3262 static
3263 int client_handle_transmission_status(
3264 struct notification_client *client,
3265 enum client_transmission_status transmission_status,
3266 struct notification_thread_state *state)
3267 {
3268 int ret = 0;
3269
3270 switch (transmission_status) {
3271 case CLIENT_TRANSMISSION_STATUS_COMPLETE:
3272 ret = lttng_poll_mod(&state->events, client->socket,
3273 CLIENT_POLL_MASK_IN);
3274 if (ret) {
3275 goto end;
3276 }
3277
3278 break;
3279 case CLIENT_TRANSMISSION_STATUS_QUEUED:
3280 /*
3281 * We want to be notified whenever there is buffer space
3282 * available to send the rest of the payload.
3283 */
3284 ret = lttng_poll_mod(&state->events, client->socket,
3285 CLIENT_POLL_MASK_IN_OUT);
3286 if (ret) {
3287 goto end;
3288 }
3289 break;
3290 case CLIENT_TRANSMISSION_STATUS_FAIL:
3291 ret = notification_thread_client_disconnect(client, state);
3292 if (ret) {
3293 goto end;
3294 }
3295 break;
3296 case CLIENT_TRANSMISSION_STATUS_ERROR:
3297 ret = -1;
3298 goto end;
3299 default:
3300 abort();
3301 }
3302 end:
3303 return ret;
3304 }
3305
3306 /* Client lock must be acquired by caller. */
3307 static
3308 enum client_transmission_status client_flush_outgoing_queue(
3309 struct notification_client *client)
3310 {
3311 ssize_t ret;
3312 size_t to_send_count;
3313 enum client_transmission_status status;
3314 struct lttng_payload_view pv = lttng_payload_view_from_payload(
3315 &client->communication.outbound.payload, 0, -1);
3316 const int fds_to_send_count =
3317 lttng_payload_view_get_fd_handle_count(&pv);
3318
3319 ASSERT_LOCKED(client->lock);
3320
3321 if (!client->communication.active) {
3322 status = CLIENT_TRANSMISSION_STATUS_FAIL;
3323 goto end;
3324 }
3325
3326 if (pv.buffer.size == 0) {
3327 /*
3328 * If both data and fds are equal to zero, we are in an invalid
3329 * state.
3330 */
3331 assert(fds_to_send_count != 0);
3332 goto send_fds;
3333 }
3334
3335 /* Send data. */
3336 to_send_count = pv.buffer.size;
3337 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
3338 client->socket);
3339
3340 ret = lttcomm_send_unix_sock_non_block(client->socket,
3341 pv.buffer.data,
3342 to_send_count);
3343 if ((ret >= 0 && ret < to_send_count)) {
3344 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
3345 client->socket);
3346 to_send_count -= max(ret, 0);
3347
3348 memmove(client->communication.outbound.payload.buffer.data,
3349 pv.buffer.data +
3350 pv.buffer.size - to_send_count,
3351 to_send_count);
3352 ret = lttng_dynamic_buffer_set_size(
3353 &client->communication.outbound.payload.buffer,
3354 to_send_count);
3355 if (ret) {
3356 goto error;
3357 }
3358
3359 status = CLIENT_TRANSMISSION_STATUS_QUEUED;
3360 goto end;
3361 } else if (ret < 0) {
3362 /* Generic error, disable the client's communication. */
3363 ERR("[notification-thread] Failed to flush outgoing queue, disconnecting client (socket fd = %i)",
3364 client->socket);
3365 client->communication.active = false;
3366 status = CLIENT_TRANSMISSION_STATUS_FAIL;
3367 goto end;
3368 } else {
3369 /*
3370 * No error and flushed the queue completely.
3371 *
3372 * The payload buffer size is used later to
3373 * check if there is notifications queued. So albeit that the
3374 * direct caller knows that the transmission is complete, we
3375 * need to set the buffer size to zero.
3376 */
3377 ret = lttng_dynamic_buffer_set_size(
3378 &client->communication.outbound.payload.buffer, 0);
3379 if (ret) {
3380 goto error;
3381 }
3382 }
3383
3384 send_fds:
3385 /* No fds to send, transmission is complete. */
3386 if (fds_to_send_count == 0) {
3387 status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
3388 goto end;
3389 }
3390
3391 ret = lttcomm_send_payload_view_fds_unix_sock_non_block(
3392 client->socket, &pv);
3393 if (ret < 0) {
3394 /* Generic error, disable the client's communication. */
3395 ERR("[notification-thread] Failed to flush outgoing fds queue, disconnecting client (socket fd = %i)",
3396 client->socket);
3397 client->communication.active = false;
3398 status = CLIENT_TRANSMISSION_STATUS_FAIL;
3399 goto end;
3400 } else if (ret == 0) {
3401 /* Nothing could be sent. */
3402 status = CLIENT_TRANSMISSION_STATUS_QUEUED;
3403 } else {
3404 /* Fd passing is an all or nothing kind of thing. */
3405 status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
3406 /*
3407 * The payload _fd_array count is used later to
3408 * check if there is notifications queued. So although the
3409 * direct caller knows that the transmission is complete, we
3410 * need to clear the _fd_array for the queuing check.
3411 */
3412 lttng_dynamic_pointer_array_clear(
3413 &client->communication.outbound.payload
3414 ._fd_handles);
3415 }
3416
3417 end:
3418 if (status == CLIENT_TRANSMISSION_STATUS_COMPLETE) {
3419 client->communication.outbound.queued_command_reply = false;
3420 client->communication.outbound.dropped_notification = false;
3421 lttng_payload_clear(&client->communication.outbound.payload);
3422 }
3423
3424 return status;
3425 error:
3426 return CLIENT_TRANSMISSION_STATUS_ERROR;
3427 }
3428
3429 static
3430 bool client_has_outbound_data_left(
3431 const struct notification_client *client)
3432 {
3433 const struct lttng_payload_view pv = lttng_payload_view_from_payload(
3434 &client->communication.outbound.payload, 0, -1);
3435 const bool has_data = pv.buffer.size != 0;
3436 const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv);
3437
3438 return has_data || has_fds;
3439 }
3440
3441 /* Client lock must _not_ be held by the caller. */
3442 static
3443 int client_send_command_reply(struct notification_client *client,
3444 struct notification_thread_state *state,
3445 enum lttng_notification_channel_status status)
3446 {
3447 int ret;
3448 struct lttng_notification_channel_command_reply reply = {
3449 .status = (int8_t) status,
3450 };
3451 struct lttng_notification_channel_message msg = {
3452 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
3453 .size = sizeof(reply),
3454 };
3455 char buffer[sizeof(msg) + sizeof(reply)];
3456 enum client_transmission_status transmission_status;
3457
3458 memcpy(buffer, &msg, sizeof(msg));
3459 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
3460 DBG("[notification-thread] Send command reply (%i)", (int) status);
3461
3462 pthread_mutex_lock(&client->lock);
3463 if (client->communication.outbound.queued_command_reply) {
3464 /* Protocol error. */
3465 goto error_unlock;
3466 }
3467
3468 /* Enqueue buffer to outgoing queue and flush it. */
3469 ret = lttng_dynamic_buffer_append(
3470 &client->communication.outbound.payload.buffer,
3471 buffer, sizeof(buffer));
3472 if (ret) {
3473 goto error_unlock;
3474 }
3475
3476 transmission_status = client_flush_outgoing_queue(client);
3477
3478 if (client_has_outbound_data_left(client)) {
3479 /* Queue could not be emptied. */
3480 client->communication.outbound.queued_command_reply = true;
3481 }
3482
3483 pthread_mutex_unlock(&client->lock);
3484 ret = client_handle_transmission_status(
3485 client, transmission_status, state);
3486 if (ret) {
3487 goto error;
3488 }
3489
3490 return 0;
3491 error_unlock:
3492 pthread_mutex_unlock(&client->lock);
3493 error:
3494 return -1;
3495 }
3496
3497 static
3498 int client_handle_message_unknown(struct notification_client *client,
3499 struct notification_thread_state *state)
3500 {
3501 int ret;
3502 /*
3503 * Receiving message header. The function will be called again
3504 * once the rest of the message as been received and can be
3505 * interpreted.
3506 */
3507 const struct lttng_notification_channel_message *msg;
3508
3509 assert(sizeof(*msg) == client->communication.inbound.payload.buffer.size);
3510 msg = (const struct lttng_notification_channel_message *)
3511 client->communication.inbound.payload.buffer.data;
3512
3513 if (msg->size == 0 ||
3514 msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
3515 ERR("[notification-thread] Invalid notification channel message: length = %u",
3516 msg->size);
3517 ret = -1;
3518 goto end;
3519 }
3520
3521 switch (msg->type) {
3522 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
3523 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
3524 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
3525 break;
3526 default:
3527 ret = -1;
3528 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
3529 goto end;
3530 }
3531
3532 client->communication.inbound.bytes_to_receive = msg->size;
3533 client->communication.inbound.fds_to_receive = msg->fds;
3534 client->communication.inbound.msg_type =
3535 (enum lttng_notification_channel_message_type) msg->type;
3536 ret = lttng_dynamic_buffer_set_size(
3537 &client->communication.inbound.payload.buffer, msg->size);
3538
3539 /* msg is not valid anymore due to lttng_dynamic_buffer_set_size. */
3540 msg = NULL;
3541 end:
3542 return ret;
3543 }
3544
3545 static
3546 int client_handle_message_handshake(struct notification_client *client,
3547 struct notification_thread_state *state)
3548 {
3549 int ret;
3550 struct lttng_notification_channel_command_handshake *handshake_client;
3551 const struct lttng_notification_channel_command_handshake handshake_reply = {
3552 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
3553 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
3554 };
3555 const struct lttng_notification_channel_message msg_header = {
3556 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
3557 .size = sizeof(handshake_reply),
3558 };
3559 enum lttng_notification_channel_status status =
3560 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
3561 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
3562
3563 memcpy(send_buffer, &msg_header, sizeof(msg_header));
3564 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
3565 sizeof(handshake_reply));
3566
3567 handshake_client =
3568 (struct lttng_notification_channel_command_handshake *)
3569 client->communication.inbound.payload.buffer
3570 .data;
3571 client->major = handshake_client->major;
3572 client->minor = handshake_client->minor;
3573 if (!client->communication.inbound.creds_received) {
3574 ERR("[notification-thread] No credentials received from client");
3575 ret = -1;
3576 goto end;
3577 }
3578
3579 client->uid = LTTNG_SOCK_GET_UID_CRED(
3580 &client->communication.inbound.creds);
3581 client->gid = LTTNG_SOCK_GET_GID_CRED(
3582 &client->communication.inbound.creds);
3583 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
3584 client->uid, client->gid, (int) client->major,
3585 (int) client->minor);
3586
3587 if (handshake_client->major !=
3588 LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
3589 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
3590 }
3591
3592 pthread_mutex_lock(&client->lock);
3593 /* Outgoing queue will be flushed when the command reply is sent. */
3594 ret = lttng_dynamic_buffer_append(
3595 &client->communication.outbound.payload.buffer, send_buffer,
3596 sizeof(send_buffer));
3597 if (ret) {
3598 ERR("[notification-thread] Failed to send protocol version to notification channel client");
3599 goto end_unlock;
3600 }
3601
3602 client->validated = true;
3603 client->communication.active = true;
3604 pthread_mutex_unlock(&client->lock);
3605
3606 /* Set reception state to receive the next message header. */
3607 ret = client_reset_inbound_state(client);
3608 if (ret) {
3609 ERR("[notification-thread] Failed to reset client communication's inbound state");
3610 goto end;
3611 }
3612
3613 /* Flushes the outgoing queue. */
3614 ret = client_send_command_reply(client, state, status);
3615 if (ret) {
3616 ERR("[notification-thread] Failed to send reply to notification channel client");
3617 goto end;
3618 }
3619
3620 goto end;
3621 end_unlock:
3622 pthread_mutex_unlock(&client->lock);
3623 end:
3624 return ret;
3625 }
3626
3627 static
3628 int client_handle_message_subscription(
3629 struct notification_client *client,
3630 enum lttng_notification_channel_message_type msg_type,
3631 struct notification_thread_state *state)
3632 {
3633 int ret;
3634 struct lttng_condition *condition;
3635 enum lttng_notification_channel_status status =
3636 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
3637 struct lttng_payload_view condition_view =
3638 lttng_payload_view_from_payload(
3639 &client->communication.inbound.payload,
3640 0, -1);
3641 size_t expected_condition_size;
3642
3643 /*
3644 * No need to lock client to sample the inbound state as the only
3645 * other thread accessing clients (action executor) only uses the
3646 * outbound state.
3647 */
3648 expected_condition_size = client->communication.inbound.payload.buffer.size;
3649 ret = lttng_condition_create_from_payload(&condition_view, &condition);
3650 if (ret != expected_condition_size) {
3651 ERR("[notification-thread] Malformed condition received from client");
3652 goto end;
3653 }
3654
3655 if (msg_type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
3656 ret = notification_thread_client_subscribe(
3657 client, condition, state, &status);
3658 } else {
3659 ret = notification_thread_client_unsubscribe(
3660 client, condition, state, &status);
3661 }
3662
3663 if (ret) {
3664 goto end;
3665 }
3666
3667 /* Set reception state to receive the next message header. */
3668 ret = client_reset_inbound_state(client);
3669 if (ret) {
3670 ERR("[notification-thread] Failed to reset client communication's inbound state");
3671 goto end;
3672 }
3673
3674 ret = client_send_command_reply(client, state, status);
3675 if (ret) {
3676 ERR("[notification-thread] Failed to send reply to notification channel client");
3677 goto end;
3678 }
3679
3680 end:
3681 return ret;
3682 }
3683
3684 static
3685 int client_dispatch_message(struct notification_client *client,
3686 struct notification_thread_state *state)
3687 {
3688 int ret = 0;
3689
3690 if (client->communication.inbound.msg_type !=
3691 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
3692 client->communication.inbound.msg_type !=
3693 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
3694 !client->validated) {
3695 WARN("[notification-thread] client attempted a command before handshake");
3696 ret = -1;
3697 goto end;
3698 }
3699
3700 switch (client->communication.inbound.msg_type) {
3701 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
3702 {
3703 ret = client_handle_message_unknown(client, state);
3704 break;
3705 }
3706 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
3707 {
3708 ret = client_handle_message_handshake(client, state);
3709 break;
3710 }
3711 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
3712 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
3713 {
3714 ret = client_handle_message_subscription(client,
3715 client->communication.inbound.msg_type, state);
3716 break;
3717 }
3718 default:
3719 abort();
3720 }
3721 end:
3722 return ret;
3723 }
3724
3725 /* Incoming data from client. */
3726 int handle_notification_thread_client_in(
3727 struct notification_thread_state *state, int socket)
3728 {
3729 int ret = 0;
3730 struct notification_client *client;
3731 ssize_t recv_ret;
3732 size_t offset;
3733
3734 rcu_read_lock();
3735 client = get_client_from_socket(socket, state);
3736 if (!client) {
3737 /* Internal error, abort. */
3738 ret = -1;
3739 goto end;
3740 }
3741
3742 offset = client->communication.inbound.payload.buffer.size -
3743 client->communication.inbound.bytes_to_receive;
3744 if (client->communication.inbound.expect_creds) {
3745 recv_ret = lttcomm_recv_creds_unix_sock(socket,
3746 client->communication.inbound.payload.buffer.data + offset,
3747 client->communication.inbound.bytes_to_receive,
3748 &client->communication.inbound.creds);
3749 if (recv_ret > 0) {
3750 client->communication.inbound.expect_creds = false;
3751 client->communication.inbound.creds_received = true;
3752 }
3753 } else {
3754 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
3755 client->communication.inbound.payload.buffer.data + offset,
3756 client->communication.inbound.bytes_to_receive);
3757 }
3758 if (recv_ret >= 0) {
3759 client->communication.inbound.bytes_to_receive -= recv_ret;
3760 } else {
3761 goto error_disconnect_client;
3762 }
3763
3764 if (client->communication.inbound.bytes_to_receive != 0) {
3765 /* Message incomplete wait for more data. */
3766 ret = 0;
3767 goto end;
3768 }
3769
3770 assert(client->communication.inbound.bytes_to_receive == 0);
3771
3772 /* Receive fds. */
3773 if (client->communication.inbound.fds_to_receive != 0) {
3774 ret = lttcomm_recv_payload_fds_unix_sock_non_block(
3775 client->socket,
3776 client->communication.inbound.fds_to_receive,
3777 &client->communication.inbound.payload);
3778 if (ret > 0) {
3779 /*
3780 * Fds received. non blocking fds passing is all
3781 * or nothing.
3782 */
3783 ssize_t expected_size;
3784
3785 expected_size = sizeof(int) *
3786 client->communication.inbound
3787 .fds_to_receive;
3788 assert(ret == expected_size);
3789 client->communication.inbound.fds_to_receive = 0;
3790 } else if (ret == 0) {
3791 /* Received nothing. */
3792 ret = 0;
3793 goto end;
3794 } else {
3795 goto error_disconnect_client;
3796 }
3797 }
3798
3799 /* At this point the message is complete.*/
3800 assert(client->communication.inbound.bytes_to_receive == 0 &&
3801 client->communication.inbound.fds_to_receive == 0);
3802 ret = client_dispatch_message(client, state);
3803 if (ret) {
3804 /*
3805 * Only returns an error if this client must be
3806 * disconnected.
3807 */
3808 goto error_disconnect_client;
3809 }
3810
3811 end:
3812 rcu_read_unlock();
3813 return ret;
3814
3815 error_disconnect_client:
3816 ret = notification_thread_client_disconnect(client, state);
3817 goto end;
3818 }
3819
3820 /* Client ready to receive outgoing data. */
3821 int handle_notification_thread_client_out(
3822 struct notification_thread_state *state, int socket)
3823 {
3824 int ret;
3825 struct notification_client *client;
3826 enum client_transmission_status transmission_status;
3827
3828 rcu_read_lock();
3829 client = get_client_from_socket(socket, state);
3830 if (!client) {
3831 /* Internal error, abort. */
3832 ret = -1;
3833 goto end;
3834 }
3835
3836 pthread_mutex_lock(&client->lock);
3837 transmission_status = client_flush_outgoing_queue(client);
3838 pthread_mutex_unlock(&client->lock);
3839
3840 ret = client_handle_transmission_status(
3841 client, transmission_status, state);
3842 if (ret) {
3843 goto end;
3844 }
3845 end:
3846 rcu_read_unlock();
3847 return ret;
3848 }
3849
3850 static
3851 bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
3852 const struct channel_state_sample *sample,
3853 uint64_t buffer_capacity)
3854 {
3855 bool result = false;
3856 uint64_t threshold;
3857 enum lttng_condition_type condition_type;
3858 const struct lttng_condition_buffer_usage *use_condition = container_of(
3859 condition, struct lttng_condition_buffer_usage,
3860 parent);
3861
3862 if (use_condition->threshold_bytes.set) {
3863 threshold = use_condition->threshold_bytes.value;
3864 } else {
3865 /*
3866 * Threshold was expressed as a ratio.
3867 *
3868 * TODO the threshold (in bytes) of conditions expressed
3869 * as a ratio of total buffer size could be cached to
3870 * forego this double-multiplication or it could be performed
3871 * as fixed-point math.
3872 *
3873 * Note that caching should accommodates the case where the
3874 * condition applies to multiple channels (i.e. don't assume
3875 * that all channels matching my_chann* have the same size...)
3876 */
3877 threshold = (uint64_t) (use_condition->threshold_ratio.value *
3878 (double) buffer_capacity);
3879 }
3880
3881 condition_type = lttng_condition_get_type(condition);
3882 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
3883 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3884 threshold, sample->highest_usage);
3885
3886 /*
3887 * The low condition should only be triggered once _all_ of the
3888 * streams in a channel have gone below the "low" threshold.
3889 */
3890 if (sample->highest_usage <= threshold) {
3891 result = true;
3892 }
3893 } else {
3894 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3895 threshold, sample->highest_usage);
3896
3897 /*
3898 * For high buffer usage scenarios, we want to trigger whenever
3899 * _any_ of the streams has reached the "high" threshold.
3900 */
3901 if (sample->highest_usage >= threshold) {
3902 result = true;
3903 }
3904 }
3905
3906 return result;
3907 }
3908
3909 static
3910 bool evaluate_session_consumed_size_condition(
3911 const struct lttng_condition *condition,
3912 uint64_t session_consumed_size)
3913 {
3914 uint64_t threshold;
3915 const struct lttng_condition_session_consumed_size *size_condition =
3916 container_of(condition,
3917 struct lttng_condition_session_consumed_size,
3918 parent);
3919
3920 threshold = size_condition->consumed_threshold_bytes.value;
3921 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
3922 threshold, session_consumed_size);
3923 return session_consumed_size >= threshold;
3924 }
3925
3926 static
3927 int evaluate_buffer_condition(const struct lttng_condition *condition,
3928 struct lttng_evaluation **evaluation,
3929 const struct notification_thread_state *state,
3930 const struct channel_state_sample *previous_sample,
3931 const struct channel_state_sample *latest_sample,
3932 uint64_t previous_session_consumed_total,
3933 uint64_t latest_session_consumed_total,
3934 struct channel_info *channel_info)
3935 {
3936 int ret = 0;
3937 enum lttng_condition_type condition_type;
3938 const bool previous_sample_available = !!previous_sample;
3939 bool previous_sample_result = false;
3940 bool latest_sample_result;
3941
3942 condition_type = lttng_condition_get_type(condition);
3943
3944 switch (condition_type) {
3945 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3946 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3947 if (caa_likely(previous_sample_available)) {
3948 previous_sample_result =
3949 evaluate_buffer_usage_condition(condition,
3950 previous_sample, channel_info->capacity);
3951 }
3952 latest_sample_result = evaluate_buffer_usage_condition(
3953 condition, latest_sample,
3954 channel_info->capacity);
3955 break;
3956 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3957 if (caa_likely(previous_sample_available)) {
3958 previous_sample_result =
3959 evaluate_session_consumed_size_condition(
3960 condition,
3961 previous_session_consumed_total);
3962 }
3963 latest_sample_result =
3964 evaluate_session_consumed_size_condition(
3965 condition,
3966 latest_session_consumed_total);
3967 break;
3968 default:
3969 /* Unknown condition type; internal error. */
3970 abort();
3971 }
3972
3973 if (!latest_sample_result ||
3974 (previous_sample_result == latest_sample_result)) {
3975 /*
3976 * Only trigger on a condition evaluation transition.
3977 *
3978 * NOTE: This edge-triggered logic may not be appropriate for
3979 * future condition types.
3980 */
3981 goto end;
3982 }
3983
3984 if (!evaluation || !latest_sample_result) {
3985 goto end;
3986 }
3987
3988 switch (condition_type) {
3989 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3990 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3991 *evaluation = lttng_evaluation_buffer_usage_create(
3992 condition_type,
3993 latest_sample->highest_usage,
3994 channel_info->capacity);
3995 break;
3996 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3997 *evaluation = lttng_evaluation_session_consumed_size_create(
3998 latest_session_consumed_total);
3999 break;
4000 default:
4001 abort();
4002 }
4003
4004 if (!*evaluation) {
4005 ret = -1;
4006 goto end;
4007 }
4008 end:
4009 return ret;
4010 }
4011
4012 static
4013 int client_notification_overflow(struct notification_client *client)
4014 {
4015 int ret = 0;
4016 const struct lttng_notification_channel_message msg = {
4017 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
4018 };
4019
4020 ASSERT_LOCKED(client->lock);
4021
4022 DBG("Dropping notification addressed to client (socket fd = %i)",
4023 client->socket);
4024 if (client->communication.outbound.dropped_notification) {
4025 /*
4026 * The client already has a "notification dropped" message
4027 * in its outgoing queue. Nothing to do since all
4028 * of those messages are coalesced.
4029 */
4030 goto end;
4031 }
4032
4033 client->communication.outbound.dropped_notification = true;
4034 ret = lttng_dynamic_buffer_append(
4035 &client->communication.outbound.payload.buffer, &msg,
4036 sizeof(msg));
4037 if (ret) {
4038 PERROR("Failed to enqueue \"dropped notification\" message in client's (socket fd = %i) outgoing queue",
4039 client->socket);
4040 }
4041 end:
4042 return ret;
4043 }
4044
4045 static int client_handle_transmission_status_wrapper(
4046 struct notification_client *client,
4047 enum client_transmission_status status,
4048 void *user_data)
4049 {
4050 return client_handle_transmission_status(client, status,
4051 (struct notification_thread_state *) user_data);
4052 }
4053
4054 static
4055 int send_evaluation_to_clients(const struct lttng_trigger *trigger,
4056 const struct lttng_evaluation *evaluation,
4057 struct notification_client_list* client_list,
4058 struct notification_thread_state *state,
4059 uid_t object_uid, gid_t object_gid)
4060 {
4061 const struct lttng_credentials creds = {
4062 .uid = LTTNG_OPTIONAL_INIT_VALUE(object_uid),
4063 .gid = LTTNG_OPTIONAL_INIT_VALUE(object_gid),
4064 };
4065
4066 return notification_client_list_send_evaluation(client_list,
4067 lttng_trigger_get_const_condition(trigger), evaluation,
4068 lttng_trigger_get_credentials(trigger),
4069 &creds,
4070 client_handle_transmission_status_wrapper, state);
4071 }
4072
4073 /*
4074 * Permission checks relative to notification channel clients are performed
4075 * here. Notice how object, client, and trigger credentials are involved in
4076 * this check.
4077 *
4078 * The `object` credentials are the credentials associated with the "subject"
4079 * of a condition. For instance, a `rotation completed` condition applies
4080 * to a session. When that condition is met, it will produce an evaluation
4081 * against a session. Hence, in this case, the `object` credentials are the
4082 * credentials of the "subject" session.
4083 *
4084 * The `trigger` credentials are the credentials of the user that registered the
4085 * trigger.
4086 *
4087 * The `client` credentials are the credentials of the user that created a given
4088 * notification channel.
4089 *
4090 * In terms of visibility, it is expected that non-privilieged users can only
4091 * register triggers against "their" objects (their own sessions and
4092 * applications they are allowed to interact with). They can then open a
4093 * notification channel and subscribe to notifications associated with those
4094 * triggers.
4095 *
4096 * As for privilieged users, they can register triggers against the objects of
4097 * other users. They can then subscribe to the notifications associated to their
4098 * triggers. Privilieged users _can't_ subscribe to the notifications of
4099 * triggers owned by other users; they must create their own triggers.
4100 *
4101 * This is more a concern of usability than security. It would be difficult for
4102 * a root user reliably subscribe to a specific set of conditions without
4103 * interference from external users (those could, for instance, unregister
4104 * their triggers).
4105 */
4106 LTTNG_HIDDEN
4107 int notification_client_list_send_evaluation(
4108 struct notification_client_list *client_list,
4109 const struct lttng_condition *condition,
4110 const struct lttng_evaluation *evaluation,
4111 const struct lttng_credentials *trigger_creds,
4112 const struct lttng_credentials *source_object_creds,
4113 report_client_transmission_result_cb client_report,
4114 void *user_data)
4115 {
4116 int ret = 0;
4117 struct lttng_payload msg_payload;
4118 struct notification_client_list_element *client_list_element, *tmp;
4119 const struct lttng_notification notification = {
4120 .condition = (struct lttng_condition *) condition,
4121 .evaluation = (struct lttng_evaluation *) evaluation,
4122 };
4123 struct lttng_notification_channel_message msg_header = {
4124 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
4125 };
4126
4127 lttng_payload_init(&msg_payload);
4128
4129 ret = lttng_dynamic_buffer_append(&msg_payload.buffer, &msg_header,
4130 sizeof(msg_header));
4131 if (ret) {
4132 goto end;
4133 }
4134
4135 ret = lttng_notification_serialize(&notification, &msg_payload);
4136 if (ret) {
4137 ERR("[notification-thread] Failed to serialize notification");
4138 ret = -1;
4139 goto end;
4140 }
4141
4142 /* Update payload size. */
4143 ((struct lttng_notification_channel_message *) msg_payload.buffer.data)
4144 ->size = (uint32_t)(
4145 msg_payload.buffer.size - sizeof(msg_header));
4146
4147 /* Update the payload number of fds. */
4148 {
4149 const struct lttng_payload_view pv = lttng_payload_view_from_payload(
4150 &msg_payload, 0, -1);
4151
4152 ((struct lttng_notification_channel_message *)
4153 msg_payload.buffer.data)->fds = (uint32_t)
4154 lttng_payload_view_get_fd_handle_count(&pv);
4155 }
4156
4157 pthread_mutex_lock(&client_list->lock);
4158 cds_list_for_each_entry_safe(client_list_element, tmp,
4159 &client_list->list, node) {
4160 enum client_transmission_status transmission_status;
4161 struct notification_client *client =
4162 client_list_element->client;
4163
4164 ret = 0;
4165 pthread_mutex_lock(&client->lock);
4166 if (!client->communication.active) {
4167 /*
4168 * Skip inactive client (protocol error or
4169 * disconnecting).
4170 */
4171 DBG("Skipping client at it is marked as inactive");
4172 goto skip_client;
4173 }
4174
4175 if (source_object_creds) {
4176 if (client->uid != lttng_credentials_get_uid(source_object_creds) &&
4177 client->gid != lttng_credentials_get_gid(source_object_creds) &&
4178 client->uid != 0) {
4179 /*
4180 * Client is not allowed to monitor this
4181 * object.
4182 */
4183 DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
4184 goto skip_client;
4185 }
4186 }
4187
4188 if (client->uid != lttng_credentials_get_uid(trigger_creds) && client->gid != lttng_credentials_get_gid(trigger_creds)) {
4189 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this trigger");
4190 goto skip_client;
4191 }
4192
4193 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
4194 client->socket, msg_payload.buffer.size);
4195
4196 if (client_has_outbound_data_left(client)) {
4197 /*
4198 * Outgoing data is already buffered for this client;
4199 * drop the notification and enqueue a "dropped
4200 * notification" message if this is the first dropped
4201 * notification since the socket spilled-over to the
4202 * queue.
4203 */
4204 ret = client_notification_overflow(client);
4205 if (ret) {
4206 /* Fatal error. */
4207 goto skip_client;
4208 }
4209 }
4210
4211 ret = lttng_payload_copy(&msg_payload, &client->communication.outbound.payload);
4212 if (ret) {
4213 /* Fatal error. */
4214 goto skip_client;
4215 }
4216
4217 transmission_status = client_flush_outgoing_queue(client);
4218 pthread_mutex_unlock(&client->lock);
4219 ret = client_report(client, transmission_status, user_data);
4220 if (ret) {
4221 /* Fatal error. */
4222 goto end_unlock_list;
4223 }
4224
4225 continue;
4226
4227 skip_client:
4228 pthread_mutex_unlock(&client->lock);
4229 if (ret) {
4230 /* Fatal error. */
4231 goto end_unlock_list;
4232 }
4233 }
4234 ret = 0;
4235
4236 end_unlock_list:
4237 pthread_mutex_unlock(&client_list->lock);
4238 end:
4239 lttng_payload_reset(&msg_payload);
4240 return ret;
4241 }
4242
4243 static
4244 struct lttng_event_notifier_notification *recv_one_event_notifier_notification(
4245 int notification_pipe_read_fd, enum lttng_domain_type domain)
4246 {
4247 int ret;
4248 uint64_t token;
4249 struct lttng_event_notifier_notification *notification = NULL;
4250 char *capture_buffer = NULL;
4251 size_t capture_buffer_size;
4252 void *reception_buffer;
4253 size_t reception_size;
4254
4255 struct lttng_ust_abi_event_notifier_notification ust_notification;
4256 struct lttng_kernel_event_notifier_notification kernel_notification;
4257
4258 /* Init lttng_event_notifier_notification */
4259 switch(domain) {
4260 case LTTNG_DOMAIN_UST:
4261 reception_buffer = (void *) &ust_notification;
4262 reception_size = sizeof(ust_notification);
4263 break;
4264 case LTTNG_DOMAIN_KERNEL:
4265 reception_buffer = (void *) &kernel_notification;
4266 reception_size = sizeof(kernel_notification);
4267 break;
4268 default:
4269 abort();
4270 }
4271
4272 /*
4273 * The monitoring pipe only holds messages smaller than PIPE_BUF,
4274 * ensuring that read/write of tracer notifications are atomic.
4275 */
4276 ret = lttng_read(notification_pipe_read_fd, reception_buffer,
4277 reception_size);
4278 if (ret != reception_size) {
4279 PERROR("Failed to read from event source notification pipe: fd = %d, size to read = %zu, ret = %d",
4280 notification_pipe_read_fd, reception_size, ret);
4281 ret = -1;
4282 goto end;
4283 }
4284
4285 switch(domain) {
4286 case LTTNG_DOMAIN_UST:
4287 token = ust_notification.token;
4288 capture_buffer_size = ust_notification.capture_buf_size;
4289 break;
4290 case LTTNG_DOMAIN_KERNEL:
4291 token = kernel_notification.token;
4292 capture_buffer_size = 0;
4293 break;
4294 default:
4295 abort();
4296 }
4297
4298 if (capture_buffer_size == 0) {
4299 capture_buffer = NULL;
4300 goto skip_capture;
4301 }
4302
4303 if (capture_buffer_size > MAX_CAPTURE_SIZE) {
4304 ERR("[notification-thread] Event notifier has a capture payload size which exceeds the maximum allowed size: capture_payload_size = %zu bytes, max allowed size = %d bytes",
4305 capture_buffer_size, MAX_CAPTURE_SIZE);
4306 goto end;
4307 }
4308
4309 capture_buffer = zmalloc(capture_buffer_size);
4310 if (!capture_buffer) {
4311 ERR("[notification-thread] Failed to allocate capture buffer");
4312 goto end;
4313 }
4314
4315 /* Fetch additional payload (capture). */
4316 ret = lttng_read(notification_pipe_read_fd, capture_buffer, capture_buffer_size);
4317 if (ret != capture_buffer_size) {
4318 ERR("[notification-thread] Failed to read from event source pipe (fd = %i)",
4319 notification_pipe_read_fd);
4320 goto end;
4321 }
4322
4323 skip_capture:
4324 notification = lttng_event_notifier_notification_create(token, domain,
4325 capture_buffer, capture_buffer_size);
4326 if (notification == NULL) {
4327 goto end;
4328 }
4329
4330 /*
4331 * Ownership transfered to the lttng_event_notifier_notification object.
4332 */
4333 capture_buffer = NULL;
4334
4335 end:
4336 free(capture_buffer);
4337 return notification;
4338 }
4339
4340 static
4341 int dispatch_one_event_notifier_notification(struct notification_thread_state *state,
4342 struct lttng_event_notifier_notification *notification)
4343 {
4344 struct cds_lfht_node *node;
4345 struct cds_lfht_iter iter;
4346 struct notification_trigger_tokens_ht_element *element;
4347 enum lttng_trigger_status trigger_status;
4348 struct lttng_evaluation *evaluation = NULL;
4349 enum action_executor_status executor_status;
4350 struct notification_client_list *client_list = NULL;
4351 const char *trigger_name;
4352 int ret;
4353 unsigned int capture_count = 0;
4354
4355 /* Find triggers associated with this token. */
4356 rcu_read_lock();
4357 cds_lfht_lookup(state->trigger_tokens_ht,
4358 hash_key_u64(&notification->tracer_token, lttng_ht_seed),
4359 match_trigger_token, &notification->tracer_token, &iter);
4360 node = cds_lfht_iter_get_node(&iter);
4361 if (caa_unlikely(!node)) {
4362 /*
4363 * This is not an error, slow consumption of the tracer
4364 * notifications can lead to situations where a trigger is
4365 * removed but we still get tracer notifications matching a
4366 * trigger that no longer exists.
4367 */
4368 ret = 0;
4369 goto end_unlock;
4370 }
4371
4372 element = caa_container_of(node,
4373 struct notification_trigger_tokens_ht_element,
4374 node);
4375
4376 if (!lttng_trigger_should_fire(element->trigger)) {
4377 ret = 0;
4378 goto end_unlock;
4379 }
4380
4381 lttng_trigger_fire(element->trigger);
4382
4383 trigger_status = lttng_trigger_get_name(element->trigger, &trigger_name);
4384 assert(trigger_status == LTTNG_TRIGGER_STATUS_OK);
4385
4386 if (lttng_condition_event_rule_get_capture_descriptor_count(
4387 lttng_trigger_get_const_condition(element->trigger),
4388 &capture_count) != LTTNG_CONDITION_STATUS_OK) {
4389 ERR("Failed to get capture count");
4390 ret = -1;
4391 goto end;
4392 }
4393
4394 if (!notification->capture_buffer && capture_count != 0) {
4395 ERR("Expected capture but capture buffer is null");
4396 ret = -1;
4397 goto end;
4398 }
4399
4400 evaluation = lttng_evaluation_event_rule_create(
4401 container_of(lttng_trigger_get_const_condition(
4402 element->trigger),
4403 struct lttng_condition_event_rule,
4404 parent),
4405 trigger_name,
4406 notification->capture_buffer,
4407 notification->capture_buf_size, false);
4408
4409 if (evaluation == NULL) {
4410 ERR("[notification-thread] Failed to create event rule hit evaluation while creating and enqueuing action executor job");
4411 ret = -1;
4412 goto end_unlock;
4413 }
4414 client_list = get_client_list_from_condition(state,
4415 lttng_trigger_get_const_condition(element->trigger));
4416 executor_status = action_executor_enqueue(state->executor,
4417 element->trigger, evaluation, NULL, client_list);
4418 switch (executor_status) {
4419 case ACTION_EXECUTOR_STATUS_OK:
4420 ret = 0;
4421 break;
4422 case ACTION_EXECUTOR_STATUS_OVERFLOW:
4423 {
4424 struct notification_client_list_element *client_list_element,
4425 *tmp;
4426
4427 /*
4428 * Not a fatal error; this is expected and simply means the
4429 * executor has too much work queued already.
4430 */
4431 ret = 0;
4432
4433 /* No clients subscribed to notifications for this trigger. */
4434 if (!client_list) {
4435 break;
4436 }
4437
4438 /* Warn clients that a notification (or more) was dropped. */
4439 pthread_mutex_lock(&client_list->lock);
4440 cds_list_for_each_entry_safe(client_list_element, tmp,
4441 &client_list->list, node) {
4442 enum client_transmission_status transmission_status;
4443 struct notification_client *client =
4444 client_list_element->client;
4445
4446 pthread_mutex_lock(&client->lock);
4447 ret = client_notification_overflow(client);
4448 if (ret) {
4449 /* Fatal error. */
4450 goto next_client;
4451 }
4452
4453 transmission_status =
4454 client_flush_outgoing_queue(client);
4455 ret = client_handle_transmission_status(
4456 client, transmission_status, state);
4457 if (ret) {
4458 /* Fatal error. */
4459 goto next_client;
4460 }
4461 next_client:
4462 pthread_mutex_unlock(&client->lock);
4463 if (ret) {
4464 break;
4465 }
4466 }
4467
4468 pthread_mutex_unlock(&client_list->lock);
4469 break;
4470 }
4471 case ACTION_EXECUTOR_STATUS_ERROR:
4472 /* Fatal error, shut down everything. */
4473 ERR("Fatal error encoutered while enqueuing action to the action executor");
4474 ret = -1;
4475 goto end_unlock;
4476 default:
4477 /* Unhandled error. */
4478 abort();
4479 }
4480
4481 end_unlock:
4482 notification_client_list_put(client_list);
4483 rcu_read_unlock();
4484 end:
4485 return ret;
4486 }
4487
4488 static
4489 int handle_one_event_notifier_notification(
4490 struct notification_thread_state *state,
4491 int pipe, enum lttng_domain_type domain)
4492 {
4493 int ret = 0;
4494 struct lttng_event_notifier_notification *notification = NULL;
4495
4496 notification = recv_one_event_notifier_notification(pipe, domain);
4497 if (notification == NULL) {
4498 /* Reception failed, don't consider it fatal. */
4499 ERR("[notification-thread] Error receiving an event notifier notification from tracer: fd = %i, domain = %s",
4500 pipe, lttng_domain_type_str(domain));
4501 goto end;
4502 }
4503
4504 ret = dispatch_one_event_notifier_notification(state, notification);
4505 if (ret) {
4506 ERR("[notification-thread] Error dispatching an event notifier notification from tracer: fd = %i, domain = %s",
4507 pipe, lttng_domain_type_str(domain));
4508 goto end;
4509 }
4510
4511 end:
4512 lttng_event_notifier_notification_destroy(notification);
4513 return ret;
4514 }
4515
4516 int handle_notification_thread_event_notification(struct notification_thread_state *state,
4517 int pipe, enum lttng_domain_type domain)
4518 {
4519 return handle_one_event_notifier_notification(state, pipe, domain);
4520 }
4521
4522 int handle_notification_thread_channel_sample(
4523 struct notification_thread_state *state, int pipe,
4524 enum lttng_domain_type domain)
4525 {
4526 int ret = 0;
4527 struct lttcomm_consumer_channel_monitor_msg sample_msg;
4528 struct channel_info *channel_info;
4529 struct cds_lfht_node *node;
4530 struct cds_lfht_iter iter;
4531 struct lttng_channel_trigger_list *trigger_list;
4532 struct lttng_trigger_list_element *trigger_list_element;
4533 bool previous_sample_available = false;
4534 struct channel_state_sample previous_sample, latest_sample;
4535 uint64_t previous_session_consumed_total, latest_session_consumed_total;
4536 struct lttng_credentials channel_creds;
4537
4538 /*
4539 * The monitoring pipe only holds messages smaller than PIPE_BUF,
4540 * ensuring that read/write of sampling messages are atomic.
4541 */
4542 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
4543 if (ret != sizeof(sample_msg)) {
4544 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
4545 pipe);
4546 ret = -1;
4547 goto end;
4548 }
4549
4550 ret = 0;
4551 latest_sample.key.key = sample_msg.key;
4552 latest_sample.key.domain = domain;
4553 latest_sample.highest_usage = sample_msg.highest;
4554 latest_sample.lowest_usage = sample_msg.lowest;
4555 latest_sample.channel_total_consumed = sample_msg.total_consumed;
4556
4557 rcu_read_lock();
4558
4559 /* Retrieve the channel's informations */
4560 cds_lfht_lookup(state->channels_ht,
4561 hash_channel_key(&latest_sample.key),
4562 match_channel_info,
4563 &latest_sample.key,
4564 &iter);
4565 node = cds_lfht_iter_get_node(&iter);
4566 if (caa_unlikely(!node)) {
4567 /*
4568 * Not an error since the consumer can push a sample to the pipe
4569 * and the rest of the session daemon could notify us of the
4570 * channel's destruction before we get a chance to process that
4571 * sample.
4572 */
4573 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
4574 latest_sample.key.key,
4575 lttng_domain_type_str(domain));
4576 goto end_unlock;
4577 }
4578 channel_info = caa_container_of(node, struct channel_info,
4579 channels_ht_node);
4580 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
4581 channel_info->name,
4582 latest_sample.key.key,
4583 channel_info->session_info->name,
4584 latest_sample.highest_usage,
4585 latest_sample.lowest_usage,
4586 latest_sample.channel_total_consumed);
4587
4588 previous_session_consumed_total =
4589 channel_info->session_info->consumed_data_size;
4590
4591 /* Retrieve the channel's last sample, if it exists, and update it. */
4592 cds_lfht_lookup(state->channel_state_ht,
4593 hash_channel_key(&latest_sample.key),
4594 match_channel_state_sample,
4595 &latest_sample.key,
4596 &iter);
4597 node = cds_lfht_iter_get_node(&iter);
4598 if (caa_likely(node)) {
4599 struct channel_state_sample *stored_sample;
4600
4601 /* Update the sample stored. */
4602 stored_sample = caa_container_of(node,
4603 struct channel_state_sample,
4604 channel_state_ht_node);
4605
4606 memcpy(&previous_sample, stored_sample,
4607 sizeof(previous_sample));
4608 stored_sample->highest_usage = latest_sample.highest_usage;
4609 stored_sample->lowest_usage = latest_sample.lowest_usage;
4610 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
4611 previous_sample_available = true;
4612
4613 latest_session_consumed_total =
4614 previous_session_consumed_total +
4615 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
4616 } else {
4617 /*
4618 * This is the channel's first sample, allocate space for and
4619 * store the new sample.
4620 */
4621 struct channel_state_sample *stored_sample;
4622
4623 stored_sample = zmalloc(sizeof(*stored_sample));
4624 if (!stored_sample) {
4625 ret = -1;
4626 goto end_unlock;
4627 }
4628
4629 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
4630 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
4631 cds_lfht_add(state->channel_state_ht,
4632 hash_channel_key(&stored_sample->key),
4633 &stored_sample->channel_state_ht_node);
4634
4635 latest_session_consumed_total =
4636 previous_session_consumed_total +
4637 latest_sample.channel_total_consumed;
4638 }
4639
4640 channel_info->session_info->consumed_data_size =
4641 latest_session_consumed_total;
4642
4643 /* Find triggers associated with this channel. */
4644 cds_lfht_lookup(state->channel_triggers_ht,
4645 hash_channel_key(&latest_sample.key),
4646 match_channel_trigger_list,
4647 &latest_sample.key,
4648 &iter);
4649 node = cds_lfht_iter_get_node(&iter);
4650 if (caa_likely(!node)) {
4651 goto end_unlock;
4652 }
4653
4654 channel_creds = (typeof(channel_creds)) {
4655 .uid = LTTNG_OPTIONAL_INIT_VALUE(channel_info->session_info->uid),
4656 .gid = LTTNG_OPTIONAL_INIT_VALUE(channel_info->session_info->gid),
4657 };
4658
4659 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
4660 channel_triggers_ht_node);
4661 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
4662 node) {
4663 const struct lttng_condition *condition;
4664 struct lttng_trigger *trigger;
4665 struct notification_client_list *client_list = NULL;
4666 struct lttng_evaluation *evaluation = NULL;
4667 enum action_executor_status executor_status;
4668
4669 ret = 0;
4670 trigger = trigger_list_element->trigger;
4671 condition = lttng_trigger_get_const_condition(trigger);
4672 assert(condition);
4673
4674 /*
4675 * Check if any client is subscribed to the result of this
4676 * evaluation.
4677 */
4678 client_list = get_client_list_from_condition(state, condition);
4679
4680 ret = evaluate_buffer_condition(condition, &evaluation, state,
4681 previous_sample_available ? &previous_sample : NULL,
4682 &latest_sample,
4683 previous_session_consumed_total,
4684 latest_session_consumed_total,
4685 channel_info);
4686 if (caa_unlikely(ret)) {
4687 goto put_list;
4688 }
4689
4690 if (caa_likely(!evaluation)) {
4691 goto put_list;
4692 }
4693
4694 if (!lttng_trigger_should_fire(trigger)) {
4695 goto put_list;
4696 }
4697
4698 lttng_trigger_fire(trigger);
4699
4700 /*
4701 * Ownership of `evaluation` transferred to the action executor
4702 * no matter the result.
4703 */
4704 executor_status = action_executor_enqueue(state->executor,
4705 trigger, evaluation, &channel_creds,
4706 client_list);
4707 evaluation = NULL;
4708 switch (executor_status) {
4709 case ACTION_EXECUTOR_STATUS_OK:
4710 break;
4711 case ACTION_EXECUTOR_STATUS_ERROR:
4712 case ACTION_EXECUTOR_STATUS_INVALID:
4713 /*
4714 * TODO Add trigger identification (name/id) when
4715 * it is added to the API.
4716 */
4717 ERR("Fatal error occurred while enqueuing action associated with buffer-condition trigger");
4718 ret = -1;
4719 goto put_list;
4720 case ACTION_EXECUTOR_STATUS_OVERFLOW:
4721 /*
4722 * TODO Add trigger identification (name/id) when
4723 * it is added to the API.
4724 *
4725 * Not a fatal error.
4726 */
4727 WARN("No space left when enqueuing action associated with buffer-condition trigger");
4728 ret = 0;
4729 goto put_list;
4730 default:
4731 abort();
4732 }
4733
4734 put_list:
4735 notification_client_list_put(client_list);
4736 if (caa_unlikely(ret)) {
4737 break;
4738 }
4739 }
4740 end_unlock:
4741 rcu_read_unlock();
4742 end:
4743 return ret;
4744 }
This page took 0.190139 seconds and 4 git commands to generate.