sessiond: trigger: run trigger actions through an action executor
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
1 /*
2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #define _LGPL_SOURCE
9 #include <urcu.h>
10 #include <urcu/rculfhash.h>
11
12 #include <common/defaults.h>
13 #include <common/error.h>
14 #include <common/futex.h>
15 #include <common/unix.h>
16 #include <common/dynamic-buffer.h>
17 #include <common/hashtable/utils.h>
18 #include <common/sessiond-comm/sessiond-comm.h>
19 #include <common/macros.h>
20 #include <lttng/condition/condition.h>
21 #include <lttng/action/action-internal.h>
22 #include <lttng/notification/notification-internal.h>
23 #include <lttng/condition/condition-internal.h>
24 #include <lttng/condition/buffer-usage-internal.h>
25 #include <lttng/condition/session-consumed-size-internal.h>
26 #include <lttng/condition/session-rotation-internal.h>
27 #include <lttng/notification/channel-internal.h>
28
29 #include <time.h>
30 #include <unistd.h>
31 #include <assert.h>
32 #include <inttypes.h>
33 #include <fcntl.h>
34
35 #include "notification-thread.h"
36 #include "notification-thread-events.h"
37 #include "notification-thread-commands.h"
38 #include "lttng-sessiond.h"
39 #include "kernel.h"
40
/* Poll events watched on a client socket when only input is of interest. */
#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
/* Same events as CLIENT_POLL_MASK_IN, with LPOLLOUT added. */
#define CLIENT_POLL_MASK_IN_OUT (LPOLLOUT | CLIENT_POLL_MASK_IN)
43
/*
 * Type of object to which a condition is bound; see
 * get_condition_binding_object().
 */
enum lttng_object_type {
	LTTNG_OBJECT_TYPE_UNKNOWN = 0,
	LTTNG_OBJECT_TYPE_NONE = 1,
	LTTNG_OBJECT_TYPE_CHANNEL = 2,
	LTTNG_OBJECT_TYPE_SESSION = 3,
};
50
51 struct lttng_trigger_list_element {
52 /* No ownership of the trigger object is assumed. */
53 struct lttng_trigger *trigger;
54 struct cds_list_head node;
55 };
56
57 struct lttng_channel_trigger_list {
58 struct channel_key channel_key;
59 /* List of struct lttng_trigger_list_element. */
60 struct cds_list_head list;
61 /* Node in the channel_triggers_ht */
62 struct cds_lfht_node channel_triggers_ht_node;
63 /* call_rcu delayed reclaim. */
64 struct rcu_head rcu_node;
65 };
66
67 /*
68 * List of triggers applying to a given session.
69 *
70 * See:
71 * - lttng_session_trigger_list_create()
72 * - lttng_session_trigger_list_build()
73 * - lttng_session_trigger_list_destroy()
74 * - lttng_session_trigger_list_add()
75 */
76 struct lttng_session_trigger_list {
77 /*
78 * Not owned by this; points to the session_info structure's
79 * session name.
80 */
81 const char *session_name;
82 /* List of struct lttng_trigger_list_element. */
83 struct cds_list_head list;
84 /* Node in the session_triggers_ht */
85 struct cds_lfht_node session_triggers_ht_node;
86 /*
87 * Weak reference to the notification system's session triggers
88 * hashtable.
89 *
90 * The session trigger list structure structure is owned by
91 * the session's session_info.
92 *
93 * The session_info is kept alive the the channel_infos holding a
94 * reference to it (reference counting). When those channels are
95 * destroyed (at runtime or on teardown), the reference they hold
96 * to the session_info are released. On destruction of session_info,
97 * session_info_destroy() will remove the list of triggers applying
98 * to this session from the notification system's state.
99 *
100 * This implies that the session_triggers_ht must be destroyed
101 * after the channels.
102 */
103 struct cds_lfht *session_triggers_ht;
104 /* Used for delayed RCU reclaim. */
105 struct rcu_head rcu_node;
106 };
107
108 struct lttng_trigger_ht_element {
109 struct lttng_trigger *trigger;
110 struct cds_lfht_node node;
111 /* call_rcu delayed reclaim. */
112 struct rcu_head rcu_node;
113 };
114
115 struct lttng_condition_list_element {
116 struct lttng_condition *condition;
117 struct cds_list_head node;
118 };
119
120 struct channel_state_sample {
121 struct channel_key key;
122 struct cds_lfht_node channel_state_ht_node;
123 uint64_t highest_usage;
124 uint64_t lowest_usage;
125 uint64_t channel_total_consumed;
126 /* call_rcu delayed reclaim. */
127 struct rcu_head rcu_node;
128 };
129
/* Helpers defined later in this file. */
static
unsigned long hash_channel_key(struct channel_key *key);
static
int evaluate_buffer_condition(
		const struct lttng_condition *condition,
		struct lttng_evaluation **evaluation,
		const struct notification_thread_state *state,
		const struct channel_state_sample *previous_sample,
		const struct channel_state_sample *latest_sample,
		uint64_t previous_session_consumed_total,
		uint64_t latest_session_consumed_total,
		struct channel_info *channel_info);
static
int send_evaluation_to_clients(
		const struct lttng_trigger *trigger,
		const struct lttng_evaluation *evaluation,
		struct notification_client_list *client_list,
		struct notification_thread_state *state,
		uid_t channel_uid, gid_t channel_gid);


/* session_info API */
static
void session_info_destroy(void *_data);
static
void session_info_get(struct session_info *session_info);
static
void session_info_put(struct session_info *session_info);
static
struct session_info *session_info_create(
		const char *name,
		uid_t uid, gid_t gid,
		struct lttng_session_trigger_list *trigger_list,
		struct cds_lfht *sessions_ht);
static
void session_info_add_channel(
		struct session_info *session_info,
		struct channel_info *channel_info);
static
void session_info_remove_channel(
		struct session_info *session_info,
		struct channel_info *channel_info);

/* lttng_session_trigger_list API */
static
struct lttng_session_trigger_list *lttng_session_trigger_list_create(
		const char *session_name,
		struct cds_lfht *session_triggers_ht);
static
struct lttng_session_trigger_list *lttng_session_trigger_list_build(
		const struct notification_thread_state *state,
		const char *session_name);
static
void lttng_session_trigger_list_destroy(
		struct lttng_session_trigger_list *list);
static
int lttng_session_trigger_list_add(
		struct lttng_session_trigger_list *list,
		struct lttng_trigger *trigger);

/* Client transmission status handling. */
static
int client_handle_transmission_status(
		struct notification_client *client,
		enum client_transmission_status transmission_status,
		struct notification_thread_state *state);
