Cleanup: erroneous use of CDS_INIT_LIST_HEAD() on node
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
1 /*
2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
#include "lttng/action/action.h"
#include "lttng/trigger/trigger-internal.h"
#define _LGPL_SOURCE
#include <urcu.h>
#include <urcu/rculfhash.h>

#include <common/defaults.h>
#include <common/error.h>
#include <common/futex.h>
#include <common/unix.h>
#include <common/dynamic-buffer.h>
#include <common/hashtable/utils.h>
#include <common/sessiond-comm/sessiond-comm.h>
#include <common/macros.h>
#include <lttng/condition/condition.h>
#include <lttng/condition/evaluation.h>
#include <lttng/action/action-internal.h>
#include <lttng/notification/notification-internal.h>
#include <lttng/condition/condition-internal.h>
#include <lttng/condition/buffer-usage-internal.h>
#include <lttng/condition/session-consumed-size-internal.h>
#include <lttng/condition/session-rotation-internal.h>
#include <lttng/condition/event-rule-internal.h>
#include <lttng/domain-internal.h>
#include <lttng/notification/channel-internal.h>
#include <lttng/trigger/trigger-internal.h>
#include <lttng/event-rule/event-rule-internal.h>

#include <time.h>
#include <unistd.h>
#include <assert.h>
#include <inttypes.h>
#include <fcntl.h>

#include "condition-internal.h"
#include "notification-thread.h"
#include "notification-thread-events.h"
#include "notification-thread-commands.h"
#include "lttng-sessiond.h"
#include "kernel.h"
47
/* Poll event mask covering input, error and hang-up events. */
#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
/* Same events as CLIENT_POLL_MASK_IN, plus LPOLLOUT. */
#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
50
/*
 * Type of object to which a condition is bound, as reported by
 * get_condition_binding_object().
 */
enum lttng_object_type {
	/* Reported for condition types that get_condition_binding_object() does not handle. */
	LTTNG_OBJECT_TYPE_UNKNOWN,
	/* The condition is not bound to any object (event-rule-hit conditions). */
	LTTNG_OBJECT_TYPE_NONE,
	/* The condition is bound to a channel. */
	LTTNG_OBJECT_TYPE_CHANNEL,
	/* The condition is bound to a session. */
	LTTNG_OBJECT_TYPE_SESSION,
};
57
/* Element of a list of triggers (channel or session trigger list). */
struct lttng_trigger_list_element {
	/* No ownership of the trigger object is assumed. */
	struct lttng_trigger *trigger;
	/* Node in the trigger list this element belongs to. */
	struct cds_list_head node;
};
63
/* List of triggers associated with the channel identified by channel_key. */
struct lttng_channel_trigger_list {
	/* Key (key + domain) compared by match_channel_trigger_list(). */
	struct channel_key channel_key;
	/* List of struct lttng_trigger_list_element. */
	struct cds_list_head list;
	/* Node in the channel_triggers_ht */
	struct cds_lfht_node channel_triggers_ht_node;
	/* call_rcu delayed reclaim. */
	struct rcu_head rcu_node;
};
73
/*
 * List of triggers applying to a given session.
 *
 * See:
 *   - lttng_session_trigger_list_create()
 *   - lttng_session_trigger_list_build()
 *   - lttng_session_trigger_list_destroy()
 *   - lttng_session_trigger_list_add()
 */
struct lttng_session_trigger_list {
	/*
	 * Not owned by this; points to the session_info structure's
	 * session name.
	 */
	const char *session_name;
	/* List of struct lttng_trigger_list_element. */
	struct cds_list_head list;
	/* Node in the session_triggers_ht */
	struct cds_lfht_node session_triggers_ht_node;
	/*
	 * Weak reference to the notification system's session triggers
	 * hashtable.
	 *
	 * The session trigger list structure is owned by
	 * the session's session_info.
	 *
	 * The session_info is kept alive by the channel_infos holding a
	 * reference to it (reference counting). When those channels are
	 * destroyed (at runtime or on teardown), the references they hold
	 * to the session_info are released. On destruction of session_info,
	 * session_info_destroy() will remove the list of triggers applying
	 * to this session from the notification system's state.
	 *
	 * This implies that the session_triggers_ht must be destroyed
	 * after the channels.
	 */
	struct cds_lfht *session_triggers_ht;
	/* Used for delayed RCU reclaim. */
	struct rcu_head rcu_node;
};
114
/* Hash table element wrapping a trigger; it can be indexed through two nodes. */
struct lttng_trigger_ht_element {
	struct lttng_trigger *trigger;
	/* Node matched by match_trigger() (full trigger equality). */
	struct cds_lfht_node node;
	/* Node matched by match_trigger_by_name_uid() (name and owner uid). */
	struct cds_lfht_node node_by_name_uid;
	/* call_rcu delayed reclaim. */
	struct rcu_head rcu_node;
};
122
/* Element of a client's list of subscribed-to conditions (client->condition_list). */
struct lttng_condition_list_element {
	struct lttng_condition *condition;
	/* Node in the client's condition list. */
	struct cds_list_head node;
};
127
/* Sample of a channel's state, looked up by channel key in the channel_state_ht. */
struct channel_state_sample {
	/* Key (key + domain) compared by match_channel_state_sample(). */
	struct channel_key key;
	struct cds_lfht_node channel_state_ht_node;
	/*
	 * NOTE(review): presumably the usage extremes and the consumed total
	 * reported by a channel monitoring sample; the units are not visible
	 * from here, confirm against the sample producer.
	 */
	uint64_t highest_usage;
	uint64_t lowest_usage;
	uint64_t channel_total_consumed;
	/* call_rcu delayed reclaim. */
	struct rcu_head rcu_node;
};
137
/*
 * Forward declarations of static helpers that are referenced before their
 * definition.
 */
static unsigned long hash_channel_key(struct channel_key *key);
static int evaluate_buffer_condition(const struct lttng_condition *condition,
		struct lttng_evaluation **evaluation,
		const struct notification_thread_state *state,
		const struct channel_state_sample *previous_sample,
		const struct channel_state_sample *latest_sample,
		uint64_t previous_session_consumed_total,
		uint64_t latest_session_consumed_total,
		struct channel_info *channel_info);
/* The evaluation is taken as const: the caller keeps ownership of it. */
static
int send_evaluation_to_clients(const struct lttng_trigger *trigger,
		const struct lttng_evaluation *evaluation,
		struct notification_client_list *client_list,
		struct notification_thread_state *state,
		uid_t channel_uid, gid_t channel_gid);


/* session_info API */
static
void session_info_destroy(void *_data);
static
void session_info_get(struct session_info *session_info);
static
void session_info_put(struct session_info *session_info);
static
struct session_info *session_info_create(const char *name,
		uid_t uid, gid_t gid,
		struct lttng_session_trigger_list *trigger_list,
		struct cds_lfht *sessions_ht);
static
void session_info_add_channel(struct session_info *session_info,
		struct channel_info *channel_info);
static
void session_info_remove_channel(struct session_info *session_info,
		struct channel_info *channel_info);

/* lttng_session_trigger_list API */
static
struct lttng_session_trigger_list *lttng_session_trigger_list_create(
		const char *session_name,
		struct cds_lfht *session_triggers_ht);
static
struct lttng_session_trigger_list *lttng_session_trigger_list_build(
		const struct notification_thread_state *state,
		const char *session_name);
static
void lttng_session_trigger_list_destroy(
		struct lttng_session_trigger_list *list);
static
int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
		struct lttng_trigger *trigger);

static
int client_handle_transmission_status(
		struct notification_client *client,
		enum client_transmission_status transmission_status,
		struct notification_thread_state *state);