187
188 static
189 int match_client_socket(struct cds_lfht_node *node, const void *key)
190 {
191 /* This double-cast is intended to supress pointer-to-cast warning. */
192 const int socket = (int) (intptr_t) key;
193 const struct notification_client *client = caa_container_of(node,
194 struct notification_client, client_socket_ht_node);
195
196 return client->socket == socket;
197 }
198
199 static
200 int match_client_id(struct cds_lfht_node *node, const void *key)
201 {
202 /* This double-cast is intended to supress pointer-to-cast warning. */
203 const notification_client_id id = *((notification_client_id *) key);
204 const struct notification_client *client = caa_container_of(
205 node, struct notification_client, client_id_ht_node);
206
207 return client->id == id;
208 }
209
210 static
211 int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
212 {
213 struct channel_key *channel_key = (struct channel_key *) key;
214 struct lttng_channel_trigger_list *trigger_list;
215
216 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
217 channel_triggers_ht_node);
218
219 return !!((channel_key->key == trigger_list->channel_key.key) &&
220 (channel_key->domain == trigger_list->channel_key.domain));
221 }
222
223 static
224 int match_session_trigger_list(struct cds_lfht_node *node, const void *key)
225 {
226 const char *session_name = (const char *) key;
227 struct lttng_session_trigger_list *trigger_list;
228
229 trigger_list = caa_container_of(node, struct lttng_session_trigger_list,
230 session_triggers_ht_node);
231
232 return !!(strcmp(trigger_list->session_name, session_name) == 0);
233 }
234
235 static
236 int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
237 {
238 struct channel_key *channel_key = (struct channel_key *) key;
239 struct channel_state_sample *sample;
240
241 sample = caa_container_of(node, struct channel_state_sample,
242 channel_state_ht_node);
243
244 return !!((channel_key->key == sample->key.key) &&
245 (channel_key->domain == sample->key.domain));
246 }
247
248 static
249 int match_channel_info(struct cds_lfht_node *node, const void *key)
250 {
251 struct channel_key *channel_key = (struct channel_key *) key;
252 struct channel_info *channel_info;
253
254 channel_info = caa_container_of(node, struct channel_info,
255 channels_ht_node);
256
257 return !!((channel_key->key == channel_info->key.key) &&
258 (channel_key->domain == channel_info->key.domain));
259 }
260
261 static
262 int match_condition(struct cds_lfht_node *node, const void *key)
263 {
264 struct lttng_condition *condition_key = (struct lttng_condition *) key;
265 struct lttng_trigger_ht_element *trigger;
266 struct lttng_condition *condition;
267
268 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
269 node);
270 condition = lttng_trigger_get_condition(trigger->trigger);
271 assert(condition);
272
273 return !!lttng_condition_is_equal(condition_key, condition);
274 }
275
276 static
277 int match_client_list_condition(struct cds_lfht_node *node, const void *key)
278 {
279 struct lttng_condition *condition_key = (struct lttng_condition *) key;
280 struct notification_client_list *client_list;
281 const struct lttng_condition *condition;
282
283 assert(condition_key);
284
285 client_list = caa_container_of(node, struct notification_client_list,
286 notification_trigger_clients_ht_node);
287 condition = lttng_trigger_get_const_condition(client_list->trigger);
288
289 return !!lttng_condition_is_equal(condition_key, condition);
290 }
291
292 static
293 int match_session(struct cds_lfht_node *node, const void *key)
294 {
295 const char *name = key;
296 struct session_info *session_info = caa_container_of(
297 node, struct session_info, sessions_ht_node);
298
299 return !strcmp(session_info->name, name);
300 }
301
302 static
303 unsigned long lttng_condition_buffer_usage_hash(
304 const struct lttng_condition *_condition)
305 {
306 unsigned long hash;
307 unsigned long condition_type;
308 struct lttng_condition_buffer_usage *condition;
309
310 condition = container_of(_condition,
311 struct lttng_condition_buffer_usage, parent);
312
313 condition_type = (unsigned long) condition->parent.type;
314 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
315 if (condition->session_name) {
316 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
317 }
318 if (condition->channel_name) {
319 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
320 }
321 if (condition->domain.set) {
322 hash ^= hash_key_ulong(
323 (void *) condition->domain.type,
324 lttng_ht_seed);
325 }
326 if (condition->threshold_ratio.set) {
327 uint64_t val;
328
329 val = condition->threshold_ratio.value * (double) UINT32_MAX;
330 hash ^= hash_key_u64(&val, lttng_ht_seed);
331 } else if (condition->threshold_bytes.set) {
332 uint64_t val;
333
334 val = condition->threshold_bytes.value;
335 hash ^= hash_key_u64(&val, lttng_ht_seed);
336 }
337 return hash;
338 }
339
340 static
341 unsigned long lttng_condition_session_consumed_size_hash(
342 const struct lttng_condition *_condition)
343 {
344 unsigned long hash;
345 unsigned long condition_type =
346 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE;
347 struct lttng_condition_session_consumed_size *condition;
348 uint64_t val;
349
350 condition = container_of(_condition,
351 struct lttng_condition_session_consumed_size, parent);
352
353 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
354 if (condition->session_name) {
355 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
356 }
357 val = condition->consumed_threshold_bytes.value;
358 hash ^= hash_key_u64(&val, lttng_ht_seed);
359 return hash;
360 }
361
362 static
363 unsigned long lttng_condition_session_rotation_hash(
364 const struct lttng_condition *_condition)
365 {
366 unsigned long hash, condition_type;
367 struct lttng_condition_session_rotation *condition;
368
369 condition = container_of(_condition,
370 struct lttng_condition_session_rotation, parent);
371 condition_type = (unsigned long) condition->parent.type;
372 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
373 assert(condition->session_name);
374 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
375 return hash;
376 }
377
378 /*
379 * The lttng_condition hashing code is kept in this file (rather than
380 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
381 * don't want to link in liblttng-ctl.
382 */
383 static
384 unsigned long lttng_condition_hash(const struct lttng_condition *condition)
385 {
386 switch (condition->type) {
387 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
388 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
389 return lttng_condition_buffer_usage_hash(condition);
390 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
391 return lttng_condition_session_consumed_size_hash(condition);
392 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
393 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
394 return lttng_condition_session_rotation_hash(condition);
395 default:
396 ERR("[notification-thread] Unexpected condition type caught");
397 abort();
398 }
399 }
400
401 static
402 unsigned long hash_channel_key(struct channel_key *key)
403 {
404 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
405 unsigned long domain_hash = hash_key_ulong(
406 (void *) (unsigned long) key->domain, lttng_ht_seed);
407
408 return key_hash ^ domain_hash;
409 }
410
411 static
412 unsigned long hash_client_socket(int socket)
413 {
414 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
415 }
416
417 static
418 unsigned long hash_client_id(notification_client_id id)
419 {
420 return hash_key_u64(&id, lttng_ht_seed);
421 }
422
423 /*
424 * Get the type of object to which a given condition applies. Bindings let
425 * the notification system evaluate a trigger's condition when a given
426 * object's state is updated.
427 *
428 * For instance, a condition bound to a channel will be evaluated everytime
429 * the channel's state is changed by a channel monitoring sample.
430 */
431 static
432 enum lttng_object_type get_condition_binding_object(
433 const struct lttng_condition *condition)
434 {
435 switch (lttng_condition_get_type(condition)) {
436 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
437 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
438 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
439 return LTTNG_OBJECT_TYPE_CHANNEL;
440 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
441 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
442 return LTTNG_OBJECT_TYPE_SESSION;
443 default:
444 return LTTNG_OBJECT_TYPE_UNKNOWN;
445 }
446 }
447
448 static
449 void free_channel_info_rcu(struct rcu_head *node)
450 {
451 free(caa_container_of(node, struct channel_info, rcu_node));
452 }
453
454 static
455 void channel_info_destroy(struct channel_info *channel_info)
456 {
457 if (!channel_info) {
458 return;
459 }
460
461 if (channel_info->session_info) {
462 session_info_remove_channel(channel_info->session_info,
463 channel_info);
464 session_info_put(channel_info->session_info);
465 }
466 if (channel_info->name) {
467 free(channel_info->name);
468 }
469 call_rcu(&channel_info->rcu_node, free_channel_info_rcu);
470 }
471
472 static
473 void free_session_info_rcu(struct rcu_head *node)
474 {
475 free(caa_container_of(node, struct session_info, rcu_node));
476 }
477
478 /* Don't call directly, use the ref-counting mechanism. */
479 static
480 void session_info_destroy(void *_data)
481 {
482 struct session_info *session_info = _data;
483 int ret;
484
485 assert(session_info);
486 if (session_info->channel_infos_ht) {
487 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
488 if (ret) {
489 ERR("[notification-thread] Failed to destroy channel information hash table");
490 }
491 }
492 lttng_session_trigger_list_destroy(session_info->trigger_list);
493
494 rcu_read_lock();
495 cds_lfht_del(session_info->sessions_ht,
496 &session_info->sessions_ht_node);
497 rcu_read_unlock();
498 free(session_info->name);
499 call_rcu(&session_info->rcu_node, free_session_info_rcu);
500 }
501
502 static
503 void session_info_get(struct session_info *session_info)
504 {
505 if (!session_info) {
506 return;
507 }
508 lttng_ref_get(&session_info->ref);
509 }
510
511 static
512 void session_info_put(struct session_info *session_info)
513 {
514 if (!session_info) {
515 return;
516 }
517 lttng_ref_put(&session_info->ref);
518 }
519
520 static
521 struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
522 struct lttng_session_trigger_list *trigger_list,
523 struct cds_lfht *sessions_ht)
524 {
525 struct session_info *session_info;
526
527 assert(name);
528
529 session_info = zmalloc(sizeof(*session_info));
530 if (!session_info) {
531 goto end;
532 }
533 lttng_ref_init(&session_info->ref, session_info_destroy);
534
535 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
536 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
537 if (!session_info->channel_infos_ht) {
538 goto error;
539 }
540
541 cds_lfht_node_init(&session_info->sessions_ht_node);
542 session_info->name = strdup(name);
543 if (!session_info->name) {
544 goto error;
545 }
546 session_info->uid = uid;
547 session_info->gid = gid;
548 session_info->trigger_list = trigger_list;
549 session_info->sessions_ht = sessions_ht;
550 end:
551 return session_info;
552 error:
553 session_info_put(session_info);
554 return NULL;
555 }
556
557 static
558 void session_info_add_channel(struct session_info *session_info,
559 struct channel_info *channel_info)
560 {
561 rcu_read_lock();
562 cds_lfht_add(session_info->channel_infos_ht,
563 hash_channel_key(&channel_info->key),
564 &channel_info->session_info_channels_ht_node);
565 rcu_read_unlock();
566 }
567
568 static
569 void session_info_remove_channel(struct session_info *session_info,
570 struct channel_info *channel_info)
571 {
572 rcu_read_lock();
573 cds_lfht_del(session_info->channel_infos_ht,
574 &channel_info->session_info_channels_ht_node);
575 rcu_read_unlock();
576 }
577
578 static
579 struct channel_info *channel_info_create(const char *channel_name,
580 struct channel_key *channel_key, uint64_t channel_capacity,
581 struct session_info *session_info)
582 {
583 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
584
585 if (!channel_info) {
586 goto end;
587 }
588
589 cds_lfht_node_init(&channel_info->channels_ht_node);
590 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
591 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
592 channel_info->capacity = channel_capacity;
593
594 channel_info->name = strdup(channel_name);
595 if (!channel_info->name) {
596 goto error;
597 }
598
599 /*
600 * Set the references between session and channel infos:
601 * - channel_info holds a strong reference to session_info
602 * - session_info holds a weak reference to channel_info
603 */
604 session_info_get(session_info);
605 session_info_add_channel(session_info, channel_info);
606 channel_info->session_info = session_info;
607 end:
608 return channel_info;
609 error:
610 channel_info_destroy(channel_info);
611 return NULL;
612 }
613
614 LTTNG_HIDDEN
615 bool notification_client_list_get(struct notification_client_list *list)
616 {
617 return urcu_ref_get_unless_zero(&list->ref);
618 }
619
620 static
621 void free_notification_client_list_rcu(struct rcu_head *node)
622 {
623 free(caa_container_of(node, struct notification_client_list,
624 rcu_node));
625 }
626
627 static
628 void notification_client_list_release(struct urcu_ref *list_ref)
629 {
630 struct notification_client_list *list =
631 container_of(list_ref, typeof(*list), ref);
632 struct notification_client_list_element *client_list_element, *tmp;
633
634 if (list->notification_trigger_clients_ht) {
635 rcu_read_lock();
636 cds_lfht_del(list->notification_trigger_clients_ht,
637 &list->notification_trigger_clients_ht_node);
638 rcu_read_unlock();
639 list->notification_trigger_clients_ht = NULL;
640 }
641 cds_list_for_each_entry_safe(client_list_element, tmp,
642 &list->list, node) {
643 free(client_list_element);
644 }
645 pthread_mutex_destroy(&list->lock);
646 call_rcu(&list->rcu_node, free_notification_client_list_rcu);
647 }
648
649 static
650 struct notification_client_list *notification_client_list_create(
651 const struct lttng_trigger *trigger)
652 {
653 struct notification_client_list *client_list =
654 zmalloc(sizeof(*client_list));
655
656 if (!client_list) {
657 goto error;
658 }
659 pthread_mutex_init(&client_list->lock, NULL);
660 urcu_ref_init(&client_list->ref);
661 cds_lfht_node_init(&client_list->notification_trigger_clients_ht_node);
662 CDS_INIT_LIST_HEAD(&client_list->list);
663 client_list->trigger = trigger;
664 error:
665 return client_list;
666 }
667
668 static
669 void publish_notification_client_list(
670 struct notification_thread_state *state,
671 struct notification_client_list *list)
672 {
673 const struct lttng_condition *condition =
674 lttng_trigger_get_const_condition(list->trigger);
675
676 assert(!list->notification_trigger_clients_ht);
677
678 list->notification_trigger_clients_ht =
679 state->notification_trigger_clients_ht;
680
681 rcu_read_lock();
682 cds_lfht_add(state->notification_trigger_clients_ht,
683 lttng_condition_hash(condition),
684 &list->notification_trigger_clients_ht_node);
685 rcu_read_unlock();
686 }
687
688 LTTNG_HIDDEN
689 void notification_client_list_put(struct notification_client_list *list)
690 {
691 if (!list) {
692 return;
693 }
694 return urcu_ref_put(&list->ref, notification_client_list_release);
695 }
696
697 /* Provides a reference to the returned list. */
698 static
699 struct notification_client_list *get_client_list_from_condition(
700 struct notification_thread_state *state,
701 const struct lttng_condition *condition)
702 {
703 struct cds_lfht_node *node;
704 struct cds_lfht_iter iter;
705 struct notification_client_list *list = NULL;
706
707 rcu_read_lock();
708 cds_lfht_lookup(state->notification_trigger_clients_ht,
709 lttng_condition_hash(condition),
710 match_client_list_condition,
711 condition,
712 &iter);
713 node = cds_lfht_iter_get_node(&iter);
714 if (node) {
715 list = container_of(node, struct notification_client_list,
716 notification_trigger_clients_ht_node);
717 list = notification_client_list_get(list) ? list : NULL;
718 }
719
720 rcu_read_unlock();
721 return list;
722 }
723
724 static
725 int evaluate_channel_condition_for_client(
726 const struct lttng_condition *condition,
727 struct notification_thread_state *state,
728 struct lttng_evaluation **evaluation,
729 uid_t *session_uid, gid_t *session_gid)
730 {
731 int ret;
732 struct cds_lfht_iter iter;
733 struct cds_lfht_node *node;
734 struct channel_info *channel_info = NULL;
735 struct channel_key *channel_key = NULL;
736 struct channel_state_sample *last_sample = NULL;
737 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
738
739 rcu_read_lock();
740
741 /* Find the channel associated with the condition. */
742 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
743 channel_trigger_list, channel_triggers_ht_node) {
744 struct lttng_trigger_list_element *element;
745
746 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
747 const struct lttng_condition *current_condition =
748 lttng_trigger_get_const_condition(
749 element->trigger);
750
751 assert(current_condition);
752 if (!lttng_condition_is_equal(condition,
753 current_condition)) {
754 continue;
755 }
756
757 /* Found the trigger, save the channel key. */
758 channel_key = &channel_trigger_list->channel_key;
759 break;
760 }
761 if (channel_key) {
762 /* The channel key was found stop iteration. */
763 break;
764 }
765 }
766
767 if (!channel_key){
768 /* No channel found; normal exit. */
769 DBG("[notification-thread] No known channel associated with newly subscribed-to condition");
770 ret = 0;
771 goto end;
772 }
773
774 /* Fetch channel info for the matching channel. */
775 cds_lfht_lookup(state->channels_ht,
776 hash_channel_key(channel_key),
777 match_channel_info,
778 channel_key,
779 &iter);
780 node = cds_lfht_iter_get_node(&iter);
781 assert(node);
782 channel_info = caa_container_of(node, struct channel_info,
783 channels_ht_node);
784
785 /* Retrieve the channel's last sample, if it exists. */
786 cds_lfht_lookup(state->channel_state_ht,
787 hash_channel_key(channel_key),
788 match_channel_state_sample,
789 channel_key,
790 &iter);
791 node = cds_lfht_iter_get_node(&iter);
792 if (node) {
793 last_sample = caa_container_of(node,
794 struct channel_state_sample,
795 channel_state_ht_node);
796 } else {
797 /* Nothing to evaluate, no sample was ever taken. Normal exit */
798 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
799 ret = 0;
800 goto end;
801 }
802
803 ret = evaluate_buffer_condition(condition, evaluation, state,
804 NULL, last_sample,
805 0, channel_info->session_info->consumed_data_size,
806 channel_info);
807 if (ret) {
808 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
809 goto end;
810 }
811
812 *session_uid = channel_info->session_info->uid;
813 *session_gid = channel_info->session_info->gid;
814 end:
815 rcu_read_unlock();
816 return ret;
817 }
818
819 static
820 const char *get_condition_session_name(const struct lttng_condition *condition)
821 {
822 const char *session_name = NULL;
823 enum lttng_condition_status status;
824
825 switch (lttng_condition_get_type(condition)) {
826 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
827 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
828 status = lttng_condition_buffer_usage_get_session_name(
829 condition, &session_name);
830 break;
831 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
832 status = lttng_condition_session_consumed_size_get_session_name(
833 condition, &session_name);
834 break;
835 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
836 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
837 status = lttng_condition_session_rotation_get_session_name(
838 condition, &session_name);
839 break;
840 default:
841 abort();
842 }
843 if (status != LTTNG_CONDITION_STATUS_OK) {
844 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
845 goto end;
846 }
847 end:
848 return session_name;
849 }
850
851 static
852 int evaluate_session_condition_for_client(
853 const struct lttng_condition *condition,
854 struct notification_thread_state *state,
855 struct lttng_evaluation **evaluation,
856 uid_t *session_uid, gid_t *session_gid)
857 {
858 int ret;
859 struct cds_lfht_iter iter;
860 struct cds_lfht_node *node;
861 const char *session_name;
862 struct session_info *session_info = NULL;
863
864 rcu_read_lock();
865 session_name = get_condition_session_name(condition);
866
867 /* Find the session associated with the trigger. */
868 cds_lfht_lookup(state->sessions_ht,
869 hash_key_str(session_name, lttng_ht_seed),
870 match_session,
871 session_name,
872 &iter);
873 node = cds_lfht_iter_get_node(&iter);
874 if (!node) {
875 DBG("[notification-thread] No known session matching name \"%s\"",
876 session_name);
877 ret = 0;
878 goto end;
879 }
880
881 session_info = caa_container_of(node, struct session_info,
882 sessions_ht_node);
883 session_info_get(session_info);
884
885 /*
886 * Evaluation is performed in-line here since only one type of
887 * session-bound condition is handled for the moment.
888 */
889 switch (lttng_condition_get_type(condition)) {
890 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
891 if (!session_info->rotation.ongoing) {
892 ret = 0;
893 goto end_session_put;
894 }
895
896 *evaluation = lttng_evaluation_session_rotation_ongoing_create(
897 session_info->rotation.id);
898 if (!*evaluation) {
899 /* Fatal error. */
900 ERR("[notification-thread] Failed to create session rotation ongoing evaluation for session \"%s\"",
901 session_info->name);
902 ret = -1;
903 goto end_session_put;
904 }
905 ret = 0;
906 break;
907 default:
908 ret = 0;
909 goto end_session_put;
910 }
911
912 *session_uid = session_info->uid;
913 *session_gid = session_info->gid;
914
915 end_session_put:
916 session_info_put(session_info);
917 end:
918 rcu_read_unlock();
919 return ret;
920 }
921
922 static
923 int evaluate_condition_for_client(const struct lttng_trigger *trigger,
924 const struct lttng_condition *condition,
925 struct notification_client *client,
926 struct notification_thread_state *state)
927 {
928 int ret;
929 struct lttng_evaluation *evaluation = NULL;
930 struct notification_client_list client_list = {
931 .lock = PTHREAD_MUTEX_INITIALIZER,
932 };
933 struct notification_client_list_element client_list_element = { 0 };
934 uid_t object_uid = 0;
935 gid_t object_gid = 0;
936
937 assert(trigger);
938 assert(condition);
939 assert(client);
940 assert(state);
941
942 switch (get_condition_binding_object(condition)) {
943 case LTTNG_OBJECT_TYPE_SESSION:
944 ret = evaluate_session_condition_for_client(condition, state,
945 &evaluation, &object_uid, &object_gid);
946 break;
947 case LTTNG_OBJECT_TYPE_CHANNEL:
948 ret = evaluate_channel_condition_for_client(condition, state,
949 &evaluation, &object_uid, &object_gid);
950 break;
951 case LTTNG_OBJECT_TYPE_NONE:
952 ret = 0;
953 goto end;
954 case LTTNG_OBJECT_TYPE_UNKNOWN:
955 default:
956 ret = -1;
957 goto end;
958 }
959 if (ret) {
960 /* Fatal error. */
961 goto end;
962 }
963 if (!evaluation) {
964 /* Evaluation yielded nothing. Normal exit. */
965 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
966 ret = 0;
967 goto end;
968 }
969
970 /*
971 * Create a temporary client list with the client currently
972 * subscribing.
973 */
974 cds_lfht_node_init(&client_list.notification_trigger_clients_ht_node);
975 CDS_INIT_LIST_HEAD(&client_list.list);
976 client_list.trigger = trigger;
977
978 CDS_INIT_LIST_HEAD(&client_list_element.node);
979 client_list_element.client = client;
980 cds_list_add(&client_list_element.node, &client_list.list);
981
982 /* Send evaluation result to the newly-subscribed client. */
983 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
984 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
985 state, object_uid, object_gid);
986
987 end:
988 return ret;
989 }
990
/*
 * Subscribe 'client' to 'condition'.
 *
 * The condition is first checked against the client's existing
 * subscriptions; a duplicate is reported through '_status' as
 * LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED and nothing else is
 * done. Otherwise the condition pointer is stored in a new element of the
 * client's condition list. If a client list is already associated with the
 * condition, the condition is evaluated for this client and the client is
 * added to that client list.
 *
 * '_status', when non-NULL, receives the subscription status on every path
 * that goes through the 'end' label; it is left untouched on the allocation
 * failure path ('error' label).
 *
 * Returns 0 on success (including the "already subscribed" case), -1 on
 * allocation failure or when the evaluation of the condition fails.
 *
 * NOTE(review): the condition is not destroyed here when the client is
 * already subscribed or when an allocation fails; confirm that the caller
 * handles its lifetime in those cases.
 */
static
int notification_thread_client_subscribe(struct notification_client *client,
		struct lttng_condition *condition,
		struct notification_thread_state *state,
		enum lttng_notification_channel_status *_status)
{
	int ret = 0;
	struct notification_client_list *client_list = NULL;
	struct lttng_condition_list_element *condition_list_element = NULL;
	struct notification_client_list_element *client_list_element = NULL;
	enum lttng_notification_channel_status status =
			LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;

	/*
	 * Ensure that the client has not already subscribed to this condition
	 * before.
	 */
	cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
		if (lttng_condition_is_equal(condition_list_element->condition,
				condition)) {
			status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
			goto end;
		}
	}

	/*
	 * Both elements are allocated up front so that nothing has been
	 * linked into a list yet if either allocation fails.
	 */
	condition_list_element = zmalloc(sizeof(*condition_list_element));
	if (!condition_list_element) {
		ret = -1;
		goto error;
	}
	client_list_element = zmalloc(sizeof(*client_list_element));
	if (!client_list_element) {
		ret = -1;
		goto error;
	}

	/*
	 * Add the newly-subscribed condition to the client's subscription list.
	 */
	CDS_INIT_LIST_HEAD(&condition_list_element->node);
	condition_list_element->condition = condition;
	cds_list_add(&condition_list_element->node, &client->condition_list);

	/* The client list obtained here is released at the 'end' label. */
	client_list = get_client_list_from_condition(state, condition);
	if (!client_list) {
		/*
		 * No notification-emitting trigger registered with this
		 * condition. We don't evaluate the condition right away
		 * since this trigger is not registered yet.
		 */
		free(client_list_element);
		goto end;
	}

	/*
	 * The condition to which the client just subscribed is evaluated
	 * at this point so that conditions that are already TRUE result
	 * in a notification being sent out.
	 *
	 * The client_list's trigger is used without locking the list itself.
	 * This is correct since the list doesn't own the trigger and the
	 * object is immutable.
	 */
	if (evaluate_condition_for_client(client_list->trigger, condition,
			client, state)) {
		WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
		ret = -1;
		/* Never linked into the client list; free it directly. */
		free(client_list_element);
		goto end;
	}

	/*
	 * Add the client to the list of clients interested in a given trigger
	 * if a "notification" trigger with a corresponding condition was
	 * added prior.
	 */
	client_list_element->client = client;
	CDS_INIT_LIST_HEAD(&client_list_element->node);

	pthread_mutex_lock(&client_list->lock);
	cds_list_add(&client_list_element->node, &client_list->list);
	pthread_mutex_unlock(&client_list->lock);
end:
	if (_status) {
		*_status = status;
	}
	if (client_list) {
		notification_client_list_put(client_list);
	}
	return ret;
error:
	/*
	 * Only reached before either element was linked into a list;
	 * free(NULL) is a no-op for the element that was not allocated.
	 */
	free(condition_list_element);
	free(client_list_element);
	return ret;
}
1086
1087 static
1088 int notification_thread_client_unsubscribe(
1089 struct notification_client *client,
1090 struct lttng_condition *condition,
1091 struct notification_thread_state *state,
1092 enum lttng_notification_channel_status *_status)
1093 {
1094 struct notification_client_list *client_list;
1095 struct lttng_condition_list_element *condition_list_element,
1096 *condition_tmp;
1097 struct notification_client_list_element *client_list_element,
1098 *client_tmp;
1099 bool condition_found = false;
1100 enum lttng_notification_channel_status status =
1101 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1102
1103 /* Remove the condition from the client's condition list. */
1104 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
1105 &client->condition_list, node) {
1106 if (!lttng_condition_is_equal(condition_list_element->condition,
1107 condition)) {
1108 continue;
1109 }
1110
1111 cds_list_del(&condition_list_element->node);
1112 /*
1113 * The caller may be iterating on the client's conditions to
1114 * tear down a client's connection. In this case, the condition
1115 * will be destroyed at the end.
1116 */
1117 if (condition != condition_list_element->condition) {
1118 lttng_condition_destroy(
1119 condition_list_element->condition);
1120 }
1121 free(condition_list_element);
1122 condition_found = true;
1123 break;
1124 }
1125
1126 if (!condition_found) {
1127 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
1128 goto end;
1129 }
1130
1131 /*
1132 * Remove the client from the list of clients interested the trigger
1133 * matching the condition.
1134 */
1135 client_list = get_client_list_from_condition(state, condition);
1136 if (!client_list) {
1137 goto end;
1138 }
1139
1140 pthread_mutex_lock(&client_list->lock);
1141 cds_list_for_each_entry_safe(client_list_element, client_tmp,
1142 &client_list->list, node) {
1143 if (client_list_element->client->id != client->id) {
1144 continue;
1145 }
1146 cds_list_del(&client_list_element->node);
1147 free(client_list_element);
1148 break;
1149 }
1150 pthread_mutex_unlock(&client_list->lock);
1151 notification_client_list_put(client_list);
1152 client_list = NULL;
1153 end:
1154 lttng_condition_destroy(condition);
1155 if (_status) {
1156 *_status = status;
1157 }
1158 return 0;
1159 }
1160
1161 static
1162 void free_notification_client_rcu(struct rcu_head *node)
1163 {
1164 free(caa_container_of(node, struct notification_client, rcu_node));
1165 }
1166
1167 static
1168 void notification_client_destroy(struct notification_client *client,
1169 struct notification_thread_state *state)
1170 {
1171 if (!client) {
1172 return;
1173 }
1174
1175 /*
1176 * The client object is not reachable by other threads, no need to lock
1177 * the client here.
1178 */
1179 if (client->socket >= 0) {
1180 (void) lttcomm_close_unix_sock(client->socket);
1181 client->socket = -1;
1182 }
1183 client->communication.active = false;
1184 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
1185 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
1186 pthread_mutex_destroy(&client->lock);
1187 call_rcu(&client->rcu_node, free_notification_client_rcu);
1188 }
1189
1190 /*
1191 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1192 * client pointer).
1193 */
1194 static
1195 struct notification_client *get_client_from_socket(int socket,
1196 struct notification_thread_state *state)
1197 {
1198 struct cds_lfht_iter iter;
1199 struct cds_lfht_node *node;
1200 struct notification_client *client = NULL;
1201
1202 cds_lfht_lookup(state->client_socket_ht,
1203 hash_client_socket(socket),
1204 match_client_socket,
1205 (void *) (unsigned long) socket,
1206 &iter);
1207 node = cds_lfht_iter_get_node(&iter);
1208 if (!node) {
1209 goto end;
1210 }
1211
1212 client = caa_container_of(node, struct notification_client,
1213 client_socket_ht_node);
1214 end:
1215 return client;
1216 }
1217
1218 /*
1219 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1220 * client pointer).
1221 */
1222 static
1223 struct notification_client *get_client_from_id(notification_client_id id,
1224 struct notification_thread_state *state)
1225 {
1226 struct cds_lfht_iter iter;
1227 struct cds_lfht_node *node;
1228 struct notification_client *client = NULL;
1229
1230 cds_lfht_lookup(state->client_id_ht,
1231 hash_client_id(id),
1232 match_client_id,
1233 &id,
1234 &iter);
1235 node = cds_lfht_iter_get_node(&iter);
1236 if (!node) {
1237 goto end;
1238 }
1239
1240 client = caa_container_of(node, struct notification_client,
1241 client_id_ht_node);
1242 end:
1243 return client;
1244 }
1245
1246 static
1247 bool buffer_usage_condition_applies_to_channel(
1248 const struct lttng_condition *condition,
1249 const struct channel_info *channel_info)
1250 {
1251 enum lttng_condition_status status;
1252 enum lttng_domain_type condition_domain;
1253 const char *condition_session_name = NULL;
1254 const char *condition_channel_name = NULL;
1255
1256 status = lttng_condition_buffer_usage_get_domain_type(condition,
1257 &condition_domain);
1258 assert(status == LTTNG_CONDITION_STATUS_OK);
1259 if (channel_info->key.domain != condition_domain) {
1260 goto fail;
1261 }
1262
1263 status = lttng_condition_buffer_usage_get_session_name(
1264 condition, &condition_session_name);
1265 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1266
1267 status = lttng_condition_buffer_usage_get_channel_name(
1268 condition, &condition_channel_name);
1269 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
1270
1271 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1272 goto fail;
1273 }
1274 if (strcmp(channel_info->name, condition_channel_name)) {
1275 goto fail;
1276 }
1277
1278 return true;
1279 fail:
1280 return false;
1281 }
1282
1283 static
1284 bool session_consumed_size_condition_applies_to_channel(
1285 const struct lttng_condition *condition,
1286 const struct channel_info *channel_info)
1287 {
1288 enum lttng_condition_status status;
1289 const char *condition_session_name = NULL;
1290
1291 status = lttng_condition_session_consumed_size_get_session_name(
1292 condition, &condition_session_name);
1293 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1294
1295 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1296 goto fail;
1297 }
1298
1299 return true;
1300 fail:
1301 return false;
1302 }
1303
1304 static
1305 bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
1306 const struct channel_info *channel_info)
1307 {
1308 const struct lttng_condition *condition;
1309 bool trigger_applies;
1310
1311 condition = lttng_trigger_get_const_condition(trigger);
1312 if (!condition) {
1313 goto fail;
1314 }
1315
1316 switch (lttng_condition_get_type(condition)) {
1317 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1318 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1319 trigger_applies = buffer_usage_condition_applies_to_channel(
1320 condition, channel_info);
1321 break;
1322 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1323 trigger_applies = session_consumed_size_condition_applies_to_channel(
1324 condition, channel_info);
1325 break;
1326 default:
1327 goto fail;
1328 }
1329
1330 return trigger_applies;
1331 fail:
1332 return false;
1333 }
1334
1335 static
1336 bool trigger_applies_to_client(struct lttng_trigger *trigger,
1337 struct notification_client *client)
1338 {
1339 bool applies = false;
1340 struct lttng_condition_list_element *condition_list_element;
1341
1342 cds_list_for_each_entry(condition_list_element, &client->condition_list,
1343 node) {
1344 applies = lttng_condition_is_equal(
1345 condition_list_element->condition,
1346 lttng_trigger_get_condition(trigger));
1347 if (applies) {
1348 break;
1349 }
1350 }
1351 return applies;
1352 }
1353
1354 /* Must be called with RCU read lock held. */
1355 static
1356 struct lttng_session_trigger_list *get_session_trigger_list(
1357 struct notification_thread_state *state,
1358 const char *session_name)
1359 {
1360 struct lttng_session_trigger_list *list = NULL;
1361 struct cds_lfht_node *node;
1362 struct cds_lfht_iter iter;
1363
1364 cds_lfht_lookup(state->session_triggers_ht,
1365 hash_key_str(session_name, lttng_ht_seed),
1366 match_session_trigger_list,
1367 session_name,
1368 &iter);
1369 node = cds_lfht_iter_get_node(&iter);
1370 if (!node) {
1371 /*
1372 * Not an error, the list of triggers applying to that session
1373 * will be initialized when the session is created.
1374 */
1375 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1376 session_name);
1377 goto end;
1378 }
1379
1380 list = caa_container_of(node,
1381 struct lttng_session_trigger_list,
1382 session_triggers_ht_node);
1383 end:
1384 return list;
1385 }
1386
1387 /*
1388 * Allocate an empty lttng_session_trigger_list for the session named
1389 * 'session_name'.
1390 *
1391 * No ownership of 'session_name' is assumed by the session trigger list.
1392 * It is the caller's responsability to ensure the session name is alive
1393 * for as long as this list is.
1394 */
1395 static
1396 struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1397 const char *session_name,
1398 struct cds_lfht *session_triggers_ht)
1399 {
1400 struct lttng_session_trigger_list *list;
1401
1402 list = zmalloc(sizeof(*list));
1403 if (!list) {
1404 goto end;
1405 }
1406 list->session_name = session_name;
1407 CDS_INIT_LIST_HEAD(&list->list);
1408 cds_lfht_node_init(&list->session_triggers_ht_node);
1409 list->session_triggers_ht = session_triggers_ht;
1410
1411 rcu_read_lock();
1412 /* Publish the list through the session_triggers_ht. */
1413 cds_lfht_add(session_triggers_ht,
1414 hash_key_str(session_name, lttng_ht_seed),
1415 &list->session_triggers_ht_node);
1416 rcu_read_unlock();
1417 end:
1418 return list;
1419 }
1420
1421 static
1422 void free_session_trigger_list_rcu(struct rcu_head *node)
1423 {
1424 free(caa_container_of(node, struct lttng_session_trigger_list,
1425 rcu_node));
1426 }
1427
1428 static
1429 void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1430 {
1431 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1432
1433 /* Empty the list element by element, and then free the list itself. */
1434 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1435 &list->list, node) {
1436 cds_list_del(&trigger_list_element->node);
1437 free(trigger_list_element);
1438 }
1439 rcu_read_lock();
1440 /* Unpublish the list from the session_triggers_ht. */
1441 cds_lfht_del(list->session_triggers_ht,
1442 &list->session_triggers_ht_node);
1443 rcu_read_unlock();
1444 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1445 }
1446
1447 static
1448 int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
1449 struct lttng_trigger *trigger)
1450 {
1451 int ret = 0;
1452 struct lttng_trigger_list_element *new_element =
1453 zmalloc(sizeof(*new_element));
1454
1455 if (!new_element) {
1456 ret = -1;
1457 goto end;
1458 }
1459 CDS_INIT_LIST_HEAD(&new_element->node);
1460 new_element->trigger = trigger;
1461 cds_list_add(&new_element->node, &list->list);
1462 end:
1463 return ret;
1464 }
1465
1466 static
1467 bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1468 const char *session_name)
1469 {
1470 bool applies = false;
1471 const struct lttng_condition *condition;
1472
1473 condition = lttng_trigger_get_const_condition(trigger);
1474 switch (lttng_condition_get_type(condition)) {
1475 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1476 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1477 {
1478 enum lttng_condition_status condition_status;
1479 const char *condition_session_name;
1480
1481 condition_status = lttng_condition_session_rotation_get_session_name(
1482 condition, &condition_session_name);
1483 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1484 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1485 goto end;
1486 }
1487
1488 assert(condition_session_name);
1489 applies = !strcmp(condition_session_name, session_name);
1490 break;
1491 }
1492 default:
1493 goto end;
1494 }
1495 end:
1496 return applies;
1497 }
1498
1499 /*
1500 * Allocate and initialize an lttng_session_trigger_list which contains
1501 * all triggers that apply to the session named 'session_name'.
1502 *
1503 * No ownership of 'session_name' is assumed by the session trigger list.
1504 * It is the caller's responsability to ensure the session name is alive
1505 * for as long as this list is.
1506 */
1507 static
1508 struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1509 const struct notification_thread_state *state,
1510 const char *session_name)
1511 {
1512 int trigger_count = 0;
1513 struct lttng_session_trigger_list *session_trigger_list = NULL;
1514 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1515 struct cds_lfht_iter iter;
1516
1517 session_trigger_list = lttng_session_trigger_list_create(session_name,
1518 state->session_triggers_ht);
1519
1520 /* Add all triggers applying to the session named 'session_name'. */
1521 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1522 node) {
1523 int ret;
1524
1525 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1526 session_name)) {
1527 continue;
1528 }
1529
1530 ret = lttng_session_trigger_list_add(session_trigger_list,
1531 trigger_ht_element->trigger);
1532 if (ret) {
1533 goto error;
1534 }
1535
1536 trigger_count++;
1537 }
1538
1539 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1540 trigger_count);
1541 return session_trigger_list;
1542 error:
1543 lttng_session_trigger_list_destroy(session_trigger_list);
1544 return NULL;
1545 }
1546
1547 static
1548 struct session_info *find_or_create_session_info(
1549 struct notification_thread_state *state,
1550 const char *name, uid_t uid, gid_t gid)
1551 {
1552 struct session_info *session = NULL;
1553 struct cds_lfht_node *node;
1554 struct cds_lfht_iter iter;
1555 struct lttng_session_trigger_list *trigger_list;
1556
1557 rcu_read_lock();
1558 cds_lfht_lookup(state->sessions_ht,
1559 hash_key_str(name, lttng_ht_seed),
1560 match_session,
1561 name,
1562 &iter);
1563 node = cds_lfht_iter_get_node(&iter);
1564 if (node) {
1565 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1566 name, uid, gid);
1567 session = caa_container_of(node, struct session_info,
1568 sessions_ht_node);
1569 assert(session->uid == uid);
1570 assert(session->gid == gid);
1571 session_info_get(session);
1572 goto end;
1573 }
1574
1575 trigger_list = lttng_session_trigger_list_build(state, name);
1576 if (!trigger_list) {
1577 goto error;
1578 }
1579
1580 session = session_info_create(name, uid, gid, trigger_list,
1581 state->sessions_ht);
1582 if (!session) {
1583 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1584 name, uid, gid);
1585 lttng_session_trigger_list_destroy(trigger_list);
1586 goto error;
1587 }
1588 trigger_list = NULL;
1589
1590 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
1591 &session->sessions_ht_node);
1592 end:
1593 rcu_read_unlock();
1594 return session;
1595 error:
1596 rcu_read_unlock();
1597 session_info_put(session);
1598 return NULL;
1599 }
1600
1601 static
1602 int handle_notification_thread_command_add_channel(
1603 struct notification_thread_state *state,
1604 const char *session_name, uid_t session_uid, gid_t session_gid,
1605 const char *channel_name, enum lttng_domain_type channel_domain,
1606 uint64_t channel_key_int, uint64_t channel_capacity,
1607 enum lttng_error_code *cmd_result)
1608 {
1609 struct cds_list_head trigger_list;
1610 struct channel_info *new_channel_info = NULL;
1611 struct channel_key channel_key = {
1612 .key = channel_key_int,
1613 .domain = channel_domain,
1614 };
1615 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1616 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1617 int trigger_count = 0;
1618 struct cds_lfht_iter iter;
1619 struct session_info *session_info = NULL;
1620
1621 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
1622 channel_name, session_name, channel_key_int,
1623 channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1624
1625 CDS_INIT_LIST_HEAD(&trigger_list);
1626
1627 session_info = find_or_create_session_info(state, session_name,
1628 session_uid, session_gid);
1629 if (!session_info) {
1630 /* Allocation error or an internal error occurred. */
1631 goto error;
1632 }
1633
1634 new_channel_info = channel_info_create(channel_name, &channel_key,
1635 channel_capacity, session_info);
1636 if (!new_channel_info) {
1637 goto error;
1638 }
1639
1640 rcu_read_lock();
1641 /* Build a list of all triggers applying to the new channel. */
1642 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1643 node) {
1644 struct lttng_trigger_list_element *new_element;
1645
1646 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
1647 new_channel_info)) {
1648 continue;
1649 }
1650
1651 new_element = zmalloc(sizeof(*new_element));
1652 if (!new_element) {
1653 rcu_read_unlock();
1654 goto error;
1655 }
1656 CDS_INIT_LIST_HEAD(&new_element->node);
1657 new_element->trigger = trigger_ht_element->trigger;
1658 cds_list_add(&new_element->node, &trigger_list);
1659 trigger_count++;
1660 }
1661 rcu_read_unlock();
1662
1663 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1664 trigger_count);
1665 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1666 if (!channel_trigger_list) {
1667 goto error;
1668 }
1669 channel_trigger_list->channel_key = new_channel_info->key;
1670 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1671 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1672 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1673
1674 rcu_read_lock();
1675 /* Add channel to the channel_ht which owns the channel_infos. */
1676 cds_lfht_add(state->channels_ht,
1677 hash_channel_key(&new_channel_info->key),
1678 &new_channel_info->channels_ht_node);
1679 /*
1680 * Add the list of triggers associated with this channel to the
1681 * channel_triggers_ht.
1682 */
1683 cds_lfht_add(state->channel_triggers_ht,
1684 hash_channel_key(&new_channel_info->key),
1685 &channel_trigger_list->channel_triggers_ht_node);
1686 rcu_read_unlock();
1687 session_info_put(session_info);
1688 *cmd_result = LTTNG_OK;
1689 return 0;
1690 error:
1691 channel_info_destroy(new_channel_info);
1692 session_info_put(session_info);
1693 return 1;
1694 }
1695
1696 static
1697 void free_channel_trigger_list_rcu(struct rcu_head *node)
1698 {
1699 free(caa_container_of(node, struct lttng_channel_trigger_list,
1700 rcu_node));
1701 }
1702
1703 static
1704 void free_channel_state_sample_rcu(struct rcu_head *node)
1705 {
1706 free(caa_container_of(node, struct channel_state_sample,
1707 rcu_node));
1708 }
1709
/*
 * Handle the "remove channel" command for the channel identified by
 * ('channel_key', 'domain').
 *
 * Three pieces of per-channel state are torn down, in this order:
 *   1. the channel's trigger list (channel_triggers_ht),
 *   2. the channel's last state sample, if any (channel_state_ht),
 *   3. the channel_info itself (channels_ht).
 *
 * If the channel has no trigger list, an error is logged and nothing else
 * is removed.
 *
 * Always returns 0 and sets '*cmd_result' to LTTNG_OK, including when the
 * channel is unknown.
 */