/* call_rcu callback used to reclaim a struct lttng_trigger_ht_element. */
static
void free_lttng_trigger_ht_element_rcu(struct rcu_head *node);
198
199 static
200 int match_client_socket(struct cds_lfht_node *node, const void *key)
201 {
202 /* This double-cast is intended to supress pointer-to-cast warning. */
203 const int socket = (int) (intptr_t) key;
204 const struct notification_client *client = caa_container_of(node,
205 struct notification_client, client_socket_ht_node);
206
207 return client->socket == socket;
208 }
209
210 static
211 int match_client_id(struct cds_lfht_node *node, const void *key)
212 {
213 /* This double-cast is intended to supress pointer-to-cast warning. */
214 const notification_client_id id = *((notification_client_id *) key);
215 const struct notification_client *client = caa_container_of(
216 node, struct notification_client, client_id_ht_node);
217
218 return client->id == id;
219 }
220
221 static
222 int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
223 {
224 struct channel_key *channel_key = (struct channel_key *) key;
225 struct lttng_channel_trigger_list *trigger_list;
226
227 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
228 channel_triggers_ht_node);
229
230 return !!((channel_key->key == trigger_list->channel_key.key) &&
231 (channel_key->domain == trigger_list->channel_key.domain));
232 }
233
234 static
235 int match_session_trigger_list(struct cds_lfht_node *node, const void *key)
236 {
237 const char *session_name = (const char *) key;
238 struct lttng_session_trigger_list *trigger_list;
239
240 trigger_list = caa_container_of(node, struct lttng_session_trigger_list,
241 session_triggers_ht_node);
242
243 return !!(strcmp(trigger_list->session_name, session_name) == 0);
244 }
245
246 static
247 int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
248 {
249 struct channel_key *channel_key = (struct channel_key *) key;
250 struct channel_state_sample *sample;
251
252 sample = caa_container_of(node, struct channel_state_sample,
253 channel_state_ht_node);
254
255 return !!((channel_key->key == sample->key.key) &&
256 (channel_key->domain == sample->key.domain));
257 }
258
259 static
260 int match_channel_info(struct cds_lfht_node *node, const void *key)
261 {
262 struct channel_key *channel_key = (struct channel_key *) key;
263 struct channel_info *channel_info;
264
265 channel_info = caa_container_of(node, struct channel_info,
266 channels_ht_node);
267
268 return !!((channel_key->key == channel_info->key.key) &&
269 (channel_key->domain == channel_info->key.domain));
270 }
271
272 static
273 int match_trigger(struct cds_lfht_node *node, const void *key)
274 {
275 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
276 struct lttng_trigger_ht_element *trigger_ht_element;
277
278 trigger_ht_element = caa_container_of(node, struct lttng_trigger_ht_element,
279 node);
280
281 return !!lttng_trigger_is_equal(trigger_key, trigger_ht_element->trigger);
282 }
283
284 static
285 int match_trigger_token(struct cds_lfht_node *node, const void *key)
286 {
287 const uint64_t *_key = key;
288 struct notification_trigger_tokens_ht_element *element;
289
290 element = caa_container_of(node,
291 struct notification_trigger_tokens_ht_element, node);
292 return *_key == element->token;
293 }
294
295 static
296 int match_client_list_condition(struct cds_lfht_node *node, const void *key)
297 {
298 struct lttng_condition *condition_key = (struct lttng_condition *) key;
299 struct notification_client_list *client_list;
300 const struct lttng_condition *condition;
301
302 assert(condition_key);
303
304 client_list = caa_container_of(node, struct notification_client_list,
305 notification_trigger_clients_ht_node);
306 condition = lttng_trigger_get_const_condition(client_list->trigger);
307
308 return !!lttng_condition_is_equal(condition_key, condition);
309 }
310
311 static
312 int match_session(struct cds_lfht_node *node, const void *key)
313 {
314 const char *name = key;
315 struct session_info *session_info = caa_container_of(
316 node, struct session_info, sessions_ht_node);
317
318 return !strcmp(session_info->name, name);
319 }
320
321 /*
322 * Match trigger based on name and credentials only.
323 * Name duplication is NOT allowed for the same uid.
324 */
325 static
326 int match_trigger_by_name_uid(struct cds_lfht_node *node,
327 const void *key)
328 {
329 bool match = false;
330 const char *name;
331 const char *key_name;
332 enum lttng_trigger_status status;
333 const struct lttng_credentials *key_creds;
334 const struct lttng_credentials *node_creds;
335 const struct lttng_trigger *trigger_key =
336 (const struct lttng_trigger *) key;
337 const struct lttng_trigger_ht_element *trigger_ht_element =
338 caa_container_of(node,
339 struct lttng_trigger_ht_element,
340 node_by_name_uid);
341
342 status = lttng_trigger_get_name(trigger_ht_element->trigger, &name);
343 assert(status == LTTNG_TRIGGER_STATUS_OK);
344
345 status = lttng_trigger_get_name(trigger_key, &key_name);
346 assert(status == LTTNG_TRIGGER_STATUS_OK);
347
348 /* Compare the names. */
349 if (strcmp(name, key_name) != 0) {
350 goto end;
351 }
352
353 /* Compare the owners' UIDs. */
354 key_creds = lttng_trigger_get_credentials(trigger_key);
355 node_creds = lttng_trigger_get_credentials(trigger_ht_element->trigger);
356
357 match = lttng_credentials_is_equal_uid(key_creds, node_creds);
358
359 end:
360 return match;
361 }
362
363 /*
364 * Hash trigger based on name and credentials only.
365 */
366 static
367 unsigned long hash_trigger_by_name_uid(const struct lttng_trigger *trigger)
368 {
369 unsigned long hash = 0;
370 const struct lttng_credentials *trigger_creds;
371 const char *trigger_name;
372 enum lttng_trigger_status status;
373
374 status = lttng_trigger_get_name(trigger, &trigger_name);
375 if (status == LTTNG_TRIGGER_STATUS_OK) {
376 hash = hash_key_str(trigger_name, lttng_ht_seed);
377 }
378
379 trigger_creds = lttng_trigger_get_credentials(trigger);
380 hash ^= hash_key_ulong((void *) (unsigned long) LTTNG_OPTIONAL_GET(trigger_creds->uid),
381 lttng_ht_seed);
382
383 return hash;
384 }
385
386 static
387 unsigned long hash_channel_key(struct channel_key *key)
388 {
389 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
390 unsigned long domain_hash = hash_key_ulong(
391 (void *) (unsigned long) key->domain, lttng_ht_seed);
392
393 return key_hash ^ domain_hash;
394 }
395
396 static
397 unsigned long hash_client_socket(int socket)
398 {
399 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
400 }
401
402 static
403 unsigned long hash_client_id(notification_client_id id)
404 {
405 return hash_key_u64(&id, lttng_ht_seed);
406 }
407
408 /*
409 * Get the type of object to which a given condition applies. Bindings let
410 * the notification system evaluate a trigger's condition when a given
411 * object's state is updated.
412 *
413 * For instance, a condition bound to a channel will be evaluated everytime
414 * the channel's state is changed by a channel monitoring sample.
415 */
416 static
417 enum lttng_object_type get_condition_binding_object(
418 const struct lttng_condition *condition)
419 {
420 switch (lttng_condition_get_type(condition)) {
421 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
422 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
423 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
424 return LTTNG_OBJECT_TYPE_CHANNEL;
425 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
426 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
427 return LTTNG_OBJECT_TYPE_SESSION;
428 case LTTNG_CONDITION_TYPE_EVENT_RULE_HIT:
429 return LTTNG_OBJECT_TYPE_NONE;
430 default:
431 return LTTNG_OBJECT_TYPE_UNKNOWN;
432 }
433 }
434
435 static
436 void free_channel_info_rcu(struct rcu_head *node)
437 {
438 free(caa_container_of(node, struct channel_info, rcu_node));
439 }
440
441 static
442 void channel_info_destroy(struct channel_info *channel_info)
443 {
444 if (!channel_info) {
445 return;
446 }
447
448 if (channel_info->session_info) {
449 session_info_remove_channel(channel_info->session_info,
450 channel_info);
451 session_info_put(channel_info->session_info);
452 }
453 if (channel_info->name) {
454 free(channel_info->name);
455 }
456 call_rcu(&channel_info->rcu_node, free_channel_info_rcu);
457 }
458
459 static
460 void free_session_info_rcu(struct rcu_head *node)
461 {
462 free(caa_container_of(node, struct session_info, rcu_node));
463 }
464
465 /* Don't call directly, use the ref-counting mechanism. */
466 static
467 void session_info_destroy(void *_data)
468 {
469 struct session_info *session_info = _data;
470 int ret;
471
472 assert(session_info);
473 if (session_info->channel_infos_ht) {
474 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
475 if (ret) {
476 ERR("[notification-thread] Failed to destroy channel information hash table");
477 }
478 }
479 lttng_session_trigger_list_destroy(session_info->trigger_list);
480
481 rcu_read_lock();
482 cds_lfht_del(session_info->sessions_ht,
483 &session_info->sessions_ht_node);
484 rcu_read_unlock();
485 free(session_info->name);
486 call_rcu(&session_info->rcu_node, free_session_info_rcu);
487 }
488
489 static
490 void session_info_get(struct session_info *session_info)
491 {
492 if (!session_info) {
493 return;
494 }
495 lttng_ref_get(&session_info->ref);
496 }
497
498 static
499 void session_info_put(struct session_info *session_info)
500 {
501 if (!session_info) {
502 return;
503 }
504 lttng_ref_put(&session_info->ref);
505 }
506
507 static
508 struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
509 struct lttng_session_trigger_list *trigger_list,
510 struct cds_lfht *sessions_ht)
511 {
512 struct session_info *session_info;
513
514 assert(name);
515
516 session_info = zmalloc(sizeof(*session_info));
517 if (!session_info) {
518 goto end;
519 }
520 lttng_ref_init(&session_info->ref, session_info_destroy);
521
522 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
523 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
524 if (!session_info->channel_infos_ht) {
525 goto error;
526 }
527
528 cds_lfht_node_init(&session_info->sessions_ht_node);
529 session_info->name = strdup(name);
530 if (!session_info->name) {
531 goto error;
532 }
533 session_info->uid = uid;
534 session_info->gid = gid;
535 session_info->trigger_list = trigger_list;
536 session_info->sessions_ht = sessions_ht;
537 end:
538 return session_info;
539 error:
540 session_info_put(session_info);
541 return NULL;
542 }
543
544 static
545 void session_info_add_channel(struct session_info *session_info,
546 struct channel_info *channel_info)
547 {
548 rcu_read_lock();
549 cds_lfht_add(session_info->channel_infos_ht,
550 hash_channel_key(&channel_info->key),
551 &channel_info->session_info_channels_ht_node);
552 rcu_read_unlock();
553 }
554
555 static
556 void session_info_remove_channel(struct session_info *session_info,
557 struct channel_info *channel_info)
558 {
559 rcu_read_lock();
560 cds_lfht_del(session_info->channel_infos_ht,
561 &channel_info->session_info_channels_ht_node);
562 rcu_read_unlock();
563 }
564
565 static
566 struct channel_info *channel_info_create(const char *channel_name,
567 struct channel_key *channel_key, uint64_t channel_capacity,
568 struct session_info *session_info)
569 {
570 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
571
572 if (!channel_info) {
573 goto end;
574 }
575
576 cds_lfht_node_init(&channel_info->channels_ht_node);
577 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
578 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
579 channel_info->capacity = channel_capacity;
580
581 channel_info->name = strdup(channel_name);
582 if (!channel_info->name) {
583 goto error;
584 }
585
586 /*
587 * Set the references between session and channel infos:
588 * - channel_info holds a strong reference to session_info
589 * - session_info holds a weak reference to channel_info
590 */
591 session_info_get(session_info);
592 session_info_add_channel(session_info, channel_info);
593 channel_info->session_info = session_info;
594 end:
595 return channel_info;
596 error:
597 channel_info_destroy(channel_info);
598 return NULL;
599 }
600
601 LTTNG_HIDDEN
602 bool notification_client_list_get(struct notification_client_list *list)
603 {
604 return urcu_ref_get_unless_zero(&list->ref);
605 }
606
607 static
608 void free_notification_client_list_rcu(struct rcu_head *node)
609 {
610 free(caa_container_of(node, struct notification_client_list,
611 rcu_node));
612 }
613
614 static
615 void notification_client_list_release(struct urcu_ref *list_ref)
616 {
617 struct notification_client_list *list =
618 container_of(list_ref, typeof(*list), ref);
619 struct notification_client_list_element *client_list_element, *tmp;
620
621 if (list->notification_trigger_clients_ht) {
622 rcu_read_lock();
623 cds_lfht_del(list->notification_trigger_clients_ht,
624 &list->notification_trigger_clients_ht_node);
625 rcu_read_unlock();
626 list->notification_trigger_clients_ht = NULL;
627 }
628 cds_list_for_each_entry_safe(client_list_element, tmp,
629 &list->list, node) {
630 free(client_list_element);
631 }
632 pthread_mutex_destroy(&list->lock);
633 call_rcu(&list->rcu_node, free_notification_client_list_rcu);
634 }
635
636 static
637 struct notification_client_list *notification_client_list_create(
638 const struct lttng_trigger *trigger)
639 {
640 struct notification_client_list *client_list =
641 zmalloc(sizeof(*client_list));
642
643 if (!client_list) {
644 goto error;
645 }
646 pthread_mutex_init(&client_list->lock, NULL);
647 urcu_ref_init(&client_list->ref);
648 cds_lfht_node_init(&client_list->notification_trigger_clients_ht_node);
649 CDS_INIT_LIST_HEAD(&client_list->list);
650 client_list->trigger = trigger;
651 error:
652 return client_list;
653 }
654
655 static
656 void publish_notification_client_list(
657 struct notification_thread_state *state,
658 struct notification_client_list *list)
659 {
660 const struct lttng_condition *condition =
661 lttng_trigger_get_const_condition(list->trigger);
662
663 assert(!list->notification_trigger_clients_ht);
664 notification_client_list_get(list);
665
666 list->notification_trigger_clients_ht =
667 state->notification_trigger_clients_ht;
668
669 rcu_read_lock();
670 cds_lfht_add(state->notification_trigger_clients_ht,
671 lttng_condition_hash(condition),
672 &list->notification_trigger_clients_ht_node);
673 rcu_read_unlock();
674 }
675
676 LTTNG_HIDDEN
677 void notification_client_list_put(struct notification_client_list *list)
678 {
679 if (!list) {
680 return;
681 }
682 return urcu_ref_put(&list->ref, notification_client_list_release);
683 }
684
685 /* Provides a reference to the returned list. */
686 static
687 struct notification_client_list *get_client_list_from_condition(
688 struct notification_thread_state *state,
689 const struct lttng_condition *condition)
690 {
691 struct cds_lfht_node *node;
692 struct cds_lfht_iter iter;
693 struct notification_client_list *list = NULL;
694
695 rcu_read_lock();
696 cds_lfht_lookup(state->notification_trigger_clients_ht,
697 lttng_condition_hash(condition),
698 match_client_list_condition,
699 condition,
700 &iter);
701 node = cds_lfht_iter_get_node(&iter);
702 if (node) {
703 list = container_of(node, struct notification_client_list,
704 notification_trigger_clients_ht_node);
705 list = notification_client_list_get(list) ? list : NULL;
706 }
707
708 rcu_read_unlock();
709 return list;
710 }
711
712 static
713 int evaluate_channel_condition_for_client(
714 const struct lttng_condition *condition,
715 struct notification_thread_state *state,
716 struct lttng_evaluation **evaluation,
717 uid_t *session_uid, gid_t *session_gid)
718 {
719 int ret;
720 struct cds_lfht_iter iter;
721 struct cds_lfht_node *node;
722 struct channel_info *channel_info = NULL;
723 struct channel_key *channel_key = NULL;
724 struct channel_state_sample *last_sample = NULL;
725 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
726
727 rcu_read_lock();
728
729 /* Find the channel associated with the condition. */
730 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
731 channel_trigger_list, channel_triggers_ht_node) {
732 struct lttng_trigger_list_element *element;
733
734 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
735 const struct lttng_condition *current_condition =
736 lttng_trigger_get_const_condition(
737 element->trigger);
738
739 assert(current_condition);
740 if (!lttng_condition_is_equal(condition,
741 current_condition)) {
742 continue;
743 }
744
745 /* Found the trigger, save the channel key. */
746 channel_key = &channel_trigger_list->channel_key;
747 break;
748 }
749 if (channel_key) {
750 /* The channel key was found stop iteration. */
751 break;
752 }
753 }
754
755 if (!channel_key){
756 /* No channel found; normal exit. */
757 DBG("[notification-thread] No known channel associated with newly subscribed-to condition");
758 ret = 0;
759 goto end;
760 }
761
762 /* Fetch channel info for the matching channel. */
763 cds_lfht_lookup(state->channels_ht,
764 hash_channel_key(channel_key),
765 match_channel_info,
766 channel_key,
767 &iter);
768 node = cds_lfht_iter_get_node(&iter);
769 assert(node);
770 channel_info = caa_container_of(node, struct channel_info,
771 channels_ht_node);
772
773 /* Retrieve the channel's last sample, if it exists. */
774 cds_lfht_lookup(state->channel_state_ht,
775 hash_channel_key(channel_key),
776 match_channel_state_sample,
777 channel_key,
778 &iter);
779 node = cds_lfht_iter_get_node(&iter);
780 if (node) {
781 last_sample = caa_container_of(node,
782 struct channel_state_sample,
783 channel_state_ht_node);
784 } else {
785 /* Nothing to evaluate, no sample was ever taken. Normal exit */
786 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
787 ret = 0;
788 goto end;
789 }
790
791 ret = evaluate_buffer_condition(condition, evaluation, state,
792 NULL, last_sample,
793 0, channel_info->session_info->consumed_data_size,
794 channel_info);
795 if (ret) {
796 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
797 goto end;
798 }
799
800 *session_uid = channel_info->session_info->uid;
801 *session_gid = channel_info->session_info->gid;
802 end:
803 rcu_read_unlock();
804 return ret;
805 }
806
807 static
808 const char *get_condition_session_name(const struct lttng_condition *condition)
809 {
810 const char *session_name = NULL;
811 enum lttng_condition_status status;
812
813 switch (lttng_condition_get_type(condition)) {
814 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
815 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
816 status = lttng_condition_buffer_usage_get_session_name(
817 condition, &session_name);
818 break;
819 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
820 status = lttng_condition_session_consumed_size_get_session_name(
821 condition, &session_name);
822 break;
823 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
824 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
825 status = lttng_condition_session_rotation_get_session_name(
826 condition, &session_name);
827 break;
828 default:
829 abort();
830 }
831 if (status != LTTNG_CONDITION_STATUS_OK) {
832 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
833 goto end;
834 }
835 end:
836 return session_name;
837 }
838
839 static
840 int evaluate_session_condition_for_client(
841 const struct lttng_condition *condition,
842 struct notification_thread_state *state,
843 struct lttng_evaluation **evaluation,
844 uid_t *session_uid, gid_t *session_gid)
845 {
846 int ret;
847 struct cds_lfht_iter iter;
848 struct cds_lfht_node *node;
849 const char *session_name;
850 struct session_info *session_info = NULL;
851
852 rcu_read_lock();
853 session_name = get_condition_session_name(condition);
854
855 /* Find the session associated with the trigger. */
856 cds_lfht_lookup(state->sessions_ht,
857 hash_key_str(session_name, lttng_ht_seed),
858 match_session,
859 session_name,
860 &iter);
861 node = cds_lfht_iter_get_node(&iter);
862 if (!node) {
863 DBG("[notification-thread] No known session matching name \"%s\"",
864 session_name);
865 ret = 0;
866 goto end;
867 }
868
869 session_info = caa_container_of(node, struct session_info,
870 sessions_ht_node);
871 session_info_get(session_info);
872
873 /*
874 * Evaluation is performed in-line here since only one type of
875 * session-bound condition is handled for the moment.
876 */
877 switch (lttng_condition_get_type(condition)) {
878 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
879 if (!session_info->rotation.ongoing) {
880 ret = 0;
881 goto end_session_put;
882 }
883
884 *evaluation = lttng_evaluation_session_rotation_ongoing_create(
885 session_info->rotation.id);
886 if (!*evaluation) {
887 /* Fatal error. */
888 ERR("[notification-thread] Failed to create session rotation ongoing evaluation for session \"%s\"",
889 session_info->name);
890 ret = -1;
891 goto end_session_put;
892 }
893 ret = 0;
894 break;
895 default:
896 ret = 0;
897 goto end_session_put;
898 }
899
900 *session_uid = session_info->uid;
901 *session_gid = session_info->gid;
902
903 end_session_put:
904 session_info_put(session_info);
905 end:
906 rcu_read_unlock();
907 return ret;
908 }
909
910 static
911 int evaluate_condition_for_client(const struct lttng_trigger *trigger,
912 const struct lttng_condition *condition,
913 struct notification_client *client,
914 struct notification_thread_state *state)
915 {
916 int ret;
917 struct lttng_evaluation *evaluation = NULL;
918 struct notification_client_list client_list = {
919 .lock = PTHREAD_MUTEX_INITIALIZER,
920 };
921 struct notification_client_list_element client_list_element = { 0 };
922 uid_t object_uid = 0;
923 gid_t object_gid = 0;
924
925 assert(trigger);
926 assert(condition);
927 assert(client);
928 assert(state);
929
930 switch (get_condition_binding_object(condition)) {
931 case LTTNG_OBJECT_TYPE_SESSION:
932 ret = evaluate_session_condition_for_client(condition, state,
933 &evaluation, &object_uid, &object_gid);
934 break;
935 case LTTNG_OBJECT_TYPE_CHANNEL:
936 ret = evaluate_channel_condition_for_client(condition, state,
937 &evaluation, &object_uid, &object_gid);
938 break;
939 case LTTNG_OBJECT_TYPE_NONE:
940 DBG("[notification-thread] Newly subscribed-to condition not bound to object, nothing to evaluate");
941 ret = 0;
942 goto end;
943 case LTTNG_OBJECT_TYPE_UNKNOWN:
944 default:
945 ret = -1;
946 goto end;
947 }
948 if (ret) {
949 /* Fatal error. */
950 goto end;
951 }
952 if (!evaluation) {
953 /* Evaluation yielded nothing. Normal exit. */
954 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
955 ret = 0;
956 goto end;
957 }
958
959 /*
960 * Create a temporary client list with the client currently
961 * subscribing.
962 */
963 cds_lfht_node_init(&client_list.notification_trigger_clients_ht_node);
964 CDS_INIT_LIST_HEAD(&client_list.list);
965 client_list.trigger = trigger;
966
967 CDS_INIT_LIST_HEAD(&client_list_element.node);
968 client_list_element.client = client;
969 cds_list_add(&client_list_element.node, &client_list.list);
970
971 /* Send evaluation result to the newly-subscribed client. */
972 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
973 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
974 state, object_uid, object_gid);
975
976 end:
977 return ret;
978 }
979
980 static
981 int notification_thread_client_subscribe(struct notification_client *client,
982 struct lttng_condition *condition,
983 struct notification_thread_state *state,
984 enum lttng_notification_channel_status *_status)
985 {
986 int ret = 0;
987 struct notification_client_list *client_list = NULL;
988 struct lttng_condition_list_element *condition_list_element = NULL;
989 struct notification_client_list_element *client_list_element = NULL;
990 enum lttng_notification_channel_status status =
991 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
992
993 /*
994 * Ensure that the client has not already subscribed to this condition
995 * before.
996 */
997 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
998 if (lttng_condition_is_equal(condition_list_element->condition,
999 condition)) {
1000 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
1001 goto end;
1002 }
1003 }
1004
1005 condition_list_element = zmalloc(sizeof(*condition_list_element));
1006 if (!condition_list_element) {
1007 ret = -1;
1008 goto error;
1009 }
1010 client_list_element = zmalloc(sizeof(*client_list_element));
1011 if (!client_list_element) {
1012 ret = -1;
1013 goto error;
1014 }
1015
1016 /*
1017 * Add the newly-subscribed condition to the client's subscription list.
1018 */
1019 CDS_INIT_LIST_HEAD(&condition_list_element->node);
1020 condition_list_element->condition = condition;
1021 cds_list_add(&condition_list_element->node, &client->condition_list);
1022
1023 client_list = get_client_list_from_condition(state, condition);
1024 if (!client_list) {
1025 /*
1026 * No notification-emiting trigger registered with this
1027 * condition. We don't evaluate the condition right away
1028 * since this trigger is not registered yet.
1029 */
1030 free(client_list_element);
1031 goto end;
1032 }
1033
1034 /*
1035 * The condition to which the client just subscribed is evaluated
1036 * at this point so that conditions that are already TRUE result
1037 * in a notification being sent out.
1038 *
1039 * The client_list's trigger is used without locking the list itself.
1040 * This is correct since the list doesn't own the trigger and the
1041 * object is immutable.
1042 */
1043 if (evaluate_condition_for_client(client_list->trigger, condition,
1044 client, state)) {
1045 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
1046 ret = -1;
1047 free(client_list_element);
1048 goto end;
1049 }
1050
1051 /*
1052 * Add the client to the list of clients interested in a given trigger
1053 * if a "notification" trigger with a corresponding condition was
1054 * added prior.
1055 */
1056 client_list_element->client = client;
1057 CDS_INIT_LIST_HEAD(&client_list_element->node);
1058
1059 pthread_mutex_lock(&client_list->lock);
1060 cds_list_add(&client_list_element->node, &client_list->list);
1061 pthread_mutex_unlock(&client_list->lock);
1062 end:
1063 if (_status) {
1064 *_status = status;
1065 }
1066 if (client_list) {
1067 notification_client_list_put(client_list);
1068 }
1069 return ret;
1070 error:
1071 free(condition_list_element);
1072 free(client_list_element);
1073 return ret;
1074 }
1075
1076 static
1077 int notification_thread_client_unsubscribe(
1078 struct notification_client *client,
1079 struct lttng_condition *condition,
1080 struct notification_thread_state *state,
1081 enum lttng_notification_channel_status *_status)
1082 {
1083 struct notification_client_list *client_list;
1084 struct lttng_condition_list_element *condition_list_element,
1085 *condition_tmp;
1086 struct notification_client_list_element *client_list_element,
1087 *client_tmp;
1088 bool condition_found = false;
1089 enum lttng_notification_channel_status status =
1090 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1091
1092 /* Remove the condition from the client's condition list. */
1093 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
1094 &client->condition_list, node) {
1095 if (!lttng_condition_is_equal(condition_list_element->condition,
1096 condition)) {
1097 continue;
1098 }
1099
1100 cds_list_del(&condition_list_element->node);
1101 /*
1102 * The caller may be iterating on the client's conditions to
1103 * tear down a client's connection. In this case, the condition
1104 * will be destroyed at the end.
1105 */
1106 if (condition != condition_list_element->condition) {
1107 lttng_condition_destroy(
1108 condition_list_element->condition);
1109 }
1110 free(condition_list_element);
1111 condition_found = true;
1112 break;
1113 }
1114
1115 if (!condition_found) {
1116 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
1117 goto end;
1118 }
1119
1120 /*
1121 * Remove the client from the list of clients interested the trigger
1122 * matching the condition.
1123 */
1124 client_list = get_client_list_from_condition(state, condition);
1125 if (!client_list) {
1126 goto end;
1127 }
1128
1129 pthread_mutex_lock(&client_list->lock);
1130 cds_list_for_each_entry_safe(client_list_element, client_tmp,
1131 &client_list->list, node) {
1132 if (client_list_element->client->id != client->id) {
1133 continue;
1134 }
1135 cds_list_del(&client_list_element->node);
1136 free(client_list_element);
1137 break;
1138 }
1139 pthread_mutex_unlock(&client_list->lock);
1140 notification_client_list_put(client_list);
1141 client_list = NULL;
1142 end:
1143 lttng_condition_destroy(condition);
1144 if (_status) {
1145 *_status = status;
1146 }
1147 return 0;
1148 }
1149
1150 static
1151 void free_notification_client_rcu(struct rcu_head *node)
1152 {
1153 free(caa_container_of(node, struct notification_client, rcu_node));
1154 }
1155
1156 static
1157 void notification_client_destroy(struct notification_client *client,
1158 struct notification_thread_state *state)
1159 {
1160 if (!client) {
1161 return;
1162 }
1163
1164 /*
1165 * The client object is not reachable by other threads, no need to lock
1166 * the client here.
1167 */
1168 if (client->socket >= 0) {
1169 (void) lttcomm_close_unix_sock(client->socket);
1170 client->socket = -1;
1171 }
1172 client->communication.active = false;
1173 lttng_payload_reset(&client->communication.inbound.payload);
1174 lttng_payload_reset(&client->communication.outbound.payload);
1175 pthread_mutex_destroy(&client->lock);
1176 call_rcu(&client->rcu_node, free_notification_client_rcu);
1177 }
1178
1179 /*
1180 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1181 * client pointer).
1182 */
1183 static
1184 struct notification_client *get_client_from_socket(int socket,
1185 struct notification_thread_state *state)
1186 {
1187 struct cds_lfht_iter iter;
1188 struct cds_lfht_node *node;
1189 struct notification_client *client = NULL;
1190
1191 cds_lfht_lookup(state->client_socket_ht,
1192 hash_client_socket(socket),
1193 match_client_socket,
1194 (void *) (unsigned long) socket,
1195 &iter);
1196 node = cds_lfht_iter_get_node(&iter);
1197 if (!node) {
1198 goto end;
1199 }
1200
1201 client = caa_container_of(node, struct notification_client,
1202 client_socket_ht_node);
1203 end:
1204 return client;
1205 }
1206
1207 /*
1208 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1209 * client pointer).
1210 */
1211 static
1212 struct notification_client *get_client_from_id(notification_client_id id,
1213 struct notification_thread_state *state)
1214 {
1215 struct cds_lfht_iter iter;
1216 struct cds_lfht_node *node;
1217 struct notification_client *client = NULL;
1218
1219 cds_lfht_lookup(state->client_id_ht,
1220 hash_client_id(id),
1221 match_client_id,
1222 &id,
1223 &iter);
1224 node = cds_lfht_iter_get_node(&iter);
1225 if (!node) {
1226 goto end;
1227 }
1228
1229 client = caa_container_of(node, struct notification_client,
1230 client_id_ht_node);
1231 end:
1232 return client;
1233 }
1234
1235 static
1236 bool buffer_usage_condition_applies_to_channel(
1237 const struct lttng_condition *condition,
1238 const struct channel_info *channel_info)
1239 {
1240 enum lttng_condition_status status;
1241 enum lttng_domain_type condition_domain;
1242 const char *condition_session_name = NULL;
1243 const char *condition_channel_name = NULL;
1244
1245 status = lttng_condition_buffer_usage_get_domain_type(condition,
1246 &condition_domain);
1247 assert(status == LTTNG_CONDITION_STATUS_OK);
1248 if (channel_info->key.domain != condition_domain) {
1249 goto fail;
1250 }
1251
1252 status = lttng_condition_buffer_usage_get_session_name(
1253 condition, &condition_session_name);
1254 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1255
1256 status = lttng_condition_buffer_usage_get_channel_name(
1257 condition, &condition_channel_name);
1258 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
1259
1260 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1261 goto fail;
1262 }
1263 if (strcmp(channel_info->name, condition_channel_name)) {
1264 goto fail;
1265 }
1266
1267 return true;
1268 fail:
1269 return false;
1270 }
1271
1272 static
1273 bool session_consumed_size_condition_applies_to_channel(
1274 const struct lttng_condition *condition,
1275 const struct channel_info *channel_info)
1276 {
1277 enum lttng_condition_status status;
1278 const char *condition_session_name = NULL;
1279
1280 status = lttng_condition_session_consumed_size_get_session_name(
1281 condition, &condition_session_name);
1282 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1283
1284 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1285 goto fail;
1286 }
1287
1288 return true;
1289 fail:
1290 return false;
1291 }
1292
1293 static
1294 bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
1295 const struct channel_info *channel_info)
1296 {
1297 const struct lttng_condition *condition;
1298 bool trigger_applies;
1299
1300 condition = lttng_trigger_get_const_condition(trigger);
1301 if (!condition) {
1302 goto fail;
1303 }
1304
1305 switch (lttng_condition_get_type(condition)) {
1306 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1307 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1308 trigger_applies = buffer_usage_condition_applies_to_channel(
1309 condition, channel_info);
1310 break;
1311 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1312 trigger_applies = session_consumed_size_condition_applies_to_channel(
1313 condition, channel_info);
1314 break;
1315 default:
1316 goto fail;
1317 }
1318
1319 return trigger_applies;
1320 fail:
1321 return false;
1322 }
1323
1324 static
1325 bool trigger_applies_to_client(struct lttng_trigger *trigger,
1326 struct notification_client *client)
1327 {
1328 bool applies = false;
1329 struct lttng_condition_list_element *condition_list_element;
1330
1331 cds_list_for_each_entry(condition_list_element, &client->condition_list,
1332 node) {
1333 applies = lttng_condition_is_equal(
1334 condition_list_element->condition,
1335 lttng_trigger_get_condition(trigger));
1336 if (applies) {
1337 break;
1338 }
1339 }
1340 return applies;
1341 }
1342
1343 /* Must be called with RCU read lock held. */
1344 static
1345 struct lttng_session_trigger_list *get_session_trigger_list(
1346 struct notification_thread_state *state,
1347 const char *session_name)
1348 {
1349 struct lttng_session_trigger_list *list = NULL;
1350 struct cds_lfht_node *node;
1351 struct cds_lfht_iter iter;
1352
1353 cds_lfht_lookup(state->session_triggers_ht,
1354 hash_key_str(session_name, lttng_ht_seed),
1355 match_session_trigger_list,
1356 session_name,
1357 &iter);
1358 node = cds_lfht_iter_get_node(&iter);
1359 if (!node) {
1360 /*
1361 * Not an error, the list of triggers applying to that session
1362 * will be initialized when the session is created.
1363 */
1364 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1365 session_name);
1366 goto end;
1367 }
1368
1369 list = caa_container_of(node,
1370 struct lttng_session_trigger_list,
1371 session_triggers_ht_node);
1372 end:
1373 return list;
1374 }
1375
1376 /*
1377 * Allocate an empty lttng_session_trigger_list for the session named
1378 * 'session_name'.
1379 *
1380 * No ownership of 'session_name' is assumed by the session trigger list.
1381 * It is the caller's responsability to ensure the session name is alive
1382 * for as long as this list is.
1383 */
1384 static
1385 struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1386 const char *session_name,
1387 struct cds_lfht *session_triggers_ht)
1388 {
1389 struct lttng_session_trigger_list *list;
1390
1391 list = zmalloc(sizeof(*list));
1392 if (!list) {
1393 goto end;
1394 }
1395 list->session_name = session_name;
1396 CDS_INIT_LIST_HEAD(&list->list);
1397 cds_lfht_node_init(&list->session_triggers_ht_node);
1398 list->session_triggers_ht = session_triggers_ht;
1399
1400 rcu_read_lock();
1401 /* Publish the list through the session_triggers_ht. */
1402 cds_lfht_add(session_triggers_ht,
1403 hash_key_str(session_name, lttng_ht_seed),
1404 &list->session_triggers_ht_node);
1405 rcu_read_unlock();
1406 end:
1407 return list;
1408 }
1409
1410 static
1411 void free_session_trigger_list_rcu(struct rcu_head *node)
1412 {
1413 free(caa_container_of(node, struct lttng_session_trigger_list,
1414 rcu_node));
1415 }
1416
1417 static
1418 void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1419 {
1420 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1421
1422 /* Empty the list element by element, and then free the list itself. */
1423 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1424 &list->list, node) {
1425 cds_list_del(&trigger_list_element->node);
1426 free(trigger_list_element);
1427 }
1428 rcu_read_lock();
1429 /* Unpublish the list from the session_triggers_ht. */
1430 cds_lfht_del(list->session_triggers_ht,
1431 &list->session_triggers_ht_node);
1432 rcu_read_unlock();
1433 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1434 }
1435
1436 static
1437 int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
1438 struct lttng_trigger *trigger)
1439 {
1440 int ret = 0;
1441 struct lttng_trigger_list_element *new_element =
1442 zmalloc(sizeof(*new_element));
1443
1444 if (!new_element) {
1445 ret = -1;
1446 goto end;
1447 }
1448 CDS_INIT_LIST_HEAD(&new_element->node);
1449 new_element->trigger = trigger;
1450 cds_list_add(&new_element->node, &list->list);
1451 end:
1452 return ret;
1453 }
1454
1455 static
1456 bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1457 const char *session_name)
1458 {
1459 bool applies = false;
1460 const struct lttng_condition *condition;
1461
1462 condition = lttng_trigger_get_const_condition(trigger);
1463 switch (lttng_condition_get_type(condition)) {
1464 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1465 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1466 {
1467 enum lttng_condition_status condition_status;
1468 const char *condition_session_name;
1469
1470 condition_status = lttng_condition_session_rotation_get_session_name(
1471 condition, &condition_session_name);
1472 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1473 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1474 goto end;
1475 }
1476
1477 assert(condition_session_name);
1478 applies = !strcmp(condition_session_name, session_name);
1479 break;
1480 }
1481 default:
1482 goto end;
1483 }
1484 end:
1485 return applies;
1486 }
1487
1488 /*
1489 * Allocate and initialize an lttng_session_trigger_list which contains
1490 * all triggers that apply to the session named 'session_name'.
1491 *
1492 * No ownership of 'session_name' is assumed by the session trigger list.
1493 * It is the caller's responsability to ensure the session name is alive
1494 * for as long as this list is.
1495 */
1496 static
1497 struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1498 const struct notification_thread_state *state,
1499 const char *session_name)
1500 {
1501 int trigger_count = 0;
1502 struct lttng_session_trigger_list *session_trigger_list = NULL;
1503 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1504 struct cds_lfht_iter iter;
1505
1506 session_trigger_list = lttng_session_trigger_list_create(session_name,
1507 state->session_triggers_ht);
1508
1509 /* Add all triggers applying to the session named 'session_name'. */
1510 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1511 node) {
1512 int ret;
1513
1514 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1515 session_name)) {
1516 continue;
1517 }
1518
1519 ret = lttng_session_trigger_list_add(session_trigger_list,
1520 trigger_ht_element->trigger);
1521 if (ret) {
1522 goto error;
1523 }
1524
1525 trigger_count++;
1526 }
1527
1528 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1529 trigger_count);
1530 return session_trigger_list;
1531 error:
1532 lttng_session_trigger_list_destroy(session_trigger_list);
1533 return NULL;
1534 }
1535
1536 static
1537 struct session_info *find_or_create_session_info(
1538 struct notification_thread_state *state,
1539 const char *name, uid_t uid, gid_t gid)
1540 {
1541 struct session_info *session = NULL;
1542 struct cds_lfht_node *node;
1543 struct cds_lfht_iter iter;
1544 struct lttng_session_trigger_list *trigger_list;
1545
1546 rcu_read_lock();
1547 cds_lfht_lookup(state->sessions_ht,
1548 hash_key_str(name, lttng_ht_seed),
1549 match_session,
1550 name,
1551 &iter);
1552 node = cds_lfht_iter_get_node(&iter);
1553 if (node) {
1554 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1555 name, uid, gid);
1556 session = caa_container_of(node, struct session_info,
1557 sessions_ht_node);
1558 assert(session->uid == uid);
1559 assert(session->gid == gid);
1560 session_info_get(session);
1561 goto end;
1562 }
1563
1564 trigger_list = lttng_session_trigger_list_build(state, name);
1565 if (!trigger_list) {
1566 goto error;
1567 }
1568
1569 session = session_info_create(name, uid, gid, trigger_list,
1570 state->sessions_ht);
1571 if (!session) {
1572 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1573 name, uid, gid);
1574 lttng_session_trigger_list_destroy(trigger_list);
1575 goto error;
1576 }
1577 trigger_list = NULL;
1578
1579 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
1580 &session->sessions_ht_node);
1581 end:
1582 rcu_read_unlock();
1583 return session;
1584 error:
1585 rcu_read_unlock();
1586 session_info_put(session);
1587 return NULL;
1588 }
1589
1590 static
1591 int handle_notification_thread_command_add_channel(
1592 struct notification_thread_state *state,
1593 const char *session_name, uid_t session_uid, gid_t session_gid,
1594 const char *channel_name, enum lttng_domain_type channel_domain,
1595 uint64_t channel_key_int, uint64_t channel_capacity,
1596 enum lttng_error_code *cmd_result)
1597 {
1598 struct cds_list_head trigger_list;
1599 struct channel_info *new_channel_info = NULL;
1600 struct channel_key channel_key = {
1601 .key = channel_key_int,
1602 .domain = channel_domain,
1603 };
1604 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1605 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1606 int trigger_count = 0;
1607 struct cds_lfht_iter iter;
1608 struct session_info *session_info = NULL;
1609
1610 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
1611 channel_name, session_name, channel_key_int,
1612 channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1613
1614 CDS_INIT_LIST_HEAD(&trigger_list);
1615
1616 session_info = find_or_create_session_info(state, session_name,
1617 session_uid, session_gid);
1618 if (!session_info) {
1619 /* Allocation error or an internal error occurred. */
1620 goto error;
1621 }
1622
1623 new_channel_info = channel_info_create(channel_name, &channel_key,
1624 channel_capacity, session_info);
1625 if (!new_channel_info) {
1626 goto error;
1627 }
1628
1629 rcu_read_lock();
1630 /* Build a list of all triggers applying to the new channel. */
1631 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1632 node) {
1633 struct lttng_trigger_list_element *new_element;
1634
1635 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
1636 new_channel_info)) {
1637 continue;
1638 }
1639
1640 new_element = zmalloc(sizeof(*new_element));
1641 if (!new_element) {
1642 rcu_read_unlock();
1643 goto error;
1644 }
1645 CDS_INIT_LIST_HEAD(&new_element->node);
1646 new_element->trigger = trigger_ht_element->trigger;
1647 cds_list_add(&new_element->node, &trigger_list);
1648 trigger_count++;
1649 }
1650 rcu_read_unlock();
1651
1652 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1653 trigger_count);
1654 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1655 if (!channel_trigger_list) {
1656 goto error;
1657 }
1658 channel_trigger_list->channel_key = new_channel_info->key;
1659 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1660 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1661 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1662
1663 rcu_read_lock();
1664 /* Add channel to the channel_ht which owns the channel_infos. */
1665 cds_lfht_add(state->channels_ht,
1666 hash_channel_key(&new_channel_info->key),
1667 &new_channel_info->channels_ht_node);
1668 /*
1669 * Add the list of triggers associated with this channel to the
1670 * channel_triggers_ht.
1671 */
1672 cds_lfht_add(state->channel_triggers_ht,
1673 hash_channel_key(&new_channel_info->key),
1674 &channel_trigger_list->channel_triggers_ht_node);
1675 rcu_read_unlock();
1676 session_info_put(session_info);
1677 *cmd_result = LTTNG_OK;
1678 return 0;
1679 error:
1680 channel_info_destroy(new_channel_info);
1681 session_info_put(session_info);
1682 return 1;
1683 }
1684
1685 static
1686 void free_channel_trigger_list_rcu(struct rcu_head *node)
1687 {
1688 free(caa_container_of(node, struct lttng_channel_trigger_list,
1689 rcu_node));
1690 }
1691
1692 static
1693 void free_channel_state_sample_rcu(struct rcu_head *node)
1694 {
1695 free(caa_container_of(node, struct channel_state_sample,
1696 rcu_node));
1697 }
1698
1699 static
1700 int handle_notification_thread_command_remove_channel(
1701 struct notification_thread_state *state,
1702 uint64_t channel_key, enum lttng_domain_type domain,
1703 enum lttng_error_code *cmd_result)
1704 {
1705 struct cds_lfht_node *node;
1706 struct cds_lfht_iter iter;
1707 struct lttng_channel_trigger_list *trigger_list;
1708 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1709 struct channel_key key = { .key = channel_key, .domain = domain };
1710 struct channel_info *channel_info;
1711
1712 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
1713 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1714
1715 rcu_read_lock();
1716
1717 cds_lfht_lookup(state->channel_triggers_ht,
1718 hash_channel_key(&key),
1719 match_channel_trigger_list,
1720 &key,
1721 &iter);
1722 node = cds_lfht_iter_get_node(&iter);
1723 /*
1724 * There is a severe internal error if we are being asked to remove a
1725 * channel that doesn't exist.
1726 */
1727 if (!node) {
1728 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1729 goto end;
1730 }
1731
1732 /* Free the list of triggers associated with this channel. */
1733 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1734 channel_triggers_ht_node);
1735 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1736 &trigger_list->list, node) {
1737 cds_list_del(&trigger_list_element->node);
1738 free(trigger_list_element);
1739 }
1740 cds_lfht_del(state->channel_triggers_ht, node);
1741 call_rcu(&trigger_list->rcu_node, free_channel_trigger_list_rcu);
1742
1743 /* Free sampled channel state. */
1744 cds_lfht_lookup(state->channel_state_ht,
1745 hash_channel_key(&key),
1746 match_channel_state_sample,
1747 &key,
1748 &iter);
1749 node = cds_lfht_iter_get_node(&iter);
1750 /*
1751 * This is expected to be NULL if the channel is destroyed before we
1752 * received a sample.
1753 */
1754 if (node) {
1755 struct channel_state_sample *sample = caa_container_of(node,
1756 struct channel_state_sample,
1757 channel_state_ht_node);
1758
1759 cds_lfht_del(state->channel_state_ht, node);
1760 call_rcu(&sample->rcu_node, free_channel_state_sample_rcu);
1761 }
1762
1763 /* Remove the channel from the channels_ht and free it. */
1764 cds_lfht_lookup(state->channels_ht,
1765 hash_channel_key(&key),
1766 match_channel_info,
1767 &key,
1768 &iter);
1769 node = cds_lfht_iter_get_node(&iter);
1770 assert(node);
1771 channel_info = caa_container_of(node, struct channel_info,
1772 channels_ht_node);
1773 cds_lfht_del(state->channels_ht, node);
1774 channel_info_destroy(channel_info);
1775 end:
1776 rcu_read_unlock();
1777 *cmd_result = LTTNG_OK;
1778 return 0;
1779 }
1780
1781 static
1782 int handle_notification_thread_command_session_rotation(
1783 struct notification_thread_state *state,
1784 enum notification_thread_command_type cmd_type,
1785 const char *session_name, uid_t session_uid, gid_t session_gid,
1786 uint64_t trace_archive_chunk_id,
1787 struct lttng_trace_archive_location *location,
1788 enum lttng_error_code *_cmd_result)
1789 {
1790 int ret = 0;
1791 enum lttng_error_code cmd_result = LTTNG_OK;
1792 struct lttng_session_trigger_list *trigger_list;
1793 struct lttng_trigger_list_element *trigger_list_element;
1794 struct session_info *session_info;
1795 const struct lttng_credentials session_creds = {
1796 .uid = LTTNG_OPTIONAL_INIT_VALUE(session_uid),
1797 .gid = LTTNG_OPTIONAL_INIT_VALUE(session_gid),
1798 };
1799
1800 rcu_read_lock();
1801
1802 session_info = find_or_create_session_info(state, session_name,
1803 session_uid, session_gid);
1804 if (!session_info) {
1805 /* Allocation error or an internal error occurred. */
1806 ret = -1;
1807 cmd_result = LTTNG_ERR_NOMEM;
1808 goto end;
1809 }
1810
1811 session_info->rotation.ongoing =
1812 cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
1813 session_info->rotation.id = trace_archive_chunk_id;
1814 trigger_list = get_session_trigger_list(state, session_name);
1815 if (!trigger_list) {
1816 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1817 session_name);
1818 goto end;
1819 }
1820
1821 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
1822 node) {
1823 const struct lttng_condition *condition;
1824 struct lttng_trigger *trigger;
1825 struct notification_client_list *client_list;
1826 struct lttng_evaluation *evaluation = NULL;
1827 enum lttng_condition_type condition_type;
1828 enum action_executor_status executor_status;
1829
1830 trigger = trigger_list_element->trigger;
1831 condition = lttng_trigger_get_const_condition(trigger);
1832 assert(condition);
1833 condition_type = lttng_condition_get_type(condition);
1834
1835 if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING &&
1836 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1837 continue;
1838 } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED &&
1839 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) {
1840 continue;
1841 }
1842
1843 client_list = get_client_list_from_condition(state, condition);
1844 if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1845 evaluation = lttng_evaluation_session_rotation_ongoing_create(
1846 trace_archive_chunk_id);
1847 } else {
1848 evaluation = lttng_evaluation_session_rotation_completed_create(
1849 trace_archive_chunk_id, location);
1850 }
1851
1852 if (!evaluation) {
1853 /* Internal error */
1854 ret = -1;
1855 cmd_result = LTTNG_ERR_UNK;
1856 goto put_list;
1857 }
1858
1859 /*
1860 * Ownership of `evaluation` transferred to the action executor
1861 * no matter the result.
1862 */
1863 executor_status = action_executor_enqueue(state->executor,
1864 trigger, evaluation, &session_creds,
1865 client_list);
1866 evaluation = NULL;
1867 switch (executor_status) {
1868 case ACTION_EXECUTOR_STATUS_OK:
1869 break;
1870 case ACTION_EXECUTOR_STATUS_ERROR:
1871 case ACTION_EXECUTOR_STATUS_INVALID:
1872 /*
1873 * TODO Add trigger identification (name/id) when
1874 * it is added to the API.
1875 */
1876 ERR("Fatal error occurred while enqueuing action associated with session rotation trigger");
1877 ret = -1;
1878 goto put_list;
1879 case ACTION_EXECUTOR_STATUS_OVERFLOW:
1880 /*
1881 * TODO Add trigger identification (name/id) when
1882 * it is added to the API.
1883 *
1884 * Not a fatal error.
1885 */
1886 WARN("No space left when enqueuing action associated with session rotation trigger");
1887 ret = 0;
1888 goto put_list;
1889 default:
1890 abort();
1891 }
1892
1893 put_list:
1894 notification_client_list_put(client_list);
1895 if (caa_unlikely(ret)) {
1896 break;
1897 }
1898 }
1899 end:
1900 session_info_put(session_info);
1901 *_cmd_result = cmd_result;
1902 rcu_read_unlock();
1903 return ret;
1904 }
1905
1906 static
1907 int handle_notification_thread_command_add_tracer_event_source(
1908 struct notification_thread_state *state,
1909 int tracer_event_source_fd,
1910 enum lttng_domain_type domain_type,
1911 enum lttng_error_code *_cmd_result)
1912 {
1913 int ret = 0;
1914 enum lttng_error_code cmd_result = LTTNG_OK;
1915 struct notification_event_tracer_event_source_element *element = NULL;
1916
1917 element = zmalloc(sizeof(*element));
1918 if (!element) {
1919 cmd_result = LTTNG_ERR_NOMEM;
1920 ret = -1;
1921 goto end;
1922 }
1923
1924 element->fd = tracer_event_source_fd;
1925 element->domain = domain_type;
1926
1927 cds_list_add(&element->node, &state->tracer_event_sources_list);
1928
1929 DBG3("[notification-thread] Adding tracer event source fd to poll set: tracer_event_source_fd = %d, domain = '%s'",
1930 tracer_event_source_fd,
1931 lttng_domain_type_str(domain_type));
1932
1933 /* Adding the read side pipe to the event poll. */
1934 ret = lttng_poll_add(&state->events, tracer_event_source_fd, LPOLLIN | LPOLLERR);
1935 if (ret < 0) {
1936 ERR("[notification-thread] Failed to add tracer event source to poll set: tracer_event_source_fd = %d, domain = '%s'",
1937 tracer_event_source_fd,
1938 lttng_domain_type_str(element->domain));
1939 cds_list_del(&element->node);
1940 free(element);
1941 goto end;
1942 }
1943
1944 element->is_fd_in_poll_set = true;
1945
1946 end:
1947 *_cmd_result = cmd_result;
1948 return ret;
1949 }
1950
1951 static
1952 int handle_notification_thread_command_remove_tracer_event_source(
1953 struct notification_thread_state *state,
1954 int tracer_event_source_fd,
1955 enum lttng_error_code *_cmd_result)
1956 {
1957 int ret = 0;
1958 enum lttng_error_code cmd_result = LTTNG_OK;
1959 struct notification_event_tracer_event_source_element *source_element = NULL, *tmp;
1960
1961 cds_list_for_each_entry_safe(source_element, tmp,
1962 &state->tracer_event_sources_list, node) {
1963 if (source_element->fd != tracer_event_source_fd) {
1964 continue;
1965 }
1966
1967 DBG("[notification-thread] Removed tracer event source from poll set: tracer_event_source_fd = %d, domain = '%s'",
1968 tracer_event_source_fd,
1969 lttng_domain_type_str(source_element->domain));
1970 cds_list_del(&source_element->node);
1971 break;
1972 }
1973
1974 /* It should always be found. */
1975 assert(source_element);
1976
1977 if (!source_element->is_fd_in_poll_set) {
1978 /* Skip the poll set removal. */
1979 goto end;
1980 }
1981
1982 DBG3("[notification-thread] Removing tracer event source from poll set: tracer_event_source_fd = %d, domain = '%s'",
1983 tracer_event_source_fd,
1984 lttng_domain_type_str(source_element->domain));
1985
1986 /* Removing the fd from the event poll set. */
1987 ret = lttng_poll_del(&state->events, tracer_event_source_fd);
1988 if (ret < 0) {
1989 ERR("[notification-thread] Failed to remove tracer event source from poll set: tracer_event_source_fd = %d, domain = '%s'",
1990 tracer_event_source_fd,
1991 lttng_domain_type_str(source_element->domain));
1992 cmd_result = LTTNG_ERR_FATAL;
1993 goto end;
1994 }
1995
1996 source_element->is_fd_in_poll_set = false;
1997
1998 end:
1999 free(source_element);
2000 *_cmd_result = cmd_result;
2001 return ret;
2002 }
2003
2004 int handle_notification_thread_remove_tracer_event_source_no_result(
2005 struct notification_thread_state *state,
2006 int tracer_event_source_fd)
2007 {
2008 int ret;
2009 enum lttng_error_code cmd_result;
2010
2011 ret = handle_notification_thread_command_remove_tracer_event_source(
2012 state, tracer_event_source_fd, &cmd_result);
2013 (void) cmd_result;
2014 return ret;
2015 }
2016
2017 static int handle_notification_thread_command_list_triggers(
2018 struct notification_thread_handle *handle,
2019 struct notification_thread_state *state,
2020 uid_t client_uid,
2021 struct lttng_triggers **triggers,
2022 enum lttng_error_code *_cmd_result)
2023 {
2024 int ret = 0;
2025 enum lttng_error_code cmd_result = LTTNG_OK;
2026 struct cds_lfht_iter iter;
2027 struct lttng_trigger_ht_element *trigger_ht_element;
2028 struct lttng_triggers *local_triggers = NULL;
2029 const struct lttng_credentials *creds;
2030
2031 rcu_read_lock();
2032
2033 local_triggers = lttng_triggers_create();
2034 if (!local_triggers) {
2035 /* Not a fatal error. */
2036 cmd_result = LTTNG_ERR_NOMEM;
2037 goto end;
2038 }
2039
2040 cds_lfht_for_each_entry(state->triggers_ht, &iter,
2041 trigger_ht_element, node) {
2042 /*
2043 * Only return the triggers to which the client has access.
2044 * The root user has visibility over all triggers.
2045 */
2046 creds = lttng_trigger_get_credentials(trigger_ht_element->trigger);
2047 if (client_uid != lttng_credentials_get_uid(creds) && client_uid != 0) {
2048 continue;
2049 }
2050
2051 ret = lttng_triggers_add(local_triggers,
2052 trigger_ht_element->trigger);
2053 if (ret < 0) {
2054 /* Not a fatal error. */
2055 ret = 0;
2056 cmd_result = LTTNG_ERR_NOMEM;
2057 goto end;
2058 }
2059 }
2060
2061 /* Transferring ownership to the caller. */
2062 *triggers = local_triggers;
2063 local_triggers = NULL;
2064
2065 end:
2066 rcu_read_unlock();
2067 lttng_triggers_destroy(local_triggers);
2068 *_cmd_result = cmd_result;
2069 return ret;
2070 }
2071
2072 static
2073 bool condition_is_supported(struct lttng_condition *condition)
2074 {
2075 bool is_supported;
2076
2077 switch (lttng_condition_get_type(condition)) {
2078 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
2079 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
2080 {
2081 int ret;
2082 enum lttng_domain_type domain;
2083
2084 ret = lttng_condition_buffer_usage_get_domain_type(condition,
2085 &domain);
2086 assert(ret == 0);
2087
2088 if (domain != LTTNG_DOMAIN_KERNEL) {
2089 is_supported = true;
2090 goto end;
2091 }
2092
2093 /*
2094 * Older kernel tracers don't expose the API to monitor their
2095 * buffers. Therefore, we reject triggers that require that
2096 * mechanism to be available to be evaluated.
2097 *
2098 * Assume unsupported on error.
2099 */
2100 is_supported = kernel_supports_ring_buffer_snapshot_sample_positions() == 1;
2101 break;
2102 }
2103 case LTTNG_CONDITION_TYPE_EVENT_RULE_HIT:
2104 {
2105 const struct lttng_event_rule *event_rule;
2106 enum lttng_domain_type domain;
2107 const enum lttng_condition_status status =
2108 lttng_condition_event_rule_get_rule(
2109 condition, &event_rule);
2110
2111 assert(status == LTTNG_CONDITION_STATUS_OK);
2112
2113 domain = lttng_event_rule_get_domain_type(event_rule);
2114 if (domain != LTTNG_DOMAIN_KERNEL) {
2115 is_supported = true;
2116 goto end;
2117 }
2118
2119 /*
2120 * Older kernel tracers can't emit notification. Therefore, we
2121 * reject triggers that require that mechanism to be available
2122 * to be evaluated.
2123 *
2124 * Assume unsupported on error.
2125 */
2126 is_supported = kernel_supports_event_notifiers() == 1;
2127 break;
2128 }
2129 default:
2130 is_supported = true;
2131 }
2132 end:
2133 return is_supported;
2134 }
2135
2136 /* Must be called with RCU read lock held. */
2137 static
2138 int bind_trigger_to_matching_session(struct lttng_trigger *trigger,
2139 struct notification_thread_state *state)
2140 {
2141 int ret = 0;
2142 const struct lttng_condition *condition;
2143 const char *session_name;
2144 struct lttng_session_trigger_list *trigger_list;
2145
2146 condition = lttng_trigger_get_const_condition(trigger);
2147 switch (lttng_condition_get_type(condition)) {
2148 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
2149 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
2150 {
2151 enum lttng_condition_status status;
2152
2153 status = lttng_condition_session_rotation_get_session_name(
2154 condition, &session_name);
2155 if (status != LTTNG_CONDITION_STATUS_OK) {
2156 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
2157 ret = -1;
2158 goto end;
2159 }
2160 break;
2161 }
2162 default:
2163 ret = -1;
2164 goto end;
2165 }
2166
2167 trigger_list = get_session_trigger_list(state, session_name);
2168 if (!trigger_list) {
2169 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
2170 session_name);
2171 goto end;
2172
2173 }
2174
2175 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
2176 session_name);
2177 ret = lttng_session_trigger_list_add(trigger_list, trigger);
2178 end:
2179 return ret;
2180 }
2181
2182 /* Must be called with RCU read lock held. */
2183 static
2184 int bind_trigger_to_matching_channels(struct lttng_trigger *trigger,
2185 struct notification_thread_state *state)
2186 {
2187 int ret = 0;
2188 struct cds_lfht_node *node;
2189 struct cds_lfht_iter iter;
2190 struct channel_info *channel;
2191
2192 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
2193 channels_ht_node) {
2194 struct lttng_trigger_list_element *trigger_list_element;
2195 struct lttng_channel_trigger_list *trigger_list;
2196 struct cds_lfht_iter lookup_iter;
2197
2198 if (!trigger_applies_to_channel(trigger, channel)) {
2199 continue;
2200 }
2201
2202 cds_lfht_lookup(state->channel_triggers_ht,
2203 hash_channel_key(&channel->key),
2204 match_channel_trigger_list,
2205 &channel->key,
2206 &lookup_iter);
2207 node = cds_lfht_iter_get_node(&lookup_iter);
2208 assert(node);
2209 trigger_list = caa_container_of(node,
2210 struct lttng_channel_trigger_list,
2211 channel_triggers_ht_node);
2212
2213 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
2214 if (!trigger_list_element) {
2215 ret = -1;
2216 goto end;
2217 }
2218 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
2219 trigger_list_element->trigger = trigger;
2220 cds_list_add(&trigger_list_element->node, &trigger_list->list);
2221 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
2222 channel->name);
2223 }
2224 end:
2225 return ret;
2226 }
2227
2228 static
2229 bool is_trigger_action_notify(const struct lttng_trigger *trigger)
2230 {
2231 bool is_notify = false;
2232 unsigned int i, count;
2233 enum lttng_action_status action_status;
2234 const struct lttng_action *action =
2235 lttng_trigger_get_const_action(trigger);
2236 enum lttng_action_type action_type;
2237
2238 assert(action);
2239 action_type = lttng_action_get_type(action);
2240 if (action_type == LTTNG_ACTION_TYPE_NOTIFY) {
2241 is_notify = true;
2242 goto end;
2243 } else if (action_type != LTTNG_ACTION_TYPE_GROUP) {
2244 goto end;
2245 }
2246
2247 action_status = lttng_action_group_get_count(action, &count);
2248 assert(action_status == LTTNG_ACTION_STATUS_OK);
2249
2250 for (i = 0; i < count; i++) {
2251 const struct lttng_action *inner_action =
2252 lttng_action_group_get_at_index(
2253 action, i);
2254
2255 action_type = lttng_action_get_type(inner_action);
2256 if (action_type == LTTNG_ACTION_TYPE_NOTIFY) {
2257 is_notify = true;
2258 goto end;
2259 }
2260 }
2261
2262 end:
2263 return is_notify;
2264 }
2265
2266 static bool trigger_name_taken(struct notification_thread_state *state,
2267 const struct lttng_trigger *trigger)
2268 {
2269 struct cds_lfht_iter iter;
2270
2271 /*
2272 * No duplicata is allowed in the triggers_by_name_uid_ht.
2273 * The match is done against the trigger name and uid.
2274 */
2275 cds_lfht_lookup(state->triggers_by_name_uid_ht,
2276 hash_trigger_by_name_uid(trigger),
2277 match_trigger_by_name_uid,
2278 trigger,
2279 &iter);
2280 return !!cds_lfht_iter_get_node(&iter);
2281 }
2282
2283 static
2284 enum lttng_error_code generate_trigger_name(
2285 struct notification_thread_state *state,
2286 struct lttng_trigger *trigger, const char **name)
2287 {
2288 enum lttng_error_code ret_code = LTTNG_OK;
2289 bool taken = false;
2290 enum lttng_trigger_status status;
2291
2292 do {
2293 const int ret = lttng_trigger_generate_name(trigger,
2294 state->trigger_id.name_offset++);
2295 if (ret) {
2296 /* The only reason this can fail right now. */
2297 ret_code = LTTNG_ERR_NOMEM;
2298 break;
2299 }
2300
2301 status = lttng_trigger_get_name(trigger, name);
2302 assert(status == LTTNG_TRIGGER_STATUS_OK);
2303
2304 taken = trigger_name_taken(state, trigger);
2305 } while (taken || state->trigger_id.name_offset == UINT64_MAX);
2306
2307 return ret_code;
2308 }
2309
2310 /*
2311 * FIXME A client's credentials are not checked when registering a trigger.
2312 *
2313 * The effects of this are benign since:
2314 * - The client will succeed in registering the trigger, as it is valid,
2315 * - The trigger will, internally, be bound to the channel/session,
2316 * - The notifications will not be sent since the client's credentials
2317 * are checked against the channel at that moment.
2318 *
2319 * If this function returns a non-zero value, it means something is
2320 * fundamentally broken and the whole subsystem/thread will be torn down.
2321 *
2322 * If a non-fatal error occurs, just set the cmd_result to the appropriate
2323 * error code.
2324 */
2325 static
2326 int handle_notification_thread_command_register_trigger(
2327 struct notification_thread_state *state,
2328 struct lttng_trigger *trigger,
2329 enum lttng_error_code *cmd_result)
2330 {
2331 int ret = 0;
2332 struct lttng_condition *condition;
2333 struct notification_client *client;
2334 struct notification_client_list *client_list = NULL;
2335 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2336 struct notification_client_list_element *client_list_element;
2337 struct notification_trigger_tokens_ht_element *trigger_tokens_ht_element = NULL;
2338 struct cds_lfht_node *node;
2339 struct cds_lfht_iter iter;
2340 const char* trigger_name;
2341 bool free_trigger = true;
2342 struct lttng_evaluation *evaluation = NULL;
2343 struct lttng_credentials object_creds;
2344 uid_t object_uid;
2345 gid_t object_gid;
2346 enum action_executor_status executor_status;
2347 const uint64_t trigger_tracer_token =
2348 state->trigger_id.next_tracer_token++;
2349
2350 rcu_read_lock();
2351
2352 /* Set the trigger's tracer token. */
2353 lttng_trigger_set_tracer_token(trigger, trigger_tracer_token);
2354
2355 if (lttng_trigger_get_name(trigger, &trigger_name) ==
2356 LTTNG_TRIGGER_STATUS_UNSET) {
2357 const enum lttng_error_code ret_code = generate_trigger_name(
2358 state, trigger, &trigger_name);
2359
2360 if (ret_code != LTTNG_OK) {
2361 /* Fatal error. */
2362 ret = -1;
2363 *cmd_result = ret_code;
2364 goto error;
2365 }
2366 } else if (trigger_name_taken(state, trigger)) {
2367 /* Not a fatal error. */
2368 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2369 ret = 0;
2370 goto error;
2371 }
2372
2373 condition = lttng_trigger_get_condition(trigger);
2374 assert(condition);
2375
2376 /* Some conditions require tracers to implement a minimal ABI version. */
2377 if (!condition_is_supported(condition)) {
2378 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
2379 goto error;
2380 }
2381
2382 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
2383 if (!trigger_ht_element) {
2384 ret = -1;
2385 goto error;
2386 }
2387
2388 /* Add trigger to the trigger_ht. */
2389 cds_lfht_node_init(&trigger_ht_element->node);
2390 cds_lfht_node_init(&trigger_ht_element->node_by_name_uid);
2391 trigger_ht_element->trigger = trigger;
2392
2393 node = cds_lfht_add_unique(state->triggers_ht,
2394 lttng_condition_hash(condition),
2395 match_trigger,
2396 trigger,
2397 &trigger_ht_element->node);
2398 if (node != &trigger_ht_element->node) {
2399 /* Not a fatal error, simply report it to the client. */
2400 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2401 goto error_free_ht_element;
2402 }
2403
2404 node = cds_lfht_add_unique(state->triggers_by_name_uid_ht,
2405 hash_trigger_by_name_uid(trigger),
2406 match_trigger_by_name_uid,
2407 trigger,
2408 &trigger_ht_element->node_by_name_uid);
2409 if (node != &trigger_ht_element->node_by_name_uid) {
2410 /* Not a fatal error, simply report it to the client. */
2411 cds_lfht_del(state->triggers_ht, &trigger_ht_element->node);
2412 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2413 goto error_free_ht_element;
2414 }
2415
2416 if (lttng_condition_get_type(condition) == LTTNG_CONDITION_TYPE_EVENT_RULE_HIT) {
2417 trigger_tokens_ht_element = zmalloc(sizeof(*trigger_tokens_ht_element));
2418 if (!trigger_tokens_ht_element) {
2419 /* Fatal error. */
2420 ret = -1;
2421 cds_lfht_del(state->triggers_ht,
2422 &trigger_ht_element->node);
2423 cds_lfht_del(state->triggers_by_name_uid_ht,
2424 &trigger_ht_element->node_by_name_uid);
2425 goto error_free_ht_element;
2426 }
2427
2428 /* Add trigger token to the trigger_tokens_ht. */
2429 cds_lfht_node_init(&trigger_tokens_ht_element->node);
2430 trigger_tokens_ht_element->token =
2431 LTTNG_OPTIONAL_GET(trigger->tracer_token);
2432 trigger_tokens_ht_element->trigger = trigger;
2433
2434 node = cds_lfht_add_unique(state->trigger_tokens_ht,
2435 hash_key_u64(&trigger_tokens_ht_element->token,
2436 lttng_ht_seed),
2437 match_trigger_token,
2438 &trigger_tokens_ht_element->token,
2439 &trigger_tokens_ht_element->node);
2440 if (node != &trigger_tokens_ht_element->node) {
2441 /* Internal corruption, fatal error. */
2442 ret = -1;
2443 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2444 cds_lfht_del(state->triggers_ht,
2445 &trigger_ht_element->node);
2446 cds_lfht_del(state->triggers_by_name_uid_ht,
2447 &trigger_ht_element->node_by_name_uid);
2448 goto error_free_ht_element;
2449 }
2450 }
2451
2452 /*
2453 * Ownership of the trigger and of its wrapper was transfered to
2454 * the triggers_ht. Same for token ht element if necessary.
2455 */
2456 trigger_tokens_ht_element = NULL;
2457 trigger_ht_element = NULL;
2458 free_trigger = false;
2459
2460 /*
2461 * The rest only applies to triggers that have a "notify" action.
2462 * It is not skipped as this is the only action type currently
2463 * supported.
2464 */
2465 if (is_trigger_action_notify(trigger)) {
2466 client_list = notification_client_list_create(trigger);
2467 if (!client_list) {
2468 ret = -1;
2469 goto error_free_ht_element;
2470 }
2471
2472 /* Build a list of clients to which this new trigger applies. */
2473 cds_lfht_for_each_entry (state->client_socket_ht, &iter, client,
2474 client_socket_ht_node) {
2475 if (!trigger_applies_to_client(trigger, client)) {
2476 continue;
2477 }
2478
2479 client_list_element =
2480 zmalloc(sizeof(*client_list_element));
2481 if (!client_list_element) {
2482 ret = -1;
2483 goto error_put_client_list;
2484 }
2485
2486 CDS_INIT_LIST_HEAD(&client_list_element->node);
2487 client_list_element->client = client;
2488 cds_list_add(&client_list_element->node,
2489 &client_list->list);
2490 }
2491
2492 /*
2493 * Client list ownership transferred to the
2494 * notification_trigger_clients_ht.
2495 */
2496 publish_notification_client_list(state, client_list);
2497 }
2498
2499 switch (get_condition_binding_object(condition)) {
2500 case LTTNG_OBJECT_TYPE_SESSION:
2501 /* Add the trigger to the list if it matches a known session. */
2502 ret = bind_trigger_to_matching_session(trigger, state);
2503 if (ret) {
2504 goto error_put_client_list;
2505 }
2506 break;
2507 case LTTNG_OBJECT_TYPE_CHANNEL:
2508 /*
2509 * Add the trigger to list of triggers bound to the channels
2510 * currently known.
2511 */
2512 ret = bind_trigger_to_matching_channels(trigger, state);
2513 if (ret) {
2514 goto error_put_client_list;
2515 }
2516 break;
2517 case LTTNG_OBJECT_TYPE_NONE:
2518 break;
2519 default:
2520 ERR("Unknown object type on which to bind a newly registered trigger was encountered");
2521 ret = -1;
2522 goto error_put_client_list;
2523 }
2524
2525 /*
2526 * The new trigger's condition must be evaluated against the current
2527 * state.
2528 *
2529 * In the case of `notify` action, nothing preventing clients from
2530 * subscribing to a condition before the corresponding trigger is
2531 * registered, we have to evaluate this new condition right away.
2532 *
2533 * At some point, we were waiting for the next "evaluation" (e.g. on
2534 * reception of a channel sample) to evaluate this new condition, but
2535 * that was broken.
2536 *
2537 * The reason it was broken is that waiting for the next sample
2538 * does not allow us to properly handle transitions for edge-triggered
2539 * conditions.
2540 *
2541 * Consider this example: when we handle a new channel sample, we
2542 * evaluate each conditions twice: once with the previous state, and
2543 * again with the newest state. We then use those two results to
2544 * determine whether a state change happened: a condition was false and
2545 * became true. If a state change happened, we have to notify clients.
2546 *
2547 * Now, if a client subscribes to a given notification and registers
2548 * a trigger *after* that subscription, we have to make sure the
2549 * condition is evaluated at this point while considering only the
2550 * current state. Otherwise, the next evaluation cycle may only see
2551 * that the evaluations remain the same (true for samples n-1 and n) and
2552 * the client will never know that the condition has been met.
2553 */
2554 switch (get_condition_binding_object(condition)) {
2555 case LTTNG_OBJECT_TYPE_SESSION:
2556 ret = evaluate_session_condition_for_client(condition, state,
2557 &evaluation, &object_uid,
2558 &object_gid);
2559 break;
2560 case LTTNG_OBJECT_TYPE_CHANNEL:
2561 ret = evaluate_channel_condition_for_client(condition, state,
2562 &evaluation, &object_uid,
2563 &object_gid);
2564 break;
2565 case LTTNG_OBJECT_TYPE_NONE:
2566 ret = 0;
2567 break;
2568 case LTTNG_OBJECT_TYPE_UNKNOWN:
2569 default:
2570 ret = -1;
2571 break;
2572 }
2573
2574 if (ret) {
2575 /* Fatal error. */
2576 goto error_put_client_list;
2577 }
2578
2579 LTTNG_OPTIONAL_SET(&object_creds.uid, object_uid);
2580 LTTNG_OPTIONAL_SET(&object_creds.gid, object_gid);
2581
2582 DBG("Newly registered trigger's condition evaluated to %s",
2583 evaluation ? "true" : "false");
2584 if (!evaluation) {
2585 /* Evaluation yielded nothing. Normal exit. */
2586 ret = 0;
2587 goto end;
2588 }
2589
2590 /*
2591 * Ownership of `evaluation` transferred to the action executor
2592 * no matter the result.
2593 */
2594 executor_status = action_executor_enqueue(state->executor, trigger,
2595 evaluation, &object_creds, client_list);
2596 evaluation = NULL;
2597 switch (executor_status) {
2598 case ACTION_EXECUTOR_STATUS_OK:
2599 break;
2600 case ACTION_EXECUTOR_STATUS_ERROR:
2601 case ACTION_EXECUTOR_STATUS_INVALID:
2602 /*
2603 * TODO Add trigger identification (name/id) when
2604 * it is added to the API.
2605 */
2606 ERR("Fatal error occurred while enqueuing action associated to newly registered trigger");
2607 ret = -1;
2608 goto error_put_client_list;
2609 case ACTION_EXECUTOR_STATUS_OVERFLOW:
2610 /*
2611 * TODO Add trigger identification (name/id) when
2612 * it is added to the API.
2613 *
2614 * Not a fatal error.
2615 */
2616 WARN("No space left when enqueuing action associated to newly registered trigger");
2617 ret = 0;
2618 goto end;
2619 default:
2620 abort();
2621 }
2622
2623 end:
2624 *cmd_result = LTTNG_OK;
2625 DBG("Registered trigger: name = `%s`, tracer token = %" PRIu64,
2626 trigger_name, trigger_tracer_token);
2627
2628 error_put_client_list:
2629 notification_client_list_put(client_list);
2630
2631 error_free_ht_element:
2632 if (trigger_ht_element) {
2633 /* Delayed removal due to RCU constraint on delete. */
2634 call_rcu(&trigger_ht_element->rcu_node,
2635 free_lttng_trigger_ht_element_rcu);
2636 }
2637
2638 free(trigger_tokens_ht_element);
2639 error:
2640 if (free_trigger) {
2641 lttng_trigger_destroy(trigger);
2642 }
2643 rcu_read_unlock();
2644 return ret;
2645 }
2646
2647 static
2648 void free_lttng_trigger_ht_element_rcu(struct rcu_head *node)
2649 {
2650 free(caa_container_of(node, struct lttng_trigger_ht_element,
2651 rcu_node));
2652 }
2653
2654 static
2655 void free_notification_trigger_tokens_ht_element_rcu(struct rcu_head *node)
2656 {
2657 free(caa_container_of(node, struct notification_trigger_tokens_ht_element,
2658 rcu_node));
2659 }
2660
2661 static
2662 int handle_notification_thread_command_unregister_trigger(
2663 struct notification_thread_state *state,
2664 struct lttng_trigger *trigger,
2665 enum lttng_error_code *_cmd_reply)
2666 {
2667 struct cds_lfht_iter iter;
2668 struct cds_lfht_node *triggers_ht_node;
2669 struct lttng_channel_trigger_list *trigger_list;
2670 struct notification_client_list *client_list;
2671 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2672 struct lttng_condition *condition = lttng_trigger_get_condition(
2673 trigger);
2674 enum lttng_error_code cmd_reply;
2675
2676 rcu_read_lock();
2677
2678 cds_lfht_lookup(state->triggers_ht,
2679 lttng_condition_hash(condition),
2680 match_trigger,
2681 trigger,
2682 &iter);
2683 triggers_ht_node = cds_lfht_iter_get_node(&iter);
2684 if (!triggers_ht_node) {
2685 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
2686 goto end;
2687 } else {
2688 cmd_reply = LTTNG_OK;
2689 }
2690
2691 /* Remove trigger from channel_triggers_ht. */
2692 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
2693 channel_triggers_ht_node) {
2694 struct lttng_trigger_list_element *trigger_element, *tmp;
2695
2696 cds_list_for_each_entry_safe(trigger_element, tmp,
2697 &trigger_list->list, node) {
2698 if (!lttng_trigger_is_equal(trigger, trigger_element->trigger)) {
2699 continue;
2700 }
2701
2702 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2703 cds_list_del(&trigger_element->node);
2704 /* A trigger can only appear once per channel */
2705 break;
2706 }
2707 }
2708
2709 if (lttng_condition_get_type(condition) ==
2710 LTTNG_CONDITION_TYPE_EVENT_RULE_HIT) {
2711 struct notification_trigger_tokens_ht_element
2712 *trigger_tokens_ht_element;
2713
2714 cds_lfht_for_each_entry (state->trigger_tokens_ht, &iter,
2715 trigger_tokens_ht_element, node) {
2716 if (!lttng_trigger_is_equal(trigger,
2717 trigger_tokens_ht_element->trigger)) {
2718 continue;
2719 }
2720
2721 DBG("[notification-thread] Removed trigger from tokens_ht");
2722 cds_lfht_del(state->trigger_tokens_ht,
2723 &trigger_tokens_ht_element->node);
2724 call_rcu(&trigger_tokens_ht_element->rcu_node,
2725 free_notification_trigger_tokens_ht_element_rcu);
2726
2727 break;
2728 }
2729 }
2730
2731 if (is_trigger_action_notify(trigger)) {
2732 /*
2733 * Remove and release the client list from
2734 * notification_trigger_clients_ht.
2735 */
2736 client_list = get_client_list_from_condition(state, condition);
2737 assert(client_list);
2738
2739 /* Put new reference and the hashtable's reference. */
2740 notification_client_list_put(client_list);
2741 notification_client_list_put(client_list);
2742 client_list = NULL;
2743 }
2744
2745 /* Remove trigger from triggers_ht. */
2746 trigger_ht_element = caa_container_of(triggers_ht_node,
2747 struct lttng_trigger_ht_element, node);
2748 cds_lfht_del(state->triggers_by_name_uid_ht, &trigger_ht_element->node_by_name_uid);
2749 cds_lfht_del(state->triggers_ht, triggers_ht_node);
2750
2751 /* Release the ownership of the trigger. */
2752 lttng_trigger_destroy(trigger_ht_element->trigger);
2753 call_rcu(&trigger_ht_element->rcu_node, free_lttng_trigger_ht_element_rcu);
2754 end:
2755 rcu_read_unlock();
2756 if (_cmd_reply) {
2757 *_cmd_reply = cmd_reply;
2758 }
2759 return 0;
2760 }
2761
2762 /* Returns 0 on success, 1 on exit requested, negative value on error. */
2763 int handle_notification_thread_command(
2764 struct notification_thread_handle *handle,
2765 struct notification_thread_state *state)
2766 {
2767 int ret;
2768 uint64_t counter;
2769 struct notification_thread_command *cmd;
2770
2771 /* Read the event pipe to put it back into a quiescent state. */
2772 ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
2773 sizeof(counter));
2774 if (ret != sizeof(counter)) {
2775 goto error;
2776 }
2777
2778 pthread_mutex_lock(&handle->cmd_queue.lock);
2779 cmd = cds_list_first_entry(&handle->cmd_queue.list,
2780 struct notification_thread_command, cmd_list_node);
2781 cds_list_del(&cmd->cmd_list_node);
2782 pthread_mutex_unlock(&handle->cmd_queue.lock);
2783 switch (cmd->type) {
2784 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
2785 DBG("[notification-thread] Received register trigger command");
2786 ret = handle_notification_thread_command_register_trigger(
2787 state, cmd->parameters.trigger,
2788 &cmd->reply_code);
2789 break;
2790 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
2791 DBG("[notification-thread] Received unregister trigger command");
2792 ret = handle_notification_thread_command_unregister_trigger(
2793 state, cmd->parameters.trigger,
2794 &cmd->reply_code);
2795 break;
2796 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
2797 DBG("[notification-thread] Received add channel command");
2798 ret = handle_notification_thread_command_add_channel(
2799 state,
2800 cmd->parameters.add_channel.session.name,
2801 cmd->parameters.add_channel.session.uid,
2802 cmd->parameters.add_channel.session.gid,
2803 cmd->parameters.add_channel.channel.name,
2804 cmd->parameters.add_channel.channel.domain,
2805 cmd->parameters.add_channel.channel.key,
2806 cmd->parameters.add_channel.channel.capacity,
2807 &cmd->reply_code);
2808 break;
2809 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
2810 DBG("[notification-thread] Received remove channel command");
2811 ret = handle_notification_thread_command_remove_channel(
2812 state, cmd->parameters.remove_channel.key,
2813 cmd->parameters.remove_channel.domain,
2814 &cmd->reply_code);
2815 break;
2816 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
2817 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
2818 DBG("[notification-thread] Received session rotation %s command",
2819 cmd->type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING ?
2820 "ongoing" : "completed");
2821 ret = handle_notification_thread_command_session_rotation(
2822 state,
2823 cmd->type,
2824 cmd->parameters.session_rotation.session_name,
2825 cmd->parameters.session_rotation.uid,
2826 cmd->parameters.session_rotation.gid,
2827 cmd->parameters.session_rotation.trace_archive_chunk_id,
2828 cmd->parameters.session_rotation.location,
2829 &cmd->reply_code);
2830 break;
2831 case NOTIFICATION_COMMAND_TYPE_ADD_TRACER_EVENT_SOURCE:
2832 ret = handle_notification_thread_command_add_tracer_event_source(
2833 state,
2834 cmd->parameters.tracer_event_source.tracer_event_source_fd,
2835 cmd->parameters.tracer_event_source.domain,
2836 &cmd->reply_code);
2837 break;
2838 case NOTIFICATION_COMMAND_TYPE_REMOVE_TRACER_EVENT_SOURCE:
2839 ret = handle_notification_thread_command_remove_tracer_event_source(
2840 state,
2841 cmd->parameters.tracer_event_source.tracer_event_source_fd,
2842 &cmd->reply_code);
2843 break;
2844 case NOTIFICATION_COMMAND_TYPE_LIST_TRIGGERS:
2845 {
2846 struct lttng_triggers *triggers = NULL;
2847
2848 ret = handle_notification_thread_command_list_triggers(
2849 handle,
2850 state,
2851 cmd->parameters.list_triggers.uid,
2852 &triggers,
2853 &cmd->reply_code);
2854 cmd->reply.list_triggers.triggers = triggers;
2855 ret = 0;
2856 break;
2857 }
2858 case NOTIFICATION_COMMAND_TYPE_QUIT:
2859 DBG("[notification-thread] Received quit command");
2860 cmd->reply_code = LTTNG_OK;
2861 ret = 1;
2862 goto end;
2863 case NOTIFICATION_COMMAND_TYPE_CLIENT_COMMUNICATION_UPDATE:
2864 {
2865 const enum client_transmission_status client_status =
2866 cmd->parameters.client_communication_update
2867 .status;
2868 const notification_client_id client_id =
2869 cmd->parameters.client_communication_update.id;
2870 struct notification_client *client;
2871
2872 rcu_read_lock();
2873 client = get_client_from_id(client_id, state);
2874
2875 if (!client) {
2876 /*
2877 * Client error was probably already picked-up by the
2878 * notification thread or it has disconnected
2879 * gracefully while this command was queued.
2880 */
2881 DBG("Failed to find notification client to update communication status, client id = %" PRIu64,
2882 client_id);
2883 ret = 0;
2884 } else {
2885 ret = client_handle_transmission_status(
2886 client, client_status, state);
2887 }
2888 rcu_read_unlock();
2889 break;
2890 }
2891 default:
2892 ERR("[notification-thread] Unknown internal command received");
2893 goto error_unlock;
2894 }
2895
2896 if (ret) {
2897 goto error_unlock;
2898 }
2899 end:
2900 if (cmd->is_async) {
2901 free(cmd);
2902 cmd = NULL;
2903 } else {
2904 lttng_waiter_wake_up(&cmd->reply_waiter);
2905 }
2906 return ret;
2907 error_unlock:
2908 /* Wake-up and return a fatal error to the calling thread. */
2909 lttng_waiter_wake_up(&cmd->reply_waiter);
2910 cmd->reply_code = LTTNG_ERR_FATAL;
2911 error:
2912 /* Indicate a fatal error to the caller. */
2913 return -1;
2914 }
2915
/*
 * Add O_NONBLOCK to the file status flags of `socket`.
 *
 * Returns the result of the last fcntl() call: -1 if reading or updating the
 * flags failed.
 */