static
int handle_notification_thread_command_remove_channel(
		struct notification_thread_state *state,
		uint64_t channel_key, enum lttng_domain_type domain,
		enum lttng_error_code *cmd_result)
{
	struct cds_lfht_node *node;
	struct cds_lfht_iter iter;
	struct lttng_channel_trigger_list *trigger_list;
	struct lttng_trigger_list_element *trigger_list_element, *tmp;
	struct channel_key key = { .key = channel_key, .domain = domain };
	struct channel_info *channel_info;

	DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
			channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");

	/* Held across all three lookups/removals below. */
	rcu_read_lock();

	cds_lfht_lookup(state->channel_triggers_ht,
			hash_channel_key(&key),
			match_channel_trigger_list,
			&key,
			&iter);
	node = cds_lfht_iter_get_node(&iter);
	/*
	 * There is a severe internal error if we are being asked to remove a
	 * channel that doesn't exist.
	 */
	if (!node) {
		ERR("[notification-thread] Channel being removed is unknown to the notification thread");
		goto end;
	}

	/* Free the list of triggers associated with this channel. */
	trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
			channel_triggers_ht_node);
	/*
	 * Only the list elements are freed; the triggers they point to are
	 * left untouched.
	 */
	cds_list_for_each_entry_safe(trigger_list_element, tmp,
			&trigger_list->list, node) {
		cds_list_del(&trigger_list_element->node);
		free(trigger_list_element);
	}
	/* Unpublish first, then defer the free of the list to RCU. */
	cds_lfht_del(state->channel_triggers_ht, node);
	call_rcu(&trigger_list->rcu_node, free_channel_trigger_list_rcu);

	/* Free sampled channel state. */
	cds_lfht_lookup(state->channel_state_ht,
			hash_channel_key(&key),
			match_channel_state_sample,
			&key,
			&iter);
	node = cds_lfht_iter_get_node(&iter);
	/*
	 * This is expected to be NULL if the channel is destroyed before we
	 * received a sample.
	 */
	if (node) {
		struct channel_state_sample *sample = caa_container_of(node,
				struct channel_state_sample,
				channel_state_ht_node);

		cds_lfht_del(state->channel_state_ht, node);
		call_rcu(&sample->rcu_node, free_channel_state_sample_rcu);
	}

	/* Remove the channel from the channels_ht and free it. */
	cds_lfht_lookup(state->channels_ht,
			hash_channel_key(&key),
			match_channel_info,
			&key,
			&iter);
	node = cds_lfht_iter_get_node(&iter);
	/* A channel with a trigger list is expected to have a channel_info. */
	assert(node);
	channel_info = caa_container_of(node, struct channel_info,
			channels_ht_node);
	cds_lfht_del(state->channels_ht, node);
	channel_info_destroy(channel_info);