static
int socket_set_non_blocking(int socket)
{
	int ret;
	const int current_flags = fcntl(socket, F_GETFL, 0);

	if (current_flags == -1) {
		PERROR("fcntl get socket flags");
		return current_flags;
	}

	/* Preserve the existing flags, only adding O_NONBLOCK. */
	ret = fcntl(socket, F_SETFL, current_flags | O_NONBLOCK);
	if (ret == -1) {
		PERROR("fcntl set O_NONBLOCK socket flag");
		return ret;
	}

	DBG("Client socket (fd = %i) set as non-blocking", socket);
	return ret;
}
2938
2939 static
2940 int client_reset_inbound_state(struct notification_client *client)
2941 {
2942 int ret;
2943
2944
2945 lttng_payload_clear(&client->communication.inbound.payload);
2946
2947 client->communication.inbound.bytes_to_receive =
2948 sizeof(struct lttng_notification_channel_message);
2949 client->communication.inbound.msg_type =
2950 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
2951 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
2952 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
2953 ret = lttng_dynamic_buffer_set_size(
2954 &client->communication.inbound.payload.buffer,
2955 client->communication.inbound.bytes_to_receive);
2956
2957 return ret;
2958 }
2959
2960 int handle_notification_thread_client_connect(
2961 struct notification_thread_state *state)
2962 {
2963 int ret;
2964 struct notification_client *client;
2965
2966 DBG("[notification-thread] Handling new notification channel client connection");
2967
2968 client = zmalloc(sizeof(*client));
2969 if (!client) {
2970 /* Fatal error. */
2971 ret = -1;
2972 goto error;
2973 }
2974
2975 pthread_mutex_init(&client->lock, NULL);
2976 client->id = state->next_notification_client_id++;
2977 CDS_INIT_LIST_HEAD(&client->condition_list);
2978 lttng_payload_init(&client->communication.inbound.payload);
2979 lttng_payload_init(&client->communication.outbound.payload);
2980 client->communication.inbound.expect_creds = true;
2981
2982 ret = client_reset_inbound_state(client);
2983 if (ret) {
2984 ERR("[notification-thread] Failed to reset client communication's inbound state");
2985 ret = 0;
2986 goto error;
2987 }
2988
2989 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
2990 if (ret < 0) {
2991 ERR("[notification-thread] Failed to accept new notification channel client connection");
2992 ret = 0;
2993 goto error;
2994 }
2995
2996 client->socket = ret;
2997
2998 ret = socket_set_non_blocking(client->socket);
2999 if (ret) {
3000 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
3001 goto error;
3002 }
3003
3004 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
3005 if (ret < 0) {
3006 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
3007 ret = 0;
3008 goto error;
3009 }
3010
3011 ret = lttng_poll_add(&state->events, client->socket,
3012 LPOLLIN | LPOLLERR |
3013 LPOLLHUP | LPOLLRDHUP);
3014 if (ret < 0) {
3015 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
3016 ret = 0;
3017 goto error;
3018 }
3019 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
3020 client->socket);
3021
3022 rcu_read_lock();
3023 cds_lfht_add(state->client_socket_ht,
3024 hash_client_socket(client->socket),
3025 &client->client_socket_ht_node);
3026 cds_lfht_add(state->client_id_ht,
3027 hash_client_id(client->id),
3028 &client->client_id_ht_node);
3029 rcu_read_unlock();
3030
3031 return ret;
3032
3033 error:
3034 notification_client_destroy(client, state);
3035 return ret;
3036 }
3037
3038 /*
3039 * RCU read-lock must be held by the caller.
3040 * Client lock must _not_ be held by the caller.
3041 */
3042 static
3043 int notification_thread_client_disconnect(
3044 struct notification_client *client,
3045 struct notification_thread_state *state)
3046 {
3047 int ret;
3048 struct lttng_condition_list_element *condition_list_element, *tmp;
3049
3050 /* Acquire the client lock to disable its communication atomically. */
3051 pthread_mutex_lock(&client->lock);
3052 client->communication.active = false;
3053 cds_lfht_del(state->client_socket_ht, &client->client_socket_ht_node);
3054 cds_lfht_del(state->client_id_ht, &client->client_id_ht_node);
3055 pthread_mutex_unlock(&client->lock);
3056
3057 ret = lttng_poll_del(&state->events, client->socket);
3058 if (ret) {
3059 ERR("[notification-thread] Failed to remove client socket %d from poll set",
3060 client->socket);
3061 }
3062
3063 /* Release all conditions to which the client was subscribed. */
3064 cds_list_for_each_entry_safe(condition_list_element, tmp,
3065 &client->condition_list, node) {
3066 (void) notification_thread_client_unsubscribe(client,
3067 condition_list_element->condition, state, NULL);
3068 }
3069
3070 /*
3071 * Client no longer accessible to other threads (through the
3072 * client lists).
3073 */
3074 notification_client_destroy(client, state);
3075 return ret;
3076 }
3077
/*
 * Disconnect the client identified by `client_socket`.
 *
 * Returns -1 when no client is associated with the socket, otherwise the
 * result of notification_thread_client_disconnect().
 */