end:
	rcu_read_unlock();
	*cmd_result = LTTNG_OK;
	return 0;
}
1791
1792 static
1793 int handle_notification_thread_command_session_rotation(
1794 struct notification_thread_state *state,
1795 enum notification_thread_command_type cmd_type,
1796 const char *session_name, uid_t session_uid, gid_t session_gid,
1797 uint64_t trace_archive_chunk_id,
1798 struct lttng_trace_archive_location *location,
1799 enum lttng_error_code *_cmd_result)
1800 {
1801 int ret = 0;
1802 enum lttng_error_code cmd_result = LTTNG_OK;
1803 struct lttng_session_trigger_list *trigger_list;
1804 struct lttng_trigger_list_element *trigger_list_element;
1805 struct session_info *session_info;
1806 const struct lttng_credentials session_creds = {
1807 .uid = session_uid,
1808 .gid = session_gid,
1809 };
1810
1811 rcu_read_lock();
1812
1813 session_info = find_or_create_session_info(state, session_name,
1814 session_uid, session_gid);
1815 if (!session_info) {
1816 /* Allocation error or an internal error occurred. */
1817 ret = -1;
1818 cmd_result = LTTNG_ERR_NOMEM;
1819 goto end;
1820 }
1821
1822 session_info->rotation.ongoing =
1823 cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
1824 session_info->rotation.id = trace_archive_chunk_id;
1825 trigger_list = get_session_trigger_list(state, session_name);
1826 if (!trigger_list) {
1827 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1828 session_name);
1829 goto end;
1830 }
1831
1832 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
1833 node) {
1834 const struct lttng_condition *condition;
1835 const struct lttng_action *action;
1836 struct lttng_trigger *trigger;
1837 struct notification_client_list *client_list;
1838 struct lttng_evaluation *evaluation = NULL;
1839 enum lttng_condition_type condition_type;
1840 bool client_list_is_empty;
1841 enum action_executor_status executor_status;
1842
1843 trigger = trigger_list_element->trigger;
1844 condition = lttng_trigger_get_const_condition(trigger);
1845 assert(condition);
1846 condition_type = lttng_condition_get_type(condition);
1847
1848 if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING &&
1849 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1850 continue;
1851 } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED &&
1852 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) {
1853 continue;
1854 }
1855
1856 action = lttng_trigger_get_const_action(trigger);
1857
1858 /* Notify actions are the only type currently supported. */
1859 assert(lttng_action_get_type_const(action) ==
1860 LTTNG_ACTION_TYPE_NOTIFY);
1861
1862 client_list = get_client_list_from_condition(state, condition);
1863 assert(client_list);
1864
1865 pthread_mutex_lock(&client_list->lock);
1866 client_list_is_empty = cds_list_empty(&client_list->list);
1867 pthread_mutex_unlock(&client_list->lock);
1868 if (client_list_is_empty) {
1869 /*
1870 * No clients interested in the evaluation's result,
1871 * skip it.
1872 */
1873 continue;
1874 }
1875
1876 if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1877 evaluation = lttng_evaluation_session_rotation_ongoing_create(
1878 trace_archive_chunk_id);
1879 } else {
1880 evaluation = lttng_evaluation_session_rotation_completed_create(
1881 trace_archive_chunk_id, location);
1882 }
1883
1884 if (!evaluation) {
1885 /* Internal error */
1886 ret = -1;
1887 cmd_result = LTTNG_ERR_UNK;
1888 goto put_list;
1889 }
1890
1891 /*
1892 * Ownership of `evaluation` transferred to the action executor
1893 * no matter the result.
1894 */
1895 executor_status = action_executor_enqueue(state->executor,
1896 trigger, evaluation, &session_creds,
1897 client_list);
1898 evaluation = NULL;
1899 switch (executor_status) {
1900 case ACTION_EXECUTOR_STATUS_OK:
1901 break;
1902 case ACTION_EXECUTOR_STATUS_ERROR:
1903 case ACTION_EXECUTOR_STATUS_INVALID:
1904 /*
1905 * TODO Add trigger identification (name/id) when
1906 * it is added to the API.
1907 */
1908 ERR("Fatal error occurred while enqueuing action associated with session rotation trigger");
1909 ret = -1;
1910 goto put_list;
1911 case ACTION_EXECUTOR_STATUS_OVERFLOW:
1912 /*
1913 * TODO Add trigger identification (name/id) when
1914 * it is added to the API.
1915 *
1916 * Not a fatal error.
1917 */
1918 WARN("No space left when enqueuing action associated with session rotation trigger");
1919 ret = 0;
1920 goto put_list;
1921 default:
1922 abort();
1923 }
1924
1925 put_list:
1926 notification_client_list_put(client_list);
1927 if (caa_unlikely(ret)) {
1928 break;
1929 }
1930 }
1931 end:
1932 session_info_put(session_info);
1933 *_cmd_result = cmd_result;
1934 rcu_read_unlock();
1935 return ret;
1936 }
1937
1938 static
1939 int condition_is_supported(struct lttng_condition *condition)
1940 {
1941 int ret;
1942
1943 switch (lttng_condition_get_type(condition)) {
1944 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1945 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1946 {
1947 enum lttng_domain_type domain;
1948
1949 ret = lttng_condition_buffer_usage_get_domain_type(condition,
1950 &domain);
1951 if (ret) {
1952 ret = -1;
1953 goto end;
1954 }
1955
1956 if (domain != LTTNG_DOMAIN_KERNEL) {
1957 ret = 1;
1958 goto end;
1959 }
1960
1961 /*
1962 * Older kernel tracers don't expose the API to monitor their
1963 * buffers. Therefore, we reject triggers that require that
1964 * mechanism to be available to be evaluated.
1965 */
1966 ret = kernel_supports_ring_buffer_snapshot_sample_positions();
1967 break;
1968 }
1969 default:
1970 ret = 1;
1971 }
1972 end:
1973 return ret;
1974 }
1975
1976 /* Must be called with RCU read lock held. */
1977 static
1978 int bind_trigger_to_matching_session(struct lttng_trigger *trigger,
1979 struct notification_thread_state *state)
1980 {
1981 int ret = 0;
1982 const struct lttng_condition *condition;
1983 const char *session_name;
1984 struct lttng_session_trigger_list *trigger_list;
1985
1986 condition = lttng_trigger_get_const_condition(trigger);
1987 switch (lttng_condition_get_type(condition)) {
1988 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1989 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1990 {
1991 enum lttng_condition_status status;
1992
1993 status = lttng_condition_session_rotation_get_session_name(
1994 condition, &session_name);
1995 if (status != LTTNG_CONDITION_STATUS_OK) {
1996 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
1997 ret = -1;
1998 goto end;
1999 }
2000 break;
2001 }
2002 default:
2003 ret = -1;
2004 goto end;
2005 }
2006
2007 trigger_list = get_session_trigger_list(state, session_name);
2008 if (!trigger_list) {
2009 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
2010 session_name);
2011 goto end;
2012
2013 }
2014
2015 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
2016 session_name);
2017 ret = lttng_session_trigger_list_add(trigger_list, trigger);
2018 end:
2019 return ret;
2020 }
2021
2022 /* Must be called with RCU read lock held. */
2023 static
2024 int bind_trigger_to_matching_channels(struct lttng_trigger *trigger,
2025 struct notification_thread_state *state)
2026 {
2027 int ret = 0;
2028 struct cds_lfht_node *node;
2029 struct cds_lfht_iter iter;
2030 struct channel_info *channel;
2031
2032 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
2033 channels_ht_node) {
2034 struct lttng_trigger_list_element *trigger_list_element;
2035 struct lttng_channel_trigger_list *trigger_list;
2036 struct cds_lfht_iter lookup_iter;
2037
2038 if (!trigger_applies_to_channel(trigger, channel)) {
2039 continue;
2040 }
2041
2042 cds_lfht_lookup(state->channel_triggers_ht,
2043 hash_channel_key(&channel->key),
2044 match_channel_trigger_list,
2045 &channel->key,
2046 &lookup_iter);
2047 node = cds_lfht_iter_get_node(&lookup_iter);
2048 assert(node);
2049 trigger_list = caa_container_of(node,
2050 struct lttng_channel_trigger_list,
2051 channel_triggers_ht_node);
2052
2053 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
2054 if (!trigger_list_element) {
2055 ret = -1;
2056 goto end;
2057 }
2058 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
2059 trigger_list_element->trigger = trigger;
2060 cds_list_add(&trigger_list_element->node, &trigger_list->list);
2061 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
2062 channel->name);
2063 }
2064 end:
2065 return ret;
2066 }
2067
2068 /*
2069 * FIXME A client's credentials are not checked when registering a trigger, nor
2070 * are they stored alongside with the trigger.
2071 *
2072 * The effects of this are benign since:
2073 * - The client will succeed in registering the trigger, as it is valid,
2074 * - The trigger will, internally, be bound to the channel/session,
2075 * - The notifications will not be sent since the client's credentials
2076 * are checked against the channel at that moment.
2077 *
2078 * If this function returns a non-zero value, it means something is
2079 * fundamentally broken and the whole subsystem/thread will be torn down.
2080 *
2081 * If a non-fatal error occurs, just set the cmd_result to the appropriate
2082 * error code.
2083 */
2084 static
2085 int handle_notification_thread_command_register_trigger(
2086 struct notification_thread_state *state,
2087 struct lttng_trigger *trigger,
2088 enum lttng_error_code *cmd_result)
2089 {
2090 int ret = 0;
2091 struct lttng_condition *condition;
2092 struct notification_client *client;
2093 struct notification_client_list *client_list = NULL;
2094 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2095 struct notification_client_list_element *client_list_element, *tmp;
2096 struct cds_lfht_node *node;
2097 struct cds_lfht_iter iter;
2098 bool free_trigger = true;
2099
2100 rcu_read_lock();
2101
2102 condition = lttng_trigger_get_condition(trigger);
2103 assert(condition);
2104
2105 ret = condition_is_supported(condition);
2106 if (ret < 0) {
2107 goto error;
2108 } else if (ret == 0) {
2109 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
2110 goto error;
2111 } else {
2112 /* Feature is supported, continue. */
2113 ret = 0;
2114 }
2115
2116 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
2117 if (!trigger_ht_element) {
2118 ret = -1;
2119 goto error;
2120 }
2121
2122 /* Add trigger to the trigger_ht. */
2123 cds_lfht_node_init(&trigger_ht_element->node);
2124 trigger_ht_element->trigger = trigger;
2125
2126 node = cds_lfht_add_unique(state->triggers_ht,
2127 lttng_condition_hash(condition),
2128 match_condition,
2129 condition,
2130 &trigger_ht_element->node);
2131 if (node != &trigger_ht_element->node) {
2132 /* Not a fatal error, simply report it to the client. */
2133 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2134 goto error_free_ht_element;
2135 }
2136
2137 /*
2138 * Ownership of the trigger and of its wrapper was transfered to
2139 * the triggers_ht.
2140 */
2141 trigger_ht_element = NULL;
2142 free_trigger = false;
2143
2144 /*
2145 * The rest only applies to triggers that have a "notify" action.
2146 * It is not skipped as this is the only action type currently
2147 * supported.
2148 */
2149 client_list = notification_client_list_create(trigger);
2150 if (!client_list) {
2151 ret = -1;
2152 goto error_free_ht_element;
2153 }
2154
2155 /* Build a list of clients to which this new trigger applies. */
2156 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2157 client_socket_ht_node) {
2158 if (!trigger_applies_to_client(trigger, client)) {
2159 continue;
2160 }
2161
2162 client_list_element = zmalloc(sizeof(*client_list_element));
2163 if (!client_list_element) {
2164 ret = -1;
2165 goto error_put_client_list;
2166 }
2167 CDS_INIT_LIST_HEAD(&client_list_element->node);
2168 client_list_element->client = client;
2169 cds_list_add(&client_list_element->node, &client_list->list);
2170 }
2171
2172 switch (get_condition_binding_object(condition)) {
2173 case LTTNG_OBJECT_TYPE_SESSION:
2174 /* Add the trigger to the list if it matches a known session. */
2175 ret = bind_trigger_to_matching_session(trigger, state);
2176 if (ret) {
2177 goto error_put_client_list;
2178 }
2179 break;
2180 case LTTNG_OBJECT_TYPE_CHANNEL:
2181 /*
2182 * Add the trigger to list of triggers bound to the channels
2183 * currently known.
2184 */
2185 ret = bind_trigger_to_matching_channels(trigger, state);
2186 if (ret) {
2187 goto error_put_client_list;
2188 }
2189 break;
2190 case LTTNG_OBJECT_TYPE_NONE:
2191 break;
2192 default:
2193 ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered");
2194 ret = -1;
2195 goto error_put_client_list;
2196 }
2197
2198 /*
2199 * Since there is nothing preventing clients from subscribing to a
2200 * condition before the corresponding trigger is registered, we have
2201 * to evaluate this new condition right away.
2202 *
2203 * At some point, we were waiting for the next "evaluation" (e.g. on
2204 * reception of a channel sample) to evaluate this new condition, but
2205 * that was broken.
2206 *
2207 * The reason it was broken is that waiting for the next sample
2208 * does not allow us to properly handle transitions for edge-triggered
2209 * conditions.
2210 *
2211 * Consider this example: when we handle a new channel sample, we
2212 * evaluate each conditions twice: once with the previous state, and
2213 * again with the newest state. We then use those two results to
2214 * determine whether a state change happened: a condition was false and
2215 * became true. If a state change happened, we have to notify clients.
2216 *
2217 * Now, if a client subscribes to a given notification and registers
2218 * a trigger *after* that subscription, we have to make sure the
2219 * condition is evaluated at this point while considering only the
2220 * current state. Otherwise, the next evaluation cycle may only see
2221 * that the evaluations remain the same (true for samples n-1 and n) and
2222 * the client will never know that the condition has been met.
2223 *
2224 * No need to lock the list here as it has not been published yet.
2225 */
2226 cds_list_for_each_entry_safe(client_list_element, tmp,
2227 &client_list->list, node) {
2228 ret = evaluate_condition_for_client(trigger, condition,
2229 client_list_element->client, state);
2230 if (ret) {
2231 goto error_put_client_list;
2232 }
2233 }
2234
2235 /*
2236 * Client list ownership transferred to the
2237 * notification_trigger_clients_ht.
2238 */
2239 publish_notification_client_list(state, client_list);
2240 client_list = NULL;
2241
2242 *cmd_result = LTTNG_OK;
2243
2244 error_put_client_list:
2245 notification_client_list_put(client_list);
2246
2247 error_free_ht_element:
2248 free(trigger_ht_element);
2249 error:
2250 if (free_trigger) {
2251 lttng_trigger_destroy(trigger);
2252 }
2253 rcu_read_unlock();
2254 return ret;
2255 }
2256
2257 static
2258 void free_lttng_trigger_ht_element_rcu(struct rcu_head *node)
2259 {
2260 free(caa_container_of(node, struct lttng_trigger_ht_element,
2261 rcu_node));
2262 }
2263
2264 static
2265 int handle_notification_thread_command_unregister_trigger(
2266 struct notification_thread_state *state,
2267 struct lttng_trigger *trigger,
2268 enum lttng_error_code *_cmd_reply)
2269 {
2270 struct cds_lfht_iter iter;
2271 struct cds_lfht_node *triggers_ht_node;
2272 struct lttng_channel_trigger_list *trigger_list;
2273 struct notification_client_list *client_list;
2274 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2275 struct lttng_condition *condition = lttng_trigger_get_condition(
2276 trigger);
2277 enum lttng_error_code cmd_reply;
2278
2279 rcu_read_lock();
2280
2281 cds_lfht_lookup(state->triggers_ht,
2282 lttng_condition_hash(condition),
2283 match_condition,
2284 condition,
2285 &iter);
2286 triggers_ht_node = cds_lfht_iter_get_node(&iter);
2287 if (!triggers_ht_node) {
2288 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
2289 goto end;
2290 } else {
2291 cmd_reply = LTTNG_OK;
2292 }
2293
2294 /* Remove trigger from channel_triggers_ht. */
2295 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
2296 channel_triggers_ht_node) {
2297 struct lttng_trigger_list_element *trigger_element, *tmp;
2298
2299 cds_list_for_each_entry_safe(trigger_element, tmp,
2300 &trigger_list->list, node) {
2301 const struct lttng_condition *current_condition =
2302 lttng_trigger_get_const_condition(
2303 trigger_element->trigger);
2304
2305 assert(current_condition);
2306 if (!lttng_condition_is_equal(condition,
2307 current_condition)) {
2308 continue;
2309 }
2310
2311 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2312 cds_list_del(&trigger_element->node);
2313 /* A trigger can only appear once per channel */
2314 break;
2315 }
2316 }
2317
2318 /*
2319 * Remove and release the client list from
2320 * notification_trigger_clients_ht.
2321 */
2322 client_list = get_client_list_from_condition(state, condition);
2323 assert(client_list);
2324
2325 /* Put new reference and the hashtable's reference. */
2326 notification_client_list_put(client_list);
2327 notification_client_list_put(client_list);
2328 client_list = NULL;
2329
2330 /* Remove trigger from triggers_ht. */
2331 trigger_ht_element = caa_container_of(triggers_ht_node,
2332 struct lttng_trigger_ht_element, node);
2333 cds_lfht_del(state->triggers_ht, triggers_ht_node);
2334
2335 /* Release the ownership of the trigger. */
2336 lttng_trigger_destroy(trigger_ht_element->trigger);
2337 call_rcu(&trigger_ht_element->rcu_node, free_lttng_trigger_ht_element_rcu);
2338 end:
2339 rcu_read_unlock();
2340 if (_cmd_reply) {
2341 *_cmd_reply = cmd_reply;
2342 }
2343 return 0;
2344 }
2345
2346 /* Returns 0 on success, 1 on exit requested, negative value on error. */
2347 int handle_notification_thread_command(
2348 struct notification_thread_handle *handle,
2349 struct notification_thread_state *state)
2350 {
2351 int ret;
2352 uint64_t counter;
2353 struct notification_thread_command *cmd;
2354
2355 /* Read the event pipe to put it back into a quiescent state. */
2356 ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
2357 sizeof(counter));
2358 if (ret != sizeof(counter)) {
2359 goto error;
2360 }
2361
2362 pthread_mutex_lock(&handle->cmd_queue.lock);
2363 cmd = cds_list_first_entry(&handle->cmd_queue.list,
2364 struct notification_thread_command, cmd_list_node);
2365 cds_list_del(&cmd->cmd_list_node);
2366 pthread_mutex_unlock(&handle->cmd_queue.lock);
2367 switch (cmd->type) {
2368 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
2369 DBG("[notification-thread] Received register trigger command");
2370 ret = handle_notification_thread_command_register_trigger(
2371 state, cmd->parameters.trigger,
2372 &cmd->reply_code);
2373 break;
2374 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
2375 DBG("[notification-thread] Received unregister trigger command");
2376 ret = handle_notification_thread_command_unregister_trigger(
2377 state, cmd->parameters.trigger,
2378 &cmd->reply_code);
2379 break;
2380 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
2381 DBG("[notification-thread] Received add channel command");
2382 ret = handle_notification_thread_command_add_channel(
2383 state,
2384 cmd->parameters.add_channel.session.name,
2385 cmd->parameters.add_channel.session.uid,
2386 cmd->parameters.add_channel.session.gid,
2387 cmd->parameters.add_channel.channel.name,
2388 cmd->parameters.add_channel.channel.domain,
2389 cmd->parameters.add_channel.channel.key,
2390 cmd->parameters.add_channel.channel.capacity,
2391 &cmd->reply_code);
2392 break;
2393 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
2394 DBG("[notification-thread] Received remove channel command");
2395 ret = handle_notification_thread_command_remove_channel(
2396 state, cmd->parameters.remove_channel.key,
2397 cmd->parameters.remove_channel.domain,
2398 &cmd->reply_code);
2399 break;
2400 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
2401 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
2402 DBG("[notification-thread] Received session rotation %s command",
2403 cmd->type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING ?
2404 "ongoing" : "completed");
2405 ret = handle_notification_thread_command_session_rotation(
2406 state,
2407 cmd->type,
2408 cmd->parameters.session_rotation.session_name,
2409 cmd->parameters.session_rotation.uid,
2410 cmd->parameters.session_rotation.gid,
2411 cmd->parameters.session_rotation.trace_archive_chunk_id,
2412 cmd->parameters.session_rotation.location,
2413 &cmd->reply_code);
2414 break;
2415 case NOTIFICATION_COMMAND_TYPE_QUIT:
2416 DBG("[notification-thread] Received quit command");
2417 cmd->reply_code = LTTNG_OK;
2418 ret = 1;
2419 goto end;
2420 case NOTIFICATION_COMMAND_TYPE_CLIENT_COMMUNICATION_UPDATE:
2421 {
2422 const enum client_transmission_status client_status =
2423 cmd->parameters.client_communication_update
2424 .status;
2425 const notification_client_id client_id =
2426 cmd->parameters.client_communication_update.id;
2427 struct notification_client *client;
2428
2429 rcu_read_lock();
2430 client = get_client_from_id(client_id, state);
2431
2432 if (!client) {
2433 /*
2434 * Client error was probably already picked-up by the
2435 * notification thread or it has disconnected
2436 * gracefully while this command was queued.
2437 */
2438 DBG("Failed to find notification client to update communication status, client id = %" PRIu64,
2439 client_id);
2440 ret = 0;
2441 } else {
2442 pthread_mutex_lock(&client->lock);
2443 ret = client_handle_transmission_status(
2444 client, client_status, state);
2445 pthread_mutex_unlock(&client->lock);
2446 }
2447 rcu_read_unlock();
2448 break;
2449 }
2450 default:
2451 ERR("[notification-thread] Unknown internal command received");
2452 goto error_unlock;
2453 }
2454
2455 if (ret) {
2456 goto error_unlock;
2457 }
2458 end:
2459 if (cmd->is_async) {
2460 free(cmd);
2461 cmd = NULL;
2462 } else {
2463 lttng_waiter_wake_up(&cmd->reply_waiter);
2464 }
2465 return ret;
2466 error_unlock:
2467 /* Wake-up and return a fatal error to the calling thread. */
2468 lttng_waiter_wake_up(&cmd->reply_waiter);
2469 cmd->reply_code = LTTNG_ERR_FATAL;
2470 error:
2471 /* Indicate a fatal error to the caller. */
2472 return -1;
2473 }
2474
/*
 * Add O_NONBLOCK to the file status flags of `socket`.
 *
 * Returns -1 if either fcntl() call fails, otherwise the value returned by
 * the F_SETFL call.
 */
static
int socket_set_non_blocking(int socket)
{
	int ret;
	int current_flags;

	/* Fetch the current flags so that O_NONBLOCK is added to them. */
	current_flags = fcntl(socket, F_GETFL, 0);
	if (current_flags == -1) {
		PERROR("fcntl get socket flags");
		return -1;
	}

	ret = fcntl(socket, F_SETFL, current_flags | O_NONBLOCK);
	if (ret == -1) {
		PERROR("fcntl set O_NONBLOCK socket flag");
		return ret;
	}

	DBG("Client socket (fd = %i) set as non-blocking", socket);
	return ret;
}
2497
2498 /* Client lock must be acquired by caller. */
2499 static
2500 int client_reset_inbound_state(struct notification_client *client)
2501 {
2502 int ret;
2503
2504 ASSERT_LOCKED(client->lock);
2505
2506 ret = lttng_dynamic_buffer_set_size(
2507 &client->communication.inbound.buffer, 0);
2508 assert(!ret);
2509
2510 client->communication.inbound.bytes_to_receive =
2511 sizeof(struct lttng_notification_channel_message);
2512 client->communication.inbound.msg_type =
2513 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
2514 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
2515 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
2516 ret = lttng_dynamic_buffer_set_size(
2517 &client->communication.inbound.buffer,
2518 client->communication.inbound.bytes_to_receive);
2519 return ret;
2520 }
2521
2522 int handle_notification_thread_client_connect(
2523 struct notification_thread_state *state)
2524 {
2525 int ret;
2526 struct notification_client *client;
2527
2528 DBG("[notification-thread] Handling new notification channel client connection");
2529
2530 client = zmalloc(sizeof(*client));
2531 if (!client) {
2532 /* Fatal error. */
2533 ret = -1;
2534 goto error;
2535 }
2536 pthread_mutex_init(&client->lock, NULL);
2537 client->id = state->next_notification_client_id++;
2538 CDS_INIT_LIST_HEAD(&client->condition_list);
2539 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
2540 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
2541 client->communication.inbound.expect_creds = true;
2542
2543 pthread_mutex_lock(&client->lock);
2544 ret = client_reset_inbound_state(client);
2545 pthread_mutex_unlock(&client->lock);
2546 if (ret) {
2547 ERR("[notification-thread] Failed to reset client communication's inbound state");
2548 ret = 0;
2549 goto error;
2550 }
2551
2552 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
2553 if (ret < 0) {
2554 ERR("[notification-thread] Failed to accept new notification channel client connection");
2555 ret = 0;
2556 goto error;
2557 }
2558
2559 client->socket = ret;
2560
2561 ret = socket_set_non_blocking(client->socket);
2562 if (ret) {
2563 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
2564 goto error;
2565 }
2566
2567 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
2568 if (ret < 0) {
2569 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
2570 ret = 0;
2571 goto error;
2572 }
2573
2574 ret = lttng_poll_add(&state->events, client->socket,
2575 LPOLLIN | LPOLLERR |
2576 LPOLLHUP | LPOLLRDHUP);
2577 if (ret < 0) {
2578 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
2579 ret = 0;
2580 goto error;
2581 }
2582 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
2583 client->socket);
2584
2585 rcu_read_lock();
2586 cds_lfht_add(state->client_socket_ht,
2587 hash_client_socket(client->socket),
2588 &client->client_socket_ht_node);
2589 cds_lfht_add(state->client_id_ht,
2590 hash_client_id(client->id),
2591 &client->client_id_ht_node);
2592 rcu_read_unlock();
2593
2594 return ret;
2595 error:
2596 notification_client_destroy(client, state);
2597 return ret;
2598 }
2599
2600 /* RCU read-lock must be held by the caller. */
2601 /* Client lock must be held by the caller */
2602 static
2603 int notification_thread_client_disconnect(
2604 struct notification_client *client,
2605 struct notification_thread_state *state)
2606 {
2607 int ret;
2608 struct lttng_condition_list_element *condition_list_element, *tmp;
2609
2610 /* Acquire the client lock to disable its communication atomically. */
2611 client->communication.active = false;
2612 ret = lttng_poll_del(&state->events, client->socket);
2613 if (ret) {
2614 ERR("[notification-thread] Failed to remove client socket %d from poll set",
2615 client->socket);
2616 }
2617
2618 cds_lfht_del(state->client_socket_ht, &client->client_socket_ht_node);
2619 cds_lfht_del(state->client_id_ht, &client->client_id_ht_node);
2620
2621 /* Release all conditions to which the client was subscribed. */
2622 cds_list_for_each_entry_safe(condition_list_element, tmp,
2623 &client->condition_list, node) {
2624 (void) notification_thread_client_unsubscribe(client,
2625 condition_list_element->condition, state, NULL);
2626 }
2627
2628 /*
2629 * Client no longer accessible to other threads (through the
2630 * client lists).
2631 */
2632 notification_client_destroy(client, state);
2633 return ret;
2634 }
2635
2636 int handle_notification_thread_client_disconnect(
2637 int client_socket, struct notification_thread_state *state)
2638 {
2639 int ret = 0;
2640 struct notification_client *client;
2641
2642 rcu_read_lock();
2643 DBG("[notification-thread] Closing client connection (socket fd = %i)",
2644 client_socket);
2645 client = get_client_from_socket(client_socket, state);
2646 if (!client) {
2647 /* Internal state corruption, fatal error. */
2648 ERR("[notification-thread] Unable to find client (socket fd = %i)",
2649 client_socket);
2650 ret = -1;
2651 goto end;
2652 }
2653
2654 pthread_mutex_lock(&client->lock);
2655 ret = notification_thread_client_disconnect(client, state);
2656 pthread_mutex_unlock(&client->lock);
2657 end:
2658 rcu_read_unlock();
2659 return ret;
2660 }
2661
2662 int handle_notification_thread_client_disconnect_all(
2663 struct notification_thread_state *state)
2664 {
2665 struct cds_lfht_iter iter;
2666 struct notification_client *client;
2667 bool error_encoutered = false;
2668
2669 rcu_read_lock();
2670 DBG("[notification-thread] Closing all client connections");
2671 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2672 client_socket_ht_node) {
2673 int ret;
2674
2675 pthread_mutex_lock(&client->lock);
2676 ret = notification_thread_client_disconnect(
2677 client, state);
2678 pthread_mutex_unlock(&client->lock);
2679 if (ret) {
2680 error_encoutered = true;
2681 }
2682 }
2683 rcu_read_unlock();
2684 return error_encoutered ? 1 : 0;
2685 }
2686
2687 int handle_notification_thread_trigger_unregister_all(
2688 struct notification_thread_state *state)
2689 {
2690 bool error_occurred = false;
2691 struct cds_lfht_iter iter;
2692 struct lttng_trigger_ht_element *trigger_ht_element;
2693
2694 rcu_read_lock();
2695 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
2696 node) {
2697 int ret = handle_notification_thread_command_unregister_trigger(
2698 state, trigger_ht_element->trigger, NULL);
2699 if (ret) {
2700 error_occurred = true;
2701 }
2702 }
2703 rcu_read_unlock();
2704 return error_occurred ? -1 : 0;
2705 }
2706
2707 static
2708 int client_handle_transmission_status(
2709 struct notification_client *client,
2710 enum client_transmission_status transmission_status,
2711 struct notification_thread_state *state)
2712 {
2713 int ret = 0;
2714
2715 switch (transmission_status) {
2716 case CLIENT_TRANSMISSION_STATUS_COMPLETE:
2717 ret = lttng_poll_mod(&state->events, client->socket,
2718 CLIENT_POLL_MASK_IN);
2719 if (ret) {
2720 goto end;
2721 }
2722
2723 client->communication.outbound.queued_command_reply = false;
2724 client->communication.outbound.dropped_notification = false;
2725 break;
2726 case CLIENT_TRANSMISSION_STATUS_QUEUED:
2727 /*
2728 * We want to be notified whenever there is buffer space
2729 * available to send the rest of the payload.
2730 */
2731 ret = lttng_poll_mod(&state->events, client->socket,
2732 CLIENT_POLL_MASK_IN_OUT);
2733 if (ret) {
2734 goto end;
2735 }
2736 break;
2737 case CLIENT_TRANSMISSION_STATUS_FAIL:
2738 ret = notification_thread_client_disconnect(client, state);
2739 if (ret) {
2740 goto end;
2741 }
2742 break;
2743 case CLIENT_TRANSMISSION_STATUS_ERROR:
2744 ret = -1;
2745 goto end;
2746 default:
2747 abort();
2748 }
2749 end:
2750 return ret;
2751 }
2752
2753 /* Client lock must be acquired by caller. */
2754 static
2755 enum client_transmission_status client_flush_outgoing_queue(
2756 struct notification_client *client)
2757 {
2758 ssize_t ret;
2759 size_t to_send_count;
2760 enum client_transmission_status status;
2761
2762 ASSERT_LOCKED(client->lock);
2763
2764 if (!client->communication.active) {
2765 status = CLIENT_TRANSMISSION_STATUS_FAIL;
2766 goto end;
2767 }
2768
2769 assert(client->communication.outbound.buffer.size != 0);
2770 to_send_count = client->communication.outbound.buffer.size;
2771 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
2772 client->socket);
2773
2774 ret = lttcomm_send_unix_sock_non_block(client->socket,
2775 client->communication.outbound.buffer.data,
2776 to_send_count);
2777 if ((ret >= 0 && ret < to_send_count)) {
2778 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
2779 client->socket);
2780 to_send_count -= max(ret, 0);
2781
2782 memcpy(client->communication.outbound.buffer.data,
2783 client->communication.outbound.buffer.data +
2784 client->communication.outbound.buffer.size - to_send_count,
2785 to_send_count);
2786 ret = lttng_dynamic_buffer_set_size(
2787 &client->communication.outbound.buffer,
2788 to_send_count);
2789 if (ret) {
2790 goto error;
2791 }
2792 status = CLIENT_TRANSMISSION_STATUS_QUEUED;
2793 } else if (ret < 0) {
2794 /* Generic error, disconnect the client. */
2795 ERR("[notification-thread] Failed to flush outgoing queue, disconnecting client (socket fd = %i)",
2796 client->socket);
2797 status = CLIENT_TRANSMISSION_STATUS_FAIL;
2798 } else {
2799 /* No error and flushed the queue completely. */
2800 ret = lttng_dynamic_buffer_set_size(
2801 &client->communication.outbound.buffer, 0);
2802 if (ret) {
2803 goto error;
2804 }
2805 status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
2806 }
2807 end:
2808 return status;
2809 error:
2810 return CLIENT_TRANSMISSION_STATUS_ERROR;
2811 }
2812
2813 /* Client lock must be acquired by caller. */
2814 static
2815 int client_send_command_reply(struct notification_client *client,
2816 struct notification_thread_state *state,
2817 enum lttng_notification_channel_status status)
2818 {
2819 int ret;
2820 struct lttng_notification_channel_command_reply reply = {
2821 .status = (int8_t) status,
2822 };
2823 struct lttng_notification_channel_message msg = {
2824 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
2825 .size = sizeof(reply),
2826 };
2827 char buffer[sizeof(msg) + sizeof(reply)];
2828 enum client_transmission_status transmission_status;
2829
2830 ASSERT_LOCKED(client->lock);
2831
2832 if (client->communication.outbound.queued_command_reply) {
2833 /* Protocol error. */
2834 goto error;
2835 }
2836
2837 memcpy(buffer, &msg, sizeof(msg));
2838 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
2839 DBG("[notification-thread] Send command reply (%i)", (int) status);
2840
2841 /* Enqueue buffer to outgoing queue and flush it. */
2842 ret = lttng_dynamic_buffer_append(
2843 &client->communication.outbound.buffer,
2844 buffer, sizeof(buffer));
2845 if (ret) {
2846 goto error;
2847 }
2848
2849 transmission_status = client_flush_outgoing_queue(client);
2850 ret = client_handle_transmission_status(
2851 client, transmission_status, state);
2852 if (ret) {
2853 goto error;
2854 }
2855
2856 if (client->communication.outbound.buffer.size != 0) {
2857 /* Queue could not be emptied. */
2858 client->communication.outbound.queued_command_reply = true;
2859 }
2860
2861 return 0;
2862 error:
2863 return -1;
2864 }
2865
2866 static
2867 int client_handle_message_unknown(struct notification_client *client,
2868 struct notification_thread_state *state)
2869 {
2870 int ret;
2871
2872 pthread_mutex_lock(&client->lock);
2873
2874 /*
2875 * Receiving message header. The function will be called again
2876 * once the rest of the message as been received and can be
2877 * interpreted.
2878 */
2879 const struct lttng_notification_channel_message *msg;
2880
2881 assert(sizeof(*msg) == client->communication.inbound.buffer.size);
2882 msg = (const struct lttng_notification_channel_message *)
2883 client->communication.inbound.buffer.data;
2884
2885 if (msg->size == 0 ||
2886 msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
2887 ERR("[notification-thread] Invalid notification channel message: length = %u",
2888 msg->size);
2889 ret = -1;
2890 goto end;
2891 }
2892
2893 switch (msg->type) {
2894 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2895 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2896 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2897 break;
2898 default:
2899 ret = -1;
2900 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
2901 goto end;
2902 }
2903
2904 client->communication.inbound.bytes_to_receive = msg->size;
2905 client->communication.inbound.msg_type =
2906 (enum lttng_notification_channel_message_type) msg->type;
2907 ret = lttng_dynamic_buffer_set_size(
2908 &client->communication.inbound.buffer, msg->size);
2909 end:
2910 pthread_mutex_unlock(&client->lock);
2911 return ret;
2912 }
2913
2914 static
2915 int client_handle_message_handshake(struct notification_client *client,
2916 struct notification_thread_state *state)
2917 {
2918 int ret;
2919 struct lttng_notification_channel_command_handshake *handshake_client;
2920 const struct lttng_notification_channel_command_handshake handshake_reply = {
2921 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
2922 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
2923 };
2924 const struct lttng_notification_channel_message msg_header = {
2925 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
2926 .size = sizeof(handshake_reply),
2927 };
2928 enum lttng_notification_channel_status status =
2929 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2930 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
2931 enum client_transmission_status transmission_status;
2932
2933 pthread_mutex_lock(&client->lock);
2934
2935 memcpy(send_buffer, &msg_header, sizeof(msg_header));
2936 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
2937 sizeof(handshake_reply));
2938
2939 handshake_client =
2940 (struct lttng_notification_channel_command_handshake *)
2941 client->communication.inbound.buffer
2942 .data;
2943 client->major = handshake_client->major;
2944 client->minor = handshake_client->minor;
2945 if (!client->communication.inbound.creds_received) {
2946 ERR("[notification-thread] No credentials received from client");
2947 ret = -1;
2948 goto end;
2949 }
2950
2951 client->uid = LTTNG_SOCK_GET_UID_CRED(
2952 &client->communication.inbound.creds);
2953 client->gid = LTTNG_SOCK_GET_GID_CRED(
2954 &client->communication.inbound.creds);
2955 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
2956 client->uid, client->gid, (int) client->major,
2957 (int) client->minor);
2958
2959 if (handshake_client->major !=
2960 LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
2961 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
2962 }
2963
2964 ret = lttng_dynamic_buffer_append(
2965 &client->communication.outbound.buffer, send_buffer,
2966 sizeof(send_buffer));
2967 if (ret) {
2968 ERR("[notification-thread] Failed to send protocol version to notification channel client");
2969 goto end;
2970 }
2971
2972 client->validated = true;
2973 client->communication.active = true;
2974
2975 transmission_status = client_flush_outgoing_queue(client);
2976 ret = client_handle_transmission_status(
2977 client, transmission_status, state);
2978 if (ret) {
2979 goto end;
2980 }
2981
2982 ret = client_send_command_reply(client, state, status);
2983 if (ret) {
2984 ERR("[notification-thread] Failed to send reply to notification channel client");
2985 goto end;
2986 }
2987
2988 /* Set reception state to receive the next message header. */
2989 ret = client_reset_inbound_state(client);
2990 if (ret) {
2991 ERR("[notification-thread] Failed to reset client communication's inbound state");
2992 goto end;
2993 }
2994
2995 end:
2996 pthread_mutex_unlock(&client->lock);
2997 return ret;
2998 }
2999
3000 static
3001 int client_handle_message_subscription(
3002 struct notification_client *client,
3003 enum lttng_notification_channel_message_type msg_type,
3004 struct notification_thread_state *state)
3005 {
3006 int ret;
3007 struct lttng_condition *condition;
3008 enum lttng_notification_channel_status status =
3009 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
3010 struct lttng_payload_view condition_view =
3011 lttng_payload_view_from_dynamic_buffer(
3012 &client->communication.inbound.buffer,
3013 0, -1);
3014 size_t expected_condition_size;
3015
3016 pthread_mutex_lock(&client->lock);
3017 expected_condition_size = client->communication.inbound.buffer.size;
3018 pthread_mutex_unlock(&client->lock);
3019
3020 ret = lttng_condition_create_from_payload(&condition_view, &condition);
3021 if (ret != expected_condition_size) {
3022 ERR("[notification-thread] Malformed condition received from client");
3023 goto end;
3024 }
3025
3026 if (msg_type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
3027 ret = notification_thread_client_subscribe(
3028 client, condition, state, &status);
3029 } else {
3030 ret = notification_thread_client_unsubscribe(
3031 client, condition, state, &status);
3032 }
3033 if (ret) {
3034 goto end;
3035 }
3036
3037 pthread_mutex_lock(&client->lock);
3038 ret = client_send_command_reply(client, state, status);
3039 if (ret) {
3040 ERR("[notification-thread] Failed to send reply to notification channel client");
3041 goto end_unlock;
3042 }
3043
3044 /* Set reception state to receive the next message header. */
3045 ret = client_reset_inbound_state(client);
3046 if (ret) {
3047 ERR("[notification-thread] Failed to reset client communication's inbound state");
3048 goto end_unlock;
3049 }
3050
3051 end_unlock:
3052 pthread_mutex_unlock(&client->lock);
3053 end:
3054 return ret;
3055 }
3056
3057 static
3058 int client_dispatch_message(struct notification_client *client,
3059 struct notification_thread_state *state)
3060 {
3061 int ret = 0;
3062
3063 if (client->communication.inbound.msg_type !=
3064 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
3065 client->communication.inbound.msg_type !=
3066 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
3067 !client->validated) {
3068 WARN("[notification-thread] client attempted a command before handshake");
3069 ret = -1;
3070 goto end;
3071 }
3072
3073 switch (client->communication.inbound.msg_type) {
3074 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
3075 {
3076 ret = client_handle_message_unknown(client, state);
3077 break;
3078 }
3079 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
3080 {
3081 ret = client_handle_message_handshake(client, state);
3082 break;
3083 }
3084 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
3085 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
3086 {
3087 ret = client_handle_message_subscription(client,
3088 client->communication.inbound.msg_type, state);
3089 break;
3090 }
3091 default:
3092 abort();
3093 }
3094 end:
3095 return ret;
3096 }
3097
3098 /* Incoming data from client. */
3099 int handle_notification_thread_client_in(
3100 struct notification_thread_state *state, int socket)
3101 {
3102 int ret = 0;
3103 struct notification_client *client;
3104 ssize_t recv_ret;
3105 size_t offset;
3106 bool message_is_complete = false;
3107
3108 client = get_client_from_socket(socket, state);
3109 if (!client) {
3110 /* Internal error, abort. */
3111 ret = -1;
3112 goto end;
3113 }
3114
3115 pthread_mutex_lock(&client->lock);
3116 offset = client->communication.inbound.buffer.size -
3117 client->communication.inbound.bytes_to_receive;
3118 if (client->communication.inbound.expect_creds) {
3119 recv_ret = lttcomm_recv_creds_unix_sock(socket,
3120 client->communication.inbound.buffer.data + offset,
3121 client->communication.inbound.bytes_to_receive,
3122 &client->communication.inbound.creds);
3123 if (recv_ret > 0) {
3124 client->communication.inbound.expect_creds = false;
3125 client->communication.inbound.creds_received = true;
3126 }
3127 } else {
3128 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
3129 client->communication.inbound.buffer.data + offset,
3130 client->communication.inbound.bytes_to_receive);
3131 }
3132 if (recv_ret >= 0) {
3133 client->communication.inbound.bytes_to_receive -= recv_ret;
3134 message_is_complete = client->communication.inbound
3135 .bytes_to_receive == 0;
3136 }
3137 pthread_mutex_unlock(&client->lock);
3138 if (recv_ret < 0) {
3139 goto error_disconnect_client;
3140 }
3141
3142 if (message_is_complete) {
3143 ret = client_dispatch_message(client, state);
3144 if (ret) {
3145 /*
3146 * Only returns an error if this client must be
3147 * disconnected.
3148 */
3149 goto error_disconnect_client;
3150 }
3151 }
3152 end:
3153 return ret;
3154 error_disconnect_client:
3155 pthread_mutex_lock(&client->lock);
3156 ret = notification_thread_client_disconnect(client, state);
3157 pthread_mutex_unlock(&client->lock);
3158 return ret;
3159 }
3160
3161 /* Client ready to receive outgoing data. */
3162 int handle_notification_thread_client_out(
3163 struct notification_thread_state *state, int socket)
3164 {
3165 int ret;
3166 struct notification_client *client;
3167 enum client_transmission_status transmission_status;
3168
3169 client = get_client_from_socket(socket, state);
3170 if (!client) {
3171 /* Internal error, abort. */
3172 ret = -1;
3173 goto end;
3174 }
3175
3176 pthread_mutex_lock(&client->lock);
3177 transmission_status = client_flush_outgoing_queue(client);
3178 ret = client_handle_transmission_status(
3179 client, transmission_status, state);
3180 pthread_mutex_unlock(&client->lock);
3181 if (ret) {
3182 goto end;
3183 }
3184 end:
3185 return ret;
3186 }
3187
3188 static
3189 bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
3190 const struct channel_state_sample *sample,
3191 uint64_t buffer_capacity)
3192 {
3193 bool result = false;
3194 uint64_t threshold;
3195 enum lttng_condition_type condition_type;
3196 const struct lttng_condition_buffer_usage *use_condition = container_of(
3197 condition, struct lttng_condition_buffer_usage,
3198 parent);
3199
3200 if (use_condition->threshold_bytes.set) {
3201 threshold = use_condition->threshold_bytes.value;
3202 } else {
3203 /*
3204 * Threshold was expressed as a ratio.
3205 *
3206 * TODO the threshold (in bytes) of conditions expressed
3207 * as a ratio of total buffer size could be cached to
3208 * forego this double-multiplication or it could be performed
3209 * as fixed-point math.
3210 *
3211 * Note that caching should accommodates the case where the
3212 * condition applies to multiple channels (i.e. don't assume
3213 * that all channels matching my_chann* have the same size...)
3214 */
3215 threshold = (uint64_t) (use_condition->threshold_ratio.value *
3216 (double) buffer_capacity);
3217 }
3218
3219 condition_type = lttng_condition_get_type(condition);
3220 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
3221 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3222 threshold, sample->highest_usage);
3223
3224 /*
3225 * The low condition should only be triggered once _all_ of the
3226 * streams in a channel have gone below the "low" threshold.
3227 */
3228 if (sample->highest_usage <= threshold) {
3229 result = true;
3230 }
3231 } else {
3232 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3233 threshold, sample->highest_usage);
3234
3235 /*
3236 * For high buffer usage scenarios, we want to trigger whenever
3237 * _any_ of the streams has reached the "high" threshold.
3238 */
3239 if (sample->highest_usage >= threshold) {
3240 result = true;
3241 }
3242 }
3243
3244 return result;
3245 }
3246
3247 static
3248 bool evaluate_session_consumed_size_condition(
3249 const struct lttng_condition *condition,
3250 uint64_t session_consumed_size)
3251 {
3252 uint64_t threshold;
3253 const struct lttng_condition_session_consumed_size *size_condition =
3254 container_of(condition,
3255 struct lttng_condition_session_consumed_size,
3256 parent);
3257
3258 threshold = size_condition->consumed_threshold_bytes.value;
3259 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
3260 threshold, session_consumed_size);
3261 return session_consumed_size >= threshold;
3262 }
3263
3264 static
3265 int evaluate_buffer_condition(const struct lttng_condition *condition,
3266 struct lttng_evaluation **evaluation,
3267 const struct notification_thread_state *state,
3268 const struct channel_state_sample *previous_sample,
3269 const struct channel_state_sample *latest_sample,
3270 uint64_t previous_session_consumed_total,
3271 uint64_t latest_session_consumed_total,
3272 struct channel_info *channel_info)
3273 {
3274 int ret = 0;
3275 enum lttng_condition_type condition_type;
3276 const bool previous_sample_available = !!previous_sample;
3277 bool previous_sample_result = false;
3278 bool latest_sample_result;
3279
3280 condition_type = lttng_condition_get_type(condition);
3281
3282 switch (condition_type) {
3283 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3284 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3285 if (caa_likely(previous_sample_available)) {
3286 previous_sample_result =
3287 evaluate_buffer_usage_condition(condition,
3288 previous_sample, channel_info->capacity);
3289 }
3290 latest_sample_result = evaluate_buffer_usage_condition(
3291 condition, latest_sample,
3292 channel_info->capacity);
3293 break;
3294 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3295 if (caa_likely(previous_sample_available)) {
3296 previous_sample_result =
3297 evaluate_session_consumed_size_condition(
3298 condition,
3299 previous_session_consumed_total);
3300 }
3301 latest_sample_result =
3302 evaluate_session_consumed_size_condition(
3303 condition,
3304 latest_session_consumed_total);
3305 break;
3306 default:
3307 /* Unknown condition type; internal error. */
3308 abort();
3309 }
3310
3311 if (!latest_sample_result ||
3312 (previous_sample_result == latest_sample_result)) {
3313 /*
3314 * Only trigger on a condition evaluation transition.
3315 *
3316 * NOTE: This edge-triggered logic may not be appropriate for
3317 * future condition types.
3318 */
3319 goto end;
3320 }
3321
3322 if (!evaluation || !latest_sample_result) {
3323 goto end;
3324 }
3325
3326 switch (condition_type) {
3327 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3328 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3329 *evaluation = lttng_evaluation_buffer_usage_create(
3330 condition_type,
3331 latest_sample->highest_usage,
3332 channel_info->capacity);
3333 break;
3334 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3335 *evaluation = lttng_evaluation_session_consumed_size_create(
3336 latest_session_consumed_total);
3337 break;
3338 default:
3339 abort();
3340 }
3341
3342 if (!*evaluation) {
3343 ret = -1;
3344 goto end;
3345 }
3346 end:
3347 return ret;
3348 }
3349
3350 static
3351 int client_notification_overflow(struct notification_client *client)
3352 {
3353 int ret = 0;
3354 const struct lttng_notification_channel_message msg = {
3355 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
3356 };
3357
3358 ASSERT_LOCKED(client->lock);
3359
3360 DBG("Dropping notification addressed to client (socket fd = %i)",
3361 client->socket);
3362 if (client->communication.outbound.dropped_notification) {
3363 /*
3364 * The client already has a "notification dropped" message
3365 * in its outgoing queue. Nothing to do since all
3366 * of those messages are coalesced.
3367 */
3368 goto end;
3369 }
3370
3371 client->communication.outbound.dropped_notification = true;
3372 ret = lttng_dynamic_buffer_append(
3373 &client->communication.outbound.buffer, &msg,
3374 sizeof(msg));
3375 if (ret) {
3376 PERROR("Failed to enqueue \"dropped notification\" message in client's (socket fd = %i) outgoing queue",
3377 client->socket);
3378 }
3379 end:
3380 return ret;
3381 }
3382
3383 static int client_handle_transmission_status_wrapper(
3384 struct notification_client *client,
3385 enum client_transmission_status status,
3386 void *user_data)
3387 {
3388 return client_handle_transmission_status(client, status,
3389 (struct notification_thread_state *) user_data);
3390 }
3391
3392 static
3393 int send_evaluation_to_clients(const struct lttng_trigger *trigger,
3394 const struct lttng_evaluation *evaluation,
3395 struct notification_client_list* client_list,
3396 struct notification_thread_state *state,
3397 uid_t object_uid, gid_t object_gid)
3398 {
3399 return notification_client_list_send_evaluation(client_list,
3400 lttng_trigger_get_const_condition(trigger), evaluation,
3401 lttng_trigger_get_credentials(trigger),
3402 &(struct lttng_credentials){
3403 .uid = object_uid, .gid = object_gid},
3404 client_handle_transmission_status_wrapper, state);
3405 }
3406
3407 /*
3408 * Permission checks relative to notification channel clients are performed
3409 * here. Notice how object, client, and trigger credentials are involved in
3410 * this check.
3411 *
3412 * The `object` credentials are the credentials associated with the "subject"
3413 * of a condition. For instance, a `rotation completed` condition applies
3414 * to a session. When that condition is met, it will produce an evaluation
3415 * against a session. Hence, in this case, the `object` credentials are the
3416 * credentials of the "subject" session.
3417 *
3418 * The `trigger` credentials are the credentials of the user that registered the
3419 * trigger.
3420 *
3421 * The `client` credentials are the credentials of the user that created a given
3422 * notification channel.
3423 *
3424 * In terms of visibility, it is expected that non-privilieged users can only
3425 * register triggers against "their" objects (their own sessions and
3426 * applications they are allowed to interact with). They can then open a
3427 * notification channel and subscribe to notifications associated with those
3428 * triggers.
3429 *
3430 * As for privilieged users, they can register triggers against the objects of
3431 * other users. They can then subscribe to the notifications associated to their
3432 * triggers. Privilieged users _can't_ subscribe to the notifications of
3433 * triggers owned by other users; they must create their own triggers.
3434 *
3435 * This is more a concern of usability than security. It would be difficult for
3436 * a root user reliably subscribe to a specific set of conditions without
3437 * interference from external users (those could, for instance, unregister
3438 * their triggers).
3439 */
3440 LTTNG_HIDDEN
3441 int notification_client_list_send_evaluation(
3442 struct notification_client_list *client_list,
3443 const struct lttng_condition *condition,
3444 const struct lttng_evaluation *evaluation,
3445 const struct lttng_credentials *trigger_creds,
3446 const struct lttng_credentials *source_object_creds,
3447 report_client_transmission_result_cb client_report,
3448 void *user_data)
3449 {
3450 int ret = 0;
3451 struct lttng_payload msg_payload;
3452 struct notification_client_list_element *client_list_element, *tmp;
3453 const struct lttng_notification notification = {
3454 .condition = (struct lttng_condition *) condition,
3455 .evaluation = (struct lttng_evaluation *) evaluation,
3456 };
3457 struct lttng_notification_channel_message msg_header = {
3458 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
3459 };
3460
3461 lttng_payload_init(&msg_payload);
3462
3463 ret = lttng_dynamic_buffer_append(&msg_payload.buffer, &msg_header,
3464 sizeof(msg_header));
3465 if (ret) {
3466 goto end;
3467 }
3468
3469 ret = lttng_notification_serialize(&notification, &msg_payload);
3470 if (ret) {
3471 ERR("[notification-thread] Failed to serialize notification");
3472 ret = -1;
3473 goto end;
3474 }
3475
3476 /* Update payload size. */
3477 ((struct lttng_notification_channel_message *) msg_payload.buffer.data)
3478 ->size = (uint32_t)(
3479 msg_payload.buffer.size - sizeof(msg_header));
3480
3481 pthread_mutex_lock(&client_list->lock);
3482 cds_list_for_each_entry_safe(client_list_element, tmp,
3483 &client_list->list, node) {
3484 enum client_transmission_status transmission_status;
3485 struct notification_client *client =
3486 client_list_element->client;
3487
3488 ret = 0;
3489 pthread_mutex_lock(&client->lock);
3490 if (source_object_creds) {
3491 if (client->uid != source_object_creds->uid &&
3492 client->gid != source_object_creds->gid &&
3493 client->uid != 0) {
3494 /*
3495 * Client is not allowed to monitor this
3496 * object.
3497 */
3498 DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
3499 goto unlock_client;
3500 }
3501 }
3502
3503 if (client->uid != trigger_creds->uid && client->gid != trigger_creds->gid) {
3504 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this trigger");
3505 goto unlock_client;
3506 }
3507
3508 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
3509 client->socket, msg_payload.buffer.size);
3510 if (client->communication.outbound.buffer.size) {
3511 /*
3512 * Outgoing data is already buffered for this client;
3513 * drop the notification and enqueue a "dropped
3514 * notification" message if this is the first dropped
3515 * notification since the socket spilled-over to the
3516 * queue.
3517 */
3518 ret = client_notification_overflow(client);
3519 if (ret) {
3520 goto unlock_client;
3521 }
3522 }
3523
3524 ret = lttng_dynamic_buffer_append_buffer(
3525 &client->communication.outbound.buffer,
3526 &msg_payload.buffer);
3527 if (ret) {
3528 goto unlock_client;
3529 }
3530
3531 transmission_status = client_flush_outgoing_queue(client);
3532 ret = client_report(client, transmission_status, user_data);
3533 if (ret) {
3534 goto unlock_client;
3535 }
3536 unlock_client:
3537 pthread_mutex_unlock(&client->lock);
3538 if (ret) {
3539 goto end_unlock_list;
3540 }
3541 }
3542 ret = 0;
3543
3544 end_unlock_list:
3545 pthread_mutex_unlock(&client_list->lock);
3546 end:
3547 lttng_payload_reset(&msg_payload);
3548 return ret;
3549 }
3550
3551 int handle_notification_thread_channel_sample(
3552 struct notification_thread_state *state, int pipe,
3553 enum lttng_domain_type domain)
3554 {
3555 int ret = 0;
3556 struct lttcomm_consumer_channel_monitor_msg sample_msg;
3557 struct channel_info *channel_info;
3558 struct cds_lfht_node *node;
3559 struct cds_lfht_iter iter;
3560 struct lttng_channel_trigger_list *trigger_list;
3561 struct lttng_trigger_list_element *trigger_list_element;
3562 bool previous_sample_available = false;
3563 struct channel_state_sample previous_sample, latest_sample;
3564 uint64_t previous_session_consumed_total, latest_session_consumed_total;
3565 struct lttng_credentials channel_creds;
3566
3567 /*
3568 * The monitoring pipe only holds messages smaller than PIPE_BUF,
3569 * ensuring that read/write of sampling messages are atomic.
3570 */
3571 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
3572 if (ret != sizeof(sample_msg)) {
3573 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
3574 pipe);
3575 ret = -1;
3576 goto end;
3577 }
3578
3579 ret = 0;
3580 latest_sample.key.key = sample_msg.key;
3581 latest_sample.key.domain = domain;
3582 latest_sample.highest_usage = sample_msg.highest;
3583 latest_sample.lowest_usage = sample_msg.lowest;
3584 latest_sample.channel_total_consumed = sample_msg.total_consumed;
3585
3586 rcu_read_lock();
3587
3588 /* Retrieve the channel's informations */
3589 cds_lfht_lookup(state->channels_ht,
3590 hash_channel_key(&latest_sample.key),
3591 match_channel_info,
3592 &latest_sample.key,
3593 &iter);
3594 node = cds_lfht_iter_get_node(&iter);
3595 if (caa_unlikely(!node)) {
3596 /*
3597 * Not an error since the consumer can push a sample to the pipe
3598 * and the rest of the session daemon could notify us of the
3599 * channel's destruction before we get a chance to process that
3600 * sample.
3601 */
3602 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
3603 latest_sample.key.key,
3604 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
3605 "user space");
3606 goto end_unlock;
3607 }
3608 channel_info = caa_container_of(node, struct channel_info,
3609 channels_ht_node);
3610 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
3611 channel_info->name,
3612 latest_sample.key.key,
3613 channel_info->session_info->name,
3614 latest_sample.highest_usage,
3615 latest_sample.lowest_usage,
3616 latest_sample.channel_total_consumed);
3617
3618 previous_session_consumed_total =
3619 channel_info->session_info->consumed_data_size;
3620
3621 /* Retrieve the channel's last sample, if it exists, and update it. */
3622 cds_lfht_lookup(state->channel_state_ht,
3623 hash_channel_key(&latest_sample.key),
3624 match_channel_state_sample,
3625 &latest_sample.key,
3626 &iter);
3627 node = cds_lfht_iter_get_node(&iter);
3628 if (caa_likely(node)) {
3629 struct channel_state_sample *stored_sample;
3630
3631 /* Update the sample stored. */
3632 stored_sample = caa_container_of(node,
3633 struct channel_state_sample,
3634 channel_state_ht_node);
3635
3636 memcpy(&previous_sample, stored_sample,
3637 sizeof(previous_sample));
3638 stored_sample->highest_usage = latest_sample.highest_usage;
3639 stored_sample->lowest_usage = latest_sample.lowest_usage;
3640 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
3641 previous_sample_available = true;
3642
3643 latest_session_consumed_total =
3644 previous_session_consumed_total +
3645 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
3646 } else {
3647 /*
3648 * This is the channel's first sample, allocate space for and
3649 * store the new sample.
3650 */
3651 struct channel_state_sample *stored_sample;
3652
3653 stored_sample = zmalloc(sizeof(*stored_sample));
3654 if (!stored_sample) {
3655 ret = -1;
3656 goto end_unlock;
3657 }
3658
3659 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
3660 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
3661 cds_lfht_add(state->channel_state_ht,
3662 hash_channel_key(&stored_sample->key),
3663 &stored_sample->channel_state_ht_node);
3664
3665 latest_session_consumed_total =
3666 previous_session_consumed_total +
3667 latest_sample.channel_total_consumed;
3668 }
3669
3670 channel_info->session_info->consumed_data_size =
3671 latest_session_consumed_total;
3672
3673 /* Find triggers associated with this channel. */
3674 cds_lfht_lookup(state->channel_triggers_ht,
3675 hash_channel_key(&latest_sample.key),
3676 match_channel_trigger_list,
3677 &latest_sample.key,
3678 &iter);
3679 node = cds_lfht_iter_get_node(&iter);
3680 if (caa_likely(!node)) {
3681 goto end_unlock;
3682 }
3683
3684 channel_creds = (typeof(channel_creds)) {
3685 .uid = channel_info->session_info->uid,
3686 .gid = channel_info->session_info->gid,
3687 };
3688
3689 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
3690 channel_triggers_ht_node);
3691 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
3692 node) {
3693 const struct lttng_condition *condition;
3694 const struct lttng_action *action;
3695 struct lttng_trigger *trigger;
3696 struct notification_client_list *client_list = NULL;
3697 struct lttng_evaluation *evaluation = NULL;
3698 bool client_list_is_empty;
3699 enum action_executor_status executor_status;
3700
3701 ret = 0;
3702 trigger = trigger_list_element->trigger;
3703 condition = lttng_trigger_get_const_condition(trigger);
3704 assert(condition);
3705 action = lttng_trigger_get_const_action(trigger);
3706
3707 /* Notify actions are the only type currently supported. */
3708 assert(lttng_action_get_type_const(action) ==
3709 LTTNG_ACTION_TYPE_NOTIFY);
3710
3711 /*
3712 * Check if any client is subscribed to the result of this
3713 * evaluation.
3714 */
3715 client_list = get_client_list_from_condition(state, condition);
3716 assert(client_list);
3717 client_list_is_empty = cds_list_empty(&client_list->list);
3718 if (client_list_is_empty) {
3719 /*
3720 * No clients interested in the evaluation's result,
3721 * skip it.
3722 */
3723 goto put_list;
3724 }
3725
3726 ret = evaluate_buffer_condition(condition, &evaluation, state,
3727 previous_sample_available ? &previous_sample : NULL,
3728 &latest_sample,
3729 previous_session_consumed_total,
3730 latest_session_consumed_total,
3731 channel_info);
3732 if (caa_unlikely(ret)) {
3733 goto put_list;
3734 }
3735
3736 if (caa_likely(!evaluation)) {
3737 goto put_list;
3738 }
3739
3740 /*
3741 * Ownership of `evaluation` transferred to the action executor
3742 * no matter the result.
3743 */
3744 executor_status = action_executor_enqueue(state->executor,
3745 trigger, evaluation, &channel_creds,
3746 client_list);
3747 evaluation = NULL;
3748 switch (executor_status) {
3749 case ACTION_EXECUTOR_STATUS_OK:
3750 break;
3751 case ACTION_EXECUTOR_STATUS_ERROR:
3752 case ACTION_EXECUTOR_STATUS_INVALID:
3753 /*
3754 * TODO Add trigger identification (name/id) when
3755 * it is added to the API.
3756 */
3757 ERR("Fatal error occurred while enqueuing action associated with buffer-condition trigger");
3758 ret = -1;
3759 goto put_list;
3760 case ACTION_EXECUTOR_STATUS_OVERFLOW:
3761 /*
3762 * TODO Add trigger identification (name/id) when
3763 * it is added to the API.
3764 *
3765 * Not a fatal error.
3766 */
3767 WARN("No space left when enqueuing action associated with buffer-condition trigger");
3768 ret = 0;
3769 goto put_list;
3770 default:
3771 abort();
3772 }
3773
3774 put_list:
3775 notification_client_list_put(client_list);
3776 if (caa_unlikely(ret)) {
3777 break;
3778 }
3779 }
3780 end_unlock:
3781 rcu_read_unlock();
3782 end:
3783 return ret;
3784 }
This page took 0.162534 seconds and 5 git commands to generate.