int handle_notification_thread_client_disconnect(
		int client_socket, struct notification_thread_state *state)
{
	int ret;
	struct notification_client *client;

	rcu_read_lock();
	DBG("[notification-thread] Closing client connection (socket fd = %i)",
			client_socket);

	client = get_client_from_socket(client_socket, state);
	if (client) {
		ret = notification_thread_client_disconnect(client, state);
	} else {
		/* Internal state corruption, fatal error. */
		ERR("[notification-thread] Unable to find client (socket fd = %i)",
				client_socket);
		ret = -1;
	}

	rcu_read_unlock();
	return ret;
}
3101
3102 int handle_notification_thread_client_disconnect_all(
3103 struct notification_thread_state *state)
3104 {
3105 struct cds_lfht_iter iter;
3106 struct notification_client *client;
3107 bool error_encoutered = false;
3108
3109 rcu_read_lock();
3110 DBG("[notification-thread] Closing all client connections");
3111 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
3112 client_socket_ht_node) {
3113 int ret;
3114
3115 ret = notification_thread_client_disconnect(
3116 client, state);
3117 if (ret) {
3118 error_encoutered = true;
3119 }
3120 }
3121 rcu_read_unlock();
3122 return error_encoutered ? 1 : 0;
3123 }
3124
3125 int handle_notification_thread_trigger_unregister_all(
3126 struct notification_thread_state *state)
3127 {
3128 bool error_occurred = false;
3129 struct cds_lfht_iter iter;
3130 struct lttng_trigger_ht_element *trigger_ht_element;
3131
3132 rcu_read_lock();
3133 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
3134 node) {
3135 int ret = handle_notification_thread_command_unregister_trigger(
3136 state, trigger_ht_element->trigger, NULL);
3137 if (ret) {
3138 error_occurred = true;
3139 }
3140 }
3141 rcu_read_unlock();
3142 return error_occurred ? -1 : 0;
3143 }
3144
3145 static
3146 int client_handle_transmission_status(
3147 struct notification_client *client,
3148 enum client_transmission_status transmission_status,
3149 struct notification_thread_state *state)
3150 {
3151 int ret = 0;
3152
3153 switch (transmission_status) {
3154 case CLIENT_TRANSMISSION_STATUS_COMPLETE:
3155 ret = lttng_poll_mod(&state->events, client->socket,
3156 CLIENT_POLL_MASK_IN);
3157 if (ret) {
3158 goto end;
3159 }
3160
3161 break;
3162 case CLIENT_TRANSMISSION_STATUS_QUEUED:
3163 /*
3164 * We want to be notified whenever there is buffer space
3165 * available to send the rest of the payload.
3166 */
3167 ret = lttng_poll_mod(&state->events, client->socket,
3168 CLIENT_POLL_MASK_IN_OUT);
3169 if (ret) {
3170 goto end;
3171 }
3172 break;
3173 case CLIENT_TRANSMISSION_STATUS_FAIL:
3174 ret = notification_thread_client_disconnect(client, state);
3175 if (ret) {
3176 goto end;
3177 }
3178 break;
3179 case CLIENT_TRANSMISSION_STATUS_ERROR:
3180 ret = -1;
3181 goto end;
3182 default:
3183 abort();
3184 }
3185 end:
3186 return ret;
3187 }
3188
3189 /* Client lock must be acquired by caller. */
3190 static
3191 enum client_transmission_status client_flush_outgoing_queue(
3192 struct notification_client *client)
3193 {
3194 ssize_t ret;
3195 size_t to_send_count;
3196 enum client_transmission_status status;
3197 struct lttng_payload_view pv = lttng_payload_view_from_payload(
3198 &client->communication.outbound.payload, 0, -1);
3199 const int fds_to_send_count =
3200 lttng_payload_view_get_fd_handle_count(&pv);
3201
3202 ASSERT_LOCKED(client->lock);
3203
3204 if (!client->communication.active) {
3205 status = CLIENT_TRANSMISSION_STATUS_FAIL;
3206 goto end;
3207 }
3208
3209 if (pv.buffer.size == 0) {
3210 /*
3211 * If both data and fds are equal to zero, we are in an invalid
3212 * state.
3213 */
3214 assert(fds_to_send_count != 0);
3215 goto send_fds;
3216 }
3217
3218 /* Send data. */
3219 to_send_count = pv.buffer.size;
3220 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
3221 client->socket);
3222
3223 ret = lttcomm_send_unix_sock_non_block(client->socket,
3224 pv.buffer.data,
3225 to_send_count);
3226 if ((ret >= 0 && ret < to_send_count)) {
3227 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
3228 client->socket);
3229 to_send_count -= max(ret, 0);
3230
3231 memmove(client->communication.outbound.payload.buffer.data,
3232 pv.buffer.data +
3233 pv.buffer.size - to_send_count,
3234 to_send_count);
3235 ret = lttng_dynamic_buffer_set_size(
3236 &client->communication.outbound.payload.buffer,
3237 to_send_count);
3238 if (ret) {
3239 goto error;
3240 }
3241
3242 status = CLIENT_TRANSMISSION_STATUS_QUEUED;
3243 goto end;
3244 } else if (ret < 0) {
3245 /* Generic error, disable the client's communication. */
3246 ERR("[notification-thread] Failed to flush outgoing queue, disconnecting client (socket fd = %i)",
3247 client->socket);
3248 client->communication.active = false;
3249 status = CLIENT_TRANSMISSION_STATUS_FAIL;
3250 goto end;
3251 } else {
3252 /*
3253 * No error and flushed the queue completely.
3254 *
3255 * The payload buffer size is used later to
3256 * check if there is notifications queued. So albeit that the
3257 * direct caller knows that the transmission is complete, we
3258 * need to set the buffer size to zero.
3259 */
3260 ret = lttng_dynamic_buffer_set_size(
3261 &client->communication.outbound.payload.buffer, 0);
3262 if (ret) {
3263 goto error;
3264 }
3265 }
3266
3267 send_fds:
3268 /* No fds to send, transmission is complete. */
3269 if (fds_to_send_count == 0) {
3270 status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
3271 goto end;
3272 }
3273
3274 ret = lttcomm_send_payload_view_fds_unix_sock_non_block(
3275 client->socket, &pv);
3276 if (ret < 0) {
3277 /* Generic error, disable the client's communication. */
3278 ERR("[notification-thread] Failed to flush outgoing fds queue, disconnecting client (socket fd = %i)",
3279 client->socket);
3280 client->communication.active = false;
3281 status = CLIENT_TRANSMISSION_STATUS_FAIL;
3282 goto end;
3283 } else if (ret == 0) {
3284 /* Nothing could be sent. */
3285 status = CLIENT_TRANSMISSION_STATUS_QUEUED;
3286 } else {
3287 /* Fd passing is an all or nothing kind of thing. */
3288 status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
3289 /*
3290 * The payload _fd_array count is used later to
3291 * check if there is notifications queued. So although the
3292 * direct caller knows that the transmission is complete, we
3293 * need to clear the _fd_array for the queuing check.
3294 */
3295 lttng_dynamic_pointer_array_clear(
3296 &client->communication.outbound.payload
3297 ._fd_handles);
3298 }
3299
3300 end:
3301 if (status == CLIENT_TRANSMISSION_STATUS_COMPLETE) {
3302 client->communication.outbound.queued_command_reply = false;
3303 client->communication.outbound.dropped_notification = false;
3304 lttng_payload_clear(&client->communication.outbound.payload);
3305 }
3306
3307 return status;
3308 error:
3309 return CLIENT_TRANSMISSION_STATUS_ERROR;
3310 }
3311
3312 static
3313 bool client_has_outbound_data_left(
3314 const struct notification_client *client)
3315 {
3316 const struct lttng_payload_view pv = lttng_payload_view_from_payload(
3317 &client->communication.outbound.payload, 0, -1);
3318 const bool has_data = pv.buffer.size != 0;
3319 const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv);
3320
3321 return has_data || has_fds;
3322 }
3323
3324 /* Client lock must _not_ be held by the caller. */
3325 static
3326 int client_send_command_reply(struct notification_client *client,
3327 struct notification_thread_state *state,
3328 enum lttng_notification_channel_status status)
3329 {
3330 int ret;
3331 struct lttng_notification_channel_command_reply reply = {
3332 .status = (int8_t) status,
3333 };
3334 struct lttng_notification_channel_message msg = {
3335 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
3336 .size = sizeof(reply),
3337 };
3338 char buffer[sizeof(msg) + sizeof(reply)];
3339 enum client_transmission_status transmission_status;
3340
3341 memcpy(buffer, &msg, sizeof(msg));
3342 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
3343 DBG("[notification-thread] Send command reply (%i)", (int) status);
3344
3345 pthread_mutex_lock(&client->lock);
3346 if (client->communication.outbound.queued_command_reply) {
3347 /* Protocol error. */
3348 goto error_unlock;
3349 }
3350
3351 /* Enqueue buffer to outgoing queue and flush it. */
3352 ret = lttng_dynamic_buffer_append(
3353 &client->communication.outbound.payload.buffer,
3354 buffer, sizeof(buffer));
3355 if (ret) {
3356 goto error_unlock;
3357 }
3358
3359 transmission_status = client_flush_outgoing_queue(client);
3360
3361 if (client_has_outbound_data_left(client)) {
3362 /* Queue could not be emptied. */
3363 client->communication.outbound.queued_command_reply = true;
3364 }
3365
3366 pthread_mutex_unlock(&client->lock);
3367 ret = client_handle_transmission_status(
3368 client, transmission_status, state);
3369 if (ret) {
3370 goto error;
3371 }
3372
3373 return 0;
3374 error_unlock:
3375 pthread_mutex_unlock(&client->lock);
3376 error:
3377 return -1;
3378 }
3379
3380 static
3381 int client_handle_message_unknown(struct notification_client *client,
3382 struct notification_thread_state *state)
3383 {
3384 int ret;
3385 /*
3386 * Receiving message header. The function will be called again
3387 * once the rest of the message as been received and can be
3388 * interpreted.
3389 */
3390 const struct lttng_notification_channel_message *msg;
3391
3392 assert(sizeof(*msg) == client->communication.inbound.payload.buffer.size);
3393 msg = (const struct lttng_notification_channel_message *)
3394 client->communication.inbound.payload.buffer.data;
3395
3396 if (msg->size == 0 ||
3397 msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
3398 ERR("[notification-thread] Invalid notification channel message: length = %u",
3399 msg->size);
3400 ret = -1;
3401 goto end;
3402 }
3403
3404 switch (msg->type) {
3405 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
3406 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
3407 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
3408 break;
3409 default:
3410 ret = -1;
3411 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
3412 goto end;
3413 }
3414
3415 client->communication.inbound.bytes_to_receive = msg->size;
3416 client->communication.inbound.fds_to_receive = msg->fds;
3417 client->communication.inbound.msg_type =
3418 (enum lttng_notification_channel_message_type) msg->type;
3419 ret = lttng_dynamic_buffer_set_size(
3420 &client->communication.inbound.payload.buffer, msg->size);
3421
3422 /* msg is not valid anymore due to lttng_dynamic_buffer_set_size. */
3423 msg = NULL;
3424 end:
3425 return ret;
3426 }
3427
3428 static
3429 int client_handle_message_handshake(struct notification_client *client,
3430 struct notification_thread_state *state)
3431 {
3432 int ret;
3433 struct lttng_notification_channel_command_handshake *handshake_client;
3434 const struct lttng_notification_channel_command_handshake handshake_reply = {
3435 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
3436 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
3437 };
3438 const struct lttng_notification_channel_message msg_header = {
3439 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
3440 .size = sizeof(handshake_reply),
3441 };
3442 enum lttng_notification_channel_status status =
3443 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
3444 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
3445
3446 memcpy(send_buffer, &msg_header, sizeof(msg_header));
3447 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
3448 sizeof(handshake_reply));
3449
3450 handshake_client =
3451 (struct lttng_notification_channel_command_handshake *)
3452 client->communication.inbound.payload.buffer
3453 .data;
3454 client->major = handshake_client->major;
3455 client->minor = handshake_client->minor;
3456 if (!client->communication.inbound.creds_received) {
3457 ERR("[notification-thread] No credentials received from client");
3458 ret = -1;
3459 goto end;
3460 }
3461
3462 client->uid = LTTNG_SOCK_GET_UID_CRED(
3463 &client->communication.inbound.creds);
3464 client->gid = LTTNG_SOCK_GET_GID_CRED(
3465 &client->communication.inbound.creds);
3466 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
3467 client->uid, client->gid, (int) client->major,
3468 (int) client->minor);
3469
3470 if (handshake_client->major !=
3471 LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
3472 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
3473 }
3474
3475 pthread_mutex_lock(&client->lock);
3476 /* Outgoing queue will be flushed when the command reply is sent. */
3477 ret = lttng_dynamic_buffer_append(
3478 &client->communication.outbound.payload.buffer, send_buffer,
3479 sizeof(send_buffer));
3480 if (ret) {
3481 ERR("[notification-thread] Failed to send protocol version to notification channel client");
3482 goto end_unlock;
3483 }
3484
3485 client->validated = true;
3486 client->communication.active = true;
3487 pthread_mutex_unlock(&client->lock);
3488
3489 /* Set reception state to receive the next message header. */
3490 ret = client_reset_inbound_state(client);
3491 if (ret) {
3492 ERR("[notification-thread] Failed to reset client communication's inbound state");
3493 goto end;
3494 }
3495
3496 /* Flushes the outgoing queue. */
3497 ret = client_send_command_reply(client, state, status);
3498 if (ret) {
3499 ERR("[notification-thread] Failed to send reply to notification channel client");
3500 goto end;
3501 }
3502
3503 goto end;
3504 end_unlock:
3505 pthread_mutex_unlock(&client->lock);
3506 end:
3507 return ret;
3508 }
3509
3510 static
3511 int client_handle_message_subscription(
3512 struct notification_client *client,
3513 enum lttng_notification_channel_message_type msg_type,
3514 struct notification_thread_state *state)
3515 {
3516 int ret;
3517 struct lttng_condition *condition;
3518 enum lttng_notification_channel_status status =
3519 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
3520 struct lttng_payload_view condition_view =
3521 lttng_payload_view_from_payload(
3522 &client->communication.inbound.payload,
3523 0, -1);
3524 size_t expected_condition_size;
3525
3526 /*
3527 * No need to lock client to sample the inbound state as the only
3528 * other thread accessing clients (action executor) only uses the
3529 * outbound state.
3530 */
3531 expected_condition_size = client->communication.inbound.payload.buffer.size;
3532 ret = lttng_condition_create_from_payload(&condition_view, &condition);
3533 if (ret != expected_condition_size) {
3534 ERR("[notification-thread] Malformed condition received from client");
3535 goto end;
3536 }
3537
3538 if (msg_type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
3539 ret = notification_thread_client_subscribe(
3540 client, condition, state, &status);
3541 } else {
3542 ret = notification_thread_client_unsubscribe(
3543 client, condition, state, &status);
3544 }
3545
3546 if (ret) {
3547 goto end;
3548 }
3549
3550 /* Set reception state to receive the next message header. */
3551 ret = client_reset_inbound_state(client);
3552 if (ret) {
3553 ERR("[notification-thread] Failed to reset client communication's inbound state");
3554 goto end;
3555 }
3556
3557 ret = client_send_command_reply(client, state, status);
3558 if (ret) {
3559 ERR("[notification-thread] Failed to send reply to notification channel client");
3560 goto end;
3561 }
3562
3563 end:
3564 return ret;
3565 }
3566
3567 static
3568 int client_dispatch_message(struct notification_client *client,
3569 struct notification_thread_state *state)
3570 {
3571 int ret = 0;
3572
3573 if (client->communication.inbound.msg_type !=
3574 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
3575 client->communication.inbound.msg_type !=
3576 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
3577 !client->validated) {
3578 WARN("[notification-thread] client attempted a command before handshake");
3579 ret = -1;
3580 goto end;
3581 }
3582
3583 switch (client->communication.inbound.msg_type) {
3584 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
3585 {
3586 ret = client_handle_message_unknown(client, state);
3587 break;
3588 }
3589 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
3590 {
3591 ret = client_handle_message_handshake(client, state);
3592 break;
3593 }
3594 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
3595 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
3596 {
3597 ret = client_handle_message_subscription(client,
3598 client->communication.inbound.msg_type, state);
3599 break;
3600 }
3601 default:
3602 abort();
3603 }
3604 end:
3605 return ret;
3606 }
3607
3608 /* Incoming data from client. */
3609 int handle_notification_thread_client_in(
3610 struct notification_thread_state *state, int socket)
3611 {
3612 int ret = 0;
3613 struct notification_client *client;
3614 ssize_t recv_ret;
3615 size_t offset;
3616
3617 rcu_read_lock();
3618 client = get_client_from_socket(socket, state);
3619 if (!client) {
3620 /* Internal error, abort. */
3621 ret = -1;
3622 goto end;
3623 }
3624
3625 offset = client->communication.inbound.payload.buffer.size -
3626 client->communication.inbound.bytes_to_receive;
3627 if (client->communication.inbound.expect_creds) {
3628 recv_ret = lttcomm_recv_creds_unix_sock(socket,
3629 client->communication.inbound.payload.buffer.data + offset,
3630 client->communication.inbound.bytes_to_receive,
3631 &client->communication.inbound.creds);
3632 if (recv_ret > 0) {
3633 client->communication.inbound.expect_creds = false;
3634 client->communication.inbound.creds_received = true;
3635 }
3636 } else {
3637 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
3638 client->communication.inbound.payload.buffer.data + offset,
3639 client->communication.inbound.bytes_to_receive);
3640 }
3641 if (recv_ret >= 0) {
3642 client->communication.inbound.bytes_to_receive -= recv_ret;
3643 } else {
3644 goto error_disconnect_client;
3645 }
3646
3647 if (client->communication.inbound.bytes_to_receive != 0) {
3648 /* Message incomplete wait for more data. */
3649 ret = 0;
3650 goto end;
3651 }
3652
3653 assert(client->communication.inbound.bytes_to_receive == 0);
3654
3655 /* Receive fds. */
3656 if (client->communication.inbound.fds_to_receive != 0) {
3657 ret = lttcomm_recv_payload_fds_unix_sock_non_block(
3658 client->socket,
3659 client->communication.inbound.fds_to_receive,
3660 &client->communication.inbound.payload);
3661 if (ret > 0) {
3662 /*
3663 * Fds received. non blocking fds passing is all
3664 * or nothing.
3665 */
3666 ssize_t expected_size;
3667
3668 expected_size = sizeof(int) *
3669 client->communication.inbound
3670 .fds_to_receive;
3671 assert(ret == expected_size);
3672 client->communication.inbound.fds_to_receive = 0;
3673 } else if (ret == 0) {
3674 /* Received nothing. */
3675 ret = 0;
3676 goto end;
3677 } else {
3678 goto error_disconnect_client;
3679 }
3680 }
3681
3682 /* At this point the message is complete.*/
3683 assert(client->communication.inbound.bytes_to_receive == 0 &&
3684 client->communication.inbound.fds_to_receive == 0);
3685 ret = client_dispatch_message(client, state);
3686 if (ret) {
3687 /*
3688 * Only returns an error if this client must be
3689 * disconnected.
3690 */
3691 goto error_disconnect_client;
3692 }
3693
3694 end:
3695 rcu_read_unlock();
3696 return ret;
3697
3698 error_disconnect_client:
3699 ret = notification_thread_client_disconnect(client, state);
3700 goto end;
3701 }
3702
3703 /* Client ready to receive outgoing data. */
3704 int handle_notification_thread_client_out(
3705 struct notification_thread_state *state, int socket)
3706 {
3707 int ret;
3708 struct notification_client *client;
3709 enum client_transmission_status transmission_status;
3710
3711 rcu_read_lock();
3712 client = get_client_from_socket(socket, state);
3713 if (!client) {
3714 /* Internal error, abort. */
3715 ret = -1;
3716 goto end;
3717 }
3718
3719 pthread_mutex_lock(&client->lock);
3720 transmission_status = client_flush_outgoing_queue(client);
3721 pthread_mutex_unlock(&client->lock);
3722
3723 ret = client_handle_transmission_status(
3724 client, transmission_status, state);
3725 if (ret) {
3726 goto end;
3727 }
3728 end:
3729 rcu_read_unlock();
3730 return ret;
3731 }
3732
3733 static
3734 bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
3735 const struct channel_state_sample *sample,
3736 uint64_t buffer_capacity)
3737 {
3738 bool result = false;
3739 uint64_t threshold;
3740 enum lttng_condition_type condition_type;
3741 const struct lttng_condition_buffer_usage *use_condition = container_of(
3742 condition, struct lttng_condition_buffer_usage,
3743 parent);
3744
3745 if (use_condition->threshold_bytes.set) {
3746 threshold = use_condition->threshold_bytes.value;
3747 } else {
3748 /*
3749 * Threshold was expressed as a ratio.
3750 *
3751 * TODO the threshold (in bytes) of conditions expressed
3752 * as a ratio of total buffer size could be cached to
3753 * forego this double-multiplication or it could be performed
3754 * as fixed-point math.
3755 *
3756 * Note that caching should accommodates the case where the
3757 * condition applies to multiple channels (i.e. don't assume
3758 * that all channels matching my_chann* have the same size...)
3759 */
3760 threshold = (uint64_t) (use_condition->threshold_ratio.value *
3761 (double) buffer_capacity);
3762 }
3763
3764 condition_type = lttng_condition_get_type(condition);
3765 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
3766 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3767 threshold, sample->highest_usage);
3768
3769 /*
3770 * The low condition should only be triggered once _all_ of the
3771 * streams in a channel have gone below the "low" threshold.
3772 */
3773 if (sample->highest_usage <= threshold) {
3774 result = true;
3775 }
3776 } else {
3777 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3778 threshold, sample->highest_usage);
3779
3780 /*
3781 * For high buffer usage scenarios, we want to trigger whenever
3782 * _any_ of the streams has reached the "high" threshold.
3783 */
3784 if (sample->highest_usage >= threshold) {
3785 result = true;
3786 }
3787 }
3788
3789 return result;
3790 }
3791
3792 static
3793 bool evaluate_session_consumed_size_condition(
3794 const struct lttng_condition *condition,
3795 uint64_t session_consumed_size)
3796 {
3797 uint64_t threshold;
3798 const struct lttng_condition_session_consumed_size *size_condition =
3799 container_of(condition,
3800 struct lttng_condition_session_consumed_size,
3801 parent);
3802
3803 threshold = size_condition->consumed_threshold_bytes.value;
3804 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
3805 threshold, session_consumed_size);
3806 return session_consumed_size >= threshold;
3807 }
3808
3809 static
3810 int evaluate_buffer_condition(const struct lttng_condition *condition,
3811 struct lttng_evaluation **evaluation,
3812 const struct notification_thread_state *state,
3813 const struct channel_state_sample *previous_sample,
3814 const struct channel_state_sample *latest_sample,
3815 uint64_t previous_session_consumed_total,
3816 uint64_t latest_session_consumed_total,
3817 struct channel_info *channel_info)
3818 {
3819 int ret = 0;
3820 enum lttng_condition_type condition_type;
3821 const bool previous_sample_available = !!previous_sample;
3822 bool previous_sample_result = false;
3823 bool latest_sample_result;
3824
3825 condition_type = lttng_condition_get_type(condition);
3826
3827 switch (condition_type) {
3828 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3829 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3830 if (caa_likely(previous_sample_available)) {
3831 previous_sample_result =
3832 evaluate_buffer_usage_condition(condition,
3833 previous_sample, channel_info->capacity);
3834 }
3835 latest_sample_result = evaluate_buffer_usage_condition(
3836 condition, latest_sample,
3837 channel_info->capacity);
3838 break;
3839 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3840 if (caa_likely(previous_sample_available)) {
3841 previous_sample_result =
3842 evaluate_session_consumed_size_condition(
3843 condition,
3844 previous_session_consumed_total);
3845 }
3846 latest_sample_result =
3847 evaluate_session_consumed_size_condition(
3848 condition,
3849 latest_session_consumed_total);
3850 break;
3851 default:
3852 /* Unknown condition type; internal error. */
3853 abort();
3854 }
3855
3856 if (!latest_sample_result ||
3857 (previous_sample_result == latest_sample_result)) {
3858 /*
3859 * Only trigger on a condition evaluation transition.
3860 *
3861 * NOTE: This edge-triggered logic may not be appropriate for
3862 * future condition types.
3863 */
3864 goto end;
3865 }
3866
3867 if (!evaluation || !latest_sample_result) {
3868 goto end;
3869 }
3870
3871 switch (condition_type) {
3872 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3873 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3874 *evaluation = lttng_evaluation_buffer_usage_create(
3875 condition_type,
3876 latest_sample->highest_usage,
3877 channel_info->capacity);
3878 break;
3879 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3880 *evaluation = lttng_evaluation_session_consumed_size_create(
3881 latest_session_consumed_total);
3882 break;
3883 default:
3884 abort();
3885 }
3886
3887 if (!*evaluation) {
3888 ret = -1;
3889 goto end;
3890 }
3891 end:
3892 return ret;
3893 }
3894
3895 static
3896 int client_notification_overflow(struct notification_client *client)
3897 {
3898 int ret = 0;
3899 const struct lttng_notification_channel_message msg = {
3900 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
3901 };
3902
3903 ASSERT_LOCKED(client->lock);
3904
3905 DBG("Dropping notification addressed to client (socket fd = %i)",
3906 client->socket);
3907 if (client->communication.outbound.dropped_notification) {
3908 /*
3909 * The client already has a "notification dropped" message
3910 * in its outgoing queue. Nothing to do since all
3911 * of those messages are coalesced.
3912 */
3913 goto end;
3914 }
3915
3916 client->communication.outbound.dropped_notification = true;
3917 ret = lttng_dynamic_buffer_append(
3918 &client->communication.outbound.payload.buffer, &msg,
3919 sizeof(msg));
3920 if (ret) {
3921 PERROR("Failed to enqueue \"dropped notification\" message in client's (socket fd = %i) outgoing queue",
3922 client->socket);
3923 }
3924 end:
3925 return ret;
3926 }
3927
3928 static int client_handle_transmission_status_wrapper(
3929 struct notification_client *client,
3930 enum client_transmission_status status,
3931 void *user_data)
3932 {
3933 return client_handle_transmission_status(client, status,
3934 (struct notification_thread_state *) user_data);
3935 }
3936
3937 static
3938 int send_evaluation_to_clients(const struct lttng_trigger *trigger,
3939 const struct lttng_evaluation *evaluation,
3940 struct notification_client_list* client_list,
3941 struct notification_thread_state *state,
3942 uid_t object_uid, gid_t object_gid)
3943 {
3944 const struct lttng_credentials creds = {
3945 .uid = LTTNG_OPTIONAL_INIT_VALUE(object_uid),
3946 .gid = LTTNG_OPTIONAL_INIT_VALUE(object_gid),
3947 };
3948
3949 return notification_client_list_send_evaluation(client_list,
3950 lttng_trigger_get_const_condition(trigger), evaluation,
3951 lttng_trigger_get_credentials(trigger),
3952 &creds,
3953 client_handle_transmission_status_wrapper, state);
3954 }
3955
3956 /*
3957 * Permission checks relative to notification channel clients are performed
3958 * here. Notice how object, client, and trigger credentials are involved in
3959 * this check.
3960 *
3961 * The `object` credentials are the credentials associated with the "subject"
3962 * of a condition. For instance, a `rotation completed` condition applies
3963 * to a session. When that condition is met, it will produce an evaluation
3964 * against a session. Hence, in this case, the `object` credentials are the
3965 * credentials of the "subject" session.
3966 *
3967 * The `trigger` credentials are the credentials of the user that registered the
3968 * trigger.
3969 *
3970 * The `client` credentials are the credentials of the user that created a given
3971 * notification channel.
3972 *
3973 * In terms of visibility, it is expected that non-privileged users can only
3974 * register triggers against "their" objects (their own sessions and
3975 * applications they are allowed to interact with). They can then open a
3976 * notification channel and subscribe to notifications associated with those
3977 * triggers.
3978 *
3979 * As for privileged users, they can register triggers against the objects of
3980 * other users. They can then subscribe to the notifications associated to their
3981 * triggers. Privileged users _can't_ subscribe to the notifications of
3982 * triggers owned by other users; they must create their own triggers.
3983 *
3984 * This is more a concern of usability than security. It would be difficult for
3985 * a root user to reliably subscribe to a specific set of conditions without
3986 * interference from external users (those could, for instance, unregister
3987 * their triggers).
3988 */
3989 LTTNG_HIDDEN
3990 int notification_client_list_send_evaluation(
3991 struct notification_client_list *client_list,
3992 const struct lttng_condition *condition,
3993 const struct lttng_evaluation *evaluation,
3994 const struct lttng_credentials *trigger_creds,
3995 const struct lttng_credentials *source_object_creds,
3996 report_client_transmission_result_cb client_report,
3997 void *user_data)
3998 {
3999 int ret = 0;
4000 struct lttng_payload msg_payload;
4001 struct notification_client_list_element *client_list_element, *tmp;
4002 const struct lttng_notification notification = {
4003 .condition = (struct lttng_condition *) condition,
4004 .evaluation = (struct lttng_evaluation *) evaluation,
4005 };
4006 struct lttng_notification_channel_message msg_header = {
4007 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
4008 };
4009
4010 lttng_payload_init(&msg_payload);
4011
4012 ret = lttng_dynamic_buffer_append(&msg_payload.buffer, &msg_header,
4013 sizeof(msg_header));
4014 if (ret) {
4015 goto end;
4016 }
4017
4018 ret = lttng_notification_serialize(&notification, &msg_payload);
4019 if (ret) {
4020 ERR("[notification-thread] Failed to serialize notification");
4021 ret = -1;
4022 goto end;
4023 }
4024
4025 /* Update payload size. */
4026 ((struct lttng_notification_channel_message *) msg_payload.buffer.data)
4027 ->size = (uint32_t)(
4028 msg_payload.buffer.size - sizeof(msg_header));
4029
4030 /* Update the payload number of fds. */
4031 {
4032 const struct lttng_payload_view pv = lttng_payload_view_from_payload(
4033 &msg_payload, 0, -1);
4034
4035 ((struct lttng_notification_channel_message *)
4036 msg_payload.buffer.data)->fds = (uint32_t)
4037 lttng_payload_view_get_fd_handle_count(&pv);
4038 }
4039
4040 pthread_mutex_lock(&client_list->lock);
4041 cds_list_for_each_entry_safe(client_list_element, tmp,
4042 &client_list->list, node) {
4043 enum client_transmission_status transmission_status;
4044 struct notification_client *client =
4045 client_list_element->client;
4046
4047 ret = 0;
4048 pthread_mutex_lock(&client->lock);
4049 if (!client->communication.active) {
4050 /*
4051 * Skip inactive client (protocol error or
4052 * disconnecting).
4053 */
4054 DBG("Skipping client at it is marked as inactive");
4055 goto skip_client;
4056 }
4057
4058 if (source_object_creds) {
4059 if (client->uid != lttng_credentials_get_uid(source_object_creds) &&
4060 client->gid != lttng_credentials_get_gid(source_object_creds) &&
4061 client->uid != 0) {
4062 /*
4063 * Client is not allowed to monitor this
4064 * object.
4065 */
4066 DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
4067 goto skip_client;
4068 }
4069 }
4070
4071 if (client->uid != lttng_credentials_get_uid(trigger_creds) && client->gid != lttng_credentials_get_gid(trigger_creds)) {
4072 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this trigger");
4073 goto skip_client;
4074 }
4075
4076 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
4077 client->socket, msg_payload.buffer.size);
4078
4079 if (client_has_outbound_data_left(client)) {
4080 /*
4081 * Outgoing data is already buffered for this client;
4082 * drop the notification and enqueue a "dropped
4083 * notification" message if this is the first dropped
4084 * notification since the socket spilled-over to the
4085 * queue.
4086 */
4087 ret = client_notification_overflow(client);
4088 if (ret) {
4089 /* Fatal error. */
4090 goto skip_client;
4091 }
4092 }
4093
4094 ret = lttng_payload_copy(&msg_payload, &client->communication.outbound.payload);
4095 if (ret) {
4096 /* Fatal error. */
4097 goto skip_client;
4098 }
4099
4100 transmission_status = client_flush_outgoing_queue(client);
4101 pthread_mutex_unlock(&client->lock);
4102 ret = client_report(client, transmission_status, user_data);
4103 if (ret) {
4104 /* Fatal error. */
4105 goto end_unlock_list;
4106 }
4107
4108 continue;
4109
4110 skip_client:
4111 pthread_mutex_unlock(&client->lock);
4112 if (ret) {
4113 /* Fatal error. */
4114 goto end_unlock_list;
4115 }
4116 }
4117 ret = 0;
4118
4119 end_unlock_list:
4120 pthread_mutex_unlock(&client_list->lock);
4121 end:
4122 lttng_payload_reset(&msg_payload);
4123 return ret;
4124 }
4125
4126 int handle_notification_thread_channel_sample(
4127 struct notification_thread_state *state, int pipe,
4128 enum lttng_domain_type domain)
4129 {
4130 int ret = 0;
4131 struct lttcomm_consumer_channel_monitor_msg sample_msg;
4132 struct channel_info *channel_info;
4133 struct cds_lfht_node *node;
4134 struct cds_lfht_iter iter;
4135 struct lttng_channel_trigger_list *trigger_list;
4136 struct lttng_trigger_list_element *trigger_list_element;
4137 bool previous_sample_available = false;
4138 struct channel_state_sample previous_sample, latest_sample;
4139 uint64_t previous_session_consumed_total, latest_session_consumed_total;
4140 struct lttng_credentials channel_creds;
4141
4142 /*
4143 * The monitoring pipe only holds messages smaller than PIPE_BUF,
4144 * ensuring that read/write of sampling messages are atomic.
4145 */
4146 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
4147 if (ret != sizeof(sample_msg)) {
4148 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
4149 pipe);
4150 ret = -1;
4151 goto end;
4152 }
4153
4154 ret = 0;
4155 latest_sample.key.key = sample_msg.key;
4156 latest_sample.key.domain = domain;
4157 latest_sample.highest_usage = sample_msg.highest;
4158 latest_sample.lowest_usage = sample_msg.lowest;
4159 latest_sample.channel_total_consumed = sample_msg.total_consumed;
4160
4161 rcu_read_lock();
4162
4163 /* Retrieve the channel's informations */
4164 cds_lfht_lookup(state->channels_ht,
4165 hash_channel_key(&latest_sample.key),
4166 match_channel_info,
4167 &latest_sample.key,
4168 &iter);
4169 node = cds_lfht_iter_get_node(&iter);
4170 if (caa_unlikely(!node)) {
4171 /*
4172 * Not an error since the consumer can push a sample to the pipe
4173 * and the rest of the session daemon could notify us of the
4174 * channel's destruction before we get a chance to process that
4175 * sample.
4176 */
4177 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
4178 latest_sample.key.key,
4179 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
4180 "user space");
4181 goto end_unlock;
4182 }
4183 channel_info = caa_container_of(node, struct channel_info,
4184 channels_ht_node);
4185 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
4186 channel_info->name,
4187 latest_sample.key.key,
4188 channel_info->session_info->name,
4189 latest_sample.highest_usage,
4190 latest_sample.lowest_usage,
4191 latest_sample.channel_total_consumed);
4192
4193 previous_session_consumed_total =
4194 channel_info->session_info->consumed_data_size;
4195
4196 /* Retrieve the channel's last sample, if it exists, and update it. */
4197 cds_lfht_lookup(state->channel_state_ht,
4198 hash_channel_key(&latest_sample.key),
4199 match_channel_state_sample,
4200 &latest_sample.key,
4201 &iter);
4202 node = cds_lfht_iter_get_node(&iter);
4203 if (caa_likely(node)) {
4204 struct channel_state_sample *stored_sample;
4205
4206 /* Update the sample stored. */
4207 stored_sample = caa_container_of(node,
4208 struct channel_state_sample,
4209 channel_state_ht_node);
4210
4211 memcpy(&previous_sample, stored_sample,
4212 sizeof(previous_sample));
4213 stored_sample->highest_usage = latest_sample.highest_usage;
4214 stored_sample->lowest_usage = latest_sample.lowest_usage;
4215 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
4216 previous_sample_available = true;
4217
4218 latest_session_consumed_total =
4219 previous_session_consumed_total +
4220 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
4221 } else {
4222 /*
4223 * This is the channel's first sample, allocate space for and
4224 * store the new sample.
4225 */
4226 struct channel_state_sample *stored_sample;
4227
4228 stored_sample = zmalloc(sizeof(*stored_sample));
4229 if (!stored_sample) {
4230 ret = -1;
4231 goto end_unlock;
4232 }
4233
4234 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
4235 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
4236 cds_lfht_add(state->channel_state_ht,
4237 hash_channel_key(&stored_sample->key),
4238 &stored_sample->channel_state_ht_node);
4239
4240 latest_session_consumed_total =
4241 previous_session_consumed_total +
4242 latest_sample.channel_total_consumed;
4243 }
4244
4245 channel_info->session_info->consumed_data_size =
4246 latest_session_consumed_total;
4247
4248 /* Find triggers associated with this channel. */
4249 cds_lfht_lookup(state->channel_triggers_ht,
4250 hash_channel_key(&latest_sample.key),
4251 match_channel_trigger_list,
4252 &latest_sample.key,
4253 &iter);
4254 node = cds_lfht_iter_get_node(&iter);
4255 if (caa_likely(!node)) {
4256 goto end_unlock;
4257 }
4258
4259 channel_creds = (typeof(channel_creds)) {
4260 .uid = LTTNG_OPTIONAL_INIT_VALUE(channel_info->session_info->uid),
4261 .gid = LTTNG_OPTIONAL_INIT_VALUE(channel_info->session_info->gid),
4262 };
4263
4264 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
4265 channel_triggers_ht_node);
4266 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
4267 node) {
4268 const struct lttng_condition *condition;
4269 struct lttng_trigger *trigger;
4270 struct notification_client_list *client_list = NULL;
4271 struct lttng_evaluation *evaluation = NULL;
4272 enum action_executor_status executor_status;
4273
4274 ret = 0;
4275 trigger = trigger_list_element->trigger;
4276 condition = lttng_trigger_get_const_condition(trigger);
4277 assert(condition);
4278
4279 /*
4280 * Check if any client is subscribed to the result of this
4281 * evaluation.
4282 */
4283 client_list = get_client_list_from_condition(state, condition);
4284
4285 ret = evaluate_buffer_condition(condition, &evaluation, state,
4286 previous_sample_available ? &previous_sample : NULL,
4287 &latest_sample,
4288 previous_session_consumed_total,
4289 latest_session_consumed_total,
4290 channel_info);
4291 if (caa_unlikely(ret)) {
4292 goto put_list;
4293 }
4294
4295 if (caa_likely(!evaluation)) {
4296 goto put_list;
4297 }
4298
4299 if (!lttng_trigger_should_fire(trigger)) {
4300 goto put_list;
4301 }
4302
4303 lttng_trigger_fire(trigger);
4304
4305 /*
4306 * Ownership of `evaluation` transferred to the action executor
4307 * no matter the result.
4308 */
4309 executor_status = action_executor_enqueue(state->executor,
4310 trigger, evaluation, &channel_creds,
4311 client_list);
4312 evaluation = NULL;
4313 switch (executor_status) {
4314 case ACTION_EXECUTOR_STATUS_OK:
4315 break;
4316 case ACTION_EXECUTOR_STATUS_ERROR:
4317 case ACTION_EXECUTOR_STATUS_INVALID:
4318 /*
4319 * TODO Add trigger identification (name/id) when
4320 * it is added to the API.
4321 */
4322 ERR("Fatal error occurred while enqueuing action associated with buffer-condition trigger");
4323 ret = -1;
4324 goto put_list;
4325 case ACTION_EXECUTOR_STATUS_OVERFLOW:
4326 /*
4327 * TODO Add trigger identification (name/id) when
4328 * it is added to the API.
4329 *
4330 * Not a fatal error.
4331 */
4332 WARN("No space left when enqueuing action associated with buffer-condition trigger");
4333 ret = 0;
4334 goto put_list;
4335 default:
4336 abort();
4337 }
4338
4339 put_list:
4340 notification_client_list_put(client_list);
4341 if (caa_unlikely(ret)) {
4342 break;
4343 }
4344 }
4345 end_unlock:
4346 rcu_read_unlock();
4347 end:
4348 return ret;
4349 }
This page took 0.194177 seconds and 4 git commands to generate.