51b95ee6da2e85b2501eda2b839cd39629ff760c
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
1 /*
2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #define _LGPL_SOURCE
9 #include <urcu.h>
10 #include <urcu/rculfhash.h>
11
12 #include <common/defaults.h>
13 #include <common/error.h>
14 #include <common/futex.h>
15 #include <common/unix.h>
16 #include <common/dynamic-buffer.h>
17 #include <common/hashtable/utils.h>
18 #include <common/sessiond-comm/sessiond-comm.h>
19 #include <common/macros.h>
20 #include <lttng/condition/condition.h>
21 #include <lttng/action/action-internal.h>
22 #include <lttng/notification/notification-internal.h>
23 #include <lttng/condition/condition-internal.h>
24 #include <lttng/condition/buffer-usage-internal.h>
25 #include <lttng/condition/session-consumed-size-internal.h>
26 #include <lttng/condition/session-rotation-internal.h>
27 #include <lttng/notification/channel-internal.h>
28
29 #include <time.h>
30 #include <unistd.h>
31 #include <assert.h>
32 #include <inttypes.h>
33 #include <fcntl.h>
34
35 #include "notification-thread.h"
36 #include "notification-thread-events.h"
37 #include "notification-thread-commands.h"
38 #include "lttng-sessiond.h"
39 #include "kernel.h"
40
41 #define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
42 #define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
43
44 enum lttng_object_type {
45 LTTNG_OBJECT_TYPE_UNKNOWN,
46 LTTNG_OBJECT_TYPE_NONE,
47 LTTNG_OBJECT_TYPE_CHANNEL,
48 LTTNG_OBJECT_TYPE_SESSION,
49 };
50
51 struct lttng_trigger_list_element {
52 /* No ownership of the trigger object is assumed. */
53 const struct lttng_trigger *trigger;
54 struct cds_list_head node;
55 };
56
57 struct lttng_channel_trigger_list {
58 struct channel_key channel_key;
59 /* List of struct lttng_trigger_list_element. */
60 struct cds_list_head list;
61 /* Node in the channel_triggers_ht */
62 struct cds_lfht_node channel_triggers_ht_node;
63 /* call_rcu delayed reclaim. */
64 struct rcu_head rcu_node;
65 };
66
67 /*
68 * List of triggers applying to a given session.
69 *
70 * See:
71 * - lttng_session_trigger_list_create()
72 * - lttng_session_trigger_list_build()
73 * - lttng_session_trigger_list_destroy()
74 * - lttng_session_trigger_list_add()
75 */
76 struct lttng_session_trigger_list {
77 /*
78 * Not owned by this; points to the session_info structure's
79 * session name.
80 */
81 const char *session_name;
82 /* List of struct lttng_trigger_list_element. */
83 struct cds_list_head list;
84 /* Node in the session_triggers_ht */
85 struct cds_lfht_node session_triggers_ht_node;
86 /*
87 * Weak reference to the notification system's session triggers
88 * hashtable.
89 *
90 * The session trigger list structure structure is owned by
91 * the session's session_info.
92 *
93 * The session_info is kept alive the the channel_infos holding a
94 * reference to it (reference counting). When those channels are
95 * destroyed (at runtime or on teardown), the reference they hold
96 * to the session_info are released. On destruction of session_info,
97 * session_info_destroy() will remove the list of triggers applying
98 * to this session from the notification system's state.
99 *
100 * This implies that the session_triggers_ht must be destroyed
101 * after the channels.
102 */
103 struct cds_lfht *session_triggers_ht;
104 /* Used for delayed RCU reclaim. */
105 struct rcu_head rcu_node;
106 };
107
108 struct lttng_trigger_ht_element {
109 struct lttng_trigger *trigger;
110 struct cds_lfht_node node;
111 /* call_rcu delayed reclaim. */
112 struct rcu_head rcu_node;
113 };
114
115 struct lttng_condition_list_element {
116 struct lttng_condition *condition;
117 struct cds_list_head node;
118 };
119
120 struct notification_client_list_element {
121 struct notification_client *client;
122 struct cds_list_head node;
123 };
124
125 struct notification_client_list {
126 const struct lttng_trigger *trigger;
127 struct cds_list_head list;
128 struct cds_lfht_node notification_trigger_ht_node;
129 /* call_rcu delayed reclaim. */
130 struct rcu_head rcu_node;
131 };
132
133 struct notification_client {
134 notification_client_id id;
135 int socket;
136 /* Client protocol version. */
137 uint8_t major, minor;
138 uid_t uid;
139 gid_t gid;
140 /*
141 * Indicates if the credentials and versions of the client have been
142 * checked.
143 */
144 bool validated;
145 /*
146 * Conditions to which the client's notification channel is subscribed.
147 * List of struct lttng_condition_list_node. The condition member is
148 * owned by the client.
149 */
150 struct cds_list_head condition_list;
151 struct cds_lfht_node client_socket_ht_node;
152 struct cds_lfht_node client_id_ht_node;
153 struct {
154 struct {
155 /*
156 * During the reception of a message, the reception
157 * buffers' "size" is set to contain the current
158 * message's complete payload.
159 */
160 struct lttng_dynamic_buffer buffer;
161 /* Bytes left to receive for the current message. */
162 size_t bytes_to_receive;
163 /* Type of the message being received. */
164 enum lttng_notification_channel_message_type msg_type;
165 /*
166 * Indicates whether or not credentials are expected
167 * from the client.
168 */
169 bool expect_creds;
170 /*
171 * Indicates whether or not credentials were received
172 * from the client.
173 */
174 bool creds_received;
175 /* Only used during credentials reception. */
176 lttng_sock_cred creds;
177 } inbound;
178 struct {
179 /*
180 * Indicates whether or not a notification addressed to
181 * this client was dropped because a command reply was
182 * already buffered.
183 *
184 * A notification is dropped whenever the buffer is not
185 * empty.
186 */
187 bool dropped_notification;
188 /*
189 * Indicates whether or not a command reply is already
190 * buffered. In this case, it means that the client is
191 * not consuming command replies before emitting a new
192 * one. This could be caused by a protocol error or a
193 * misbehaving/malicious client.
194 */
195 bool queued_command_reply;
196 struct lttng_dynamic_buffer buffer;
197 } outbound;
198 } communication;
199 /* call_rcu delayed reclaim. */
200 struct rcu_head rcu_node;
201 };
202
203 struct channel_state_sample {
204 struct channel_key key;
205 struct cds_lfht_node channel_state_ht_node;
206 uint64_t highest_usage;
207 uint64_t lowest_usage;
208 uint64_t channel_total_consumed;
209 /* call_rcu delayed reclaim. */
210 struct rcu_head rcu_node;
211 };
212
213 static unsigned long hash_channel_key(struct channel_key *key);
214 static int evaluate_buffer_condition(const struct lttng_condition *condition,
215 struct lttng_evaluation **evaluation,
216 const struct notification_thread_state *state,
217 const struct channel_state_sample *previous_sample,
218 const struct channel_state_sample *latest_sample,
219 uint64_t previous_session_consumed_total,
220 uint64_t latest_session_consumed_total,
221 struct channel_info *channel_info);
222 static
223 int send_evaluation_to_clients(const struct lttng_trigger *trigger,
224 const struct lttng_evaluation *evaluation,
225 struct notification_client_list *client_list,
226 struct notification_thread_state *state,
227 uid_t channel_uid, gid_t channel_gid);
228
229
230 /* session_info API */
231 static
232 void session_info_destroy(void *_data);
233 static
234 void session_info_get(struct session_info *session_info);
235 static
236 void session_info_put(struct session_info *session_info);
237 static
238 struct session_info *session_info_create(const char *name,
239 uid_t uid, gid_t gid,
240 struct lttng_session_trigger_list *trigger_list,
241 struct cds_lfht *sessions_ht);
242 static
243 void session_info_add_channel(struct session_info *session_info,
244 struct channel_info *channel_info);
245 static
246 void session_info_remove_channel(struct session_info *session_info,
247 struct channel_info *channel_info);
248
249 /* lttng_session_trigger_list API */
250 static
251 struct lttng_session_trigger_list *lttng_session_trigger_list_create(
252 const char *session_name,
253 struct cds_lfht *session_triggers_ht);
254 static
255 struct lttng_session_trigger_list *lttng_session_trigger_list_build(
256 const struct notification_thread_state *state,
257 const char *session_name);
258 static
259 void lttng_session_trigger_list_destroy(
260 struct lttng_session_trigger_list *list);
261 static
262 int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
263 const struct lttng_trigger *trigger);
264
265
266 static
267 int match_client_socket(struct cds_lfht_node *node, const void *key)
268 {
269 /* This double-cast is intended to supress pointer-to-cast warning. */
270 const int socket = (int) (intptr_t) key;
271 const struct notification_client *client = caa_container_of(node,
272 struct notification_client, client_socket_ht_node);
273
274 return client->socket == socket;
275 }
276
277 static
278 int match_client_id(struct cds_lfht_node *node, const void *key)
279 {
280 /* This double-cast is intended to supress pointer-to-cast warning. */
281 const notification_client_id id = *((notification_client_id *) key);
282 const struct notification_client *client = caa_container_of(
283 node, struct notification_client, client_id_ht_node);
284
285 return client->id == id;
286 }
287
288 static
289 int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
290 {
291 struct channel_key *channel_key = (struct channel_key *) key;
292 struct lttng_channel_trigger_list *trigger_list;
293
294 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
295 channel_triggers_ht_node);
296
297 return !!((channel_key->key == trigger_list->channel_key.key) &&
298 (channel_key->domain == trigger_list->channel_key.domain));
299 }
300
301 static
302 int match_session_trigger_list(struct cds_lfht_node *node, const void *key)
303 {
304 const char *session_name = (const char *) key;
305 struct lttng_session_trigger_list *trigger_list;
306
307 trigger_list = caa_container_of(node, struct lttng_session_trigger_list,
308 session_triggers_ht_node);
309
310 return !!(strcmp(trigger_list->session_name, session_name) == 0);
311 }
312
313 static
314 int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
315 {
316 struct channel_key *channel_key = (struct channel_key *) key;
317 struct channel_state_sample *sample;
318
319 sample = caa_container_of(node, struct channel_state_sample,
320 channel_state_ht_node);
321
322 return !!((channel_key->key == sample->key.key) &&
323 (channel_key->domain == sample->key.domain));
324 }
325
326 static
327 int match_channel_info(struct cds_lfht_node *node, const void *key)
328 {
329 struct channel_key *channel_key = (struct channel_key *) key;
330 struct channel_info *channel_info;
331
332 channel_info = caa_container_of(node, struct channel_info,
333 channels_ht_node);
334
335 return !!((channel_key->key == channel_info->key.key) &&
336 (channel_key->domain == channel_info->key.domain));
337 }
338
339 static
340 int match_condition(struct cds_lfht_node *node, const void *key)
341 {
342 struct lttng_condition *condition_key = (struct lttng_condition *) key;
343 struct lttng_trigger_ht_element *trigger;
344 struct lttng_condition *condition;
345
346 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
347 node);
348 condition = lttng_trigger_get_condition(trigger->trigger);
349 assert(condition);
350
351 return !!lttng_condition_is_equal(condition_key, condition);
352 }
353
354 static
355 int match_client_list_condition(struct cds_lfht_node *node, const void *key)
356 {
357 struct lttng_condition *condition_key = (struct lttng_condition *) key;
358 struct notification_client_list *client_list;
359 const struct lttng_condition *condition;
360
361 assert(condition_key);
362
363 client_list = caa_container_of(node, struct notification_client_list,
364 notification_trigger_ht_node);
365 condition = lttng_trigger_get_const_condition(client_list->trigger);
366
367 return !!lttng_condition_is_equal(condition_key, condition);
368 }
369
370 static
371 int match_session(struct cds_lfht_node *node, const void *key)
372 {
373 const char *name = key;
374 struct session_info *session_info = caa_container_of(
375 node, struct session_info, sessions_ht_node);
376
377 return !strcmp(session_info->name, name);
378 }
379
380 static
381 unsigned long lttng_condition_buffer_usage_hash(
382 const struct lttng_condition *_condition)
383 {
384 unsigned long hash;
385 unsigned long condition_type;
386 struct lttng_condition_buffer_usage *condition;
387
388 condition = container_of(_condition,
389 struct lttng_condition_buffer_usage, parent);
390
391 condition_type = (unsigned long) condition->parent.type;
392 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
393 if (condition->session_name) {
394 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
395 }
396 if (condition->channel_name) {
397 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
398 }
399 if (condition->domain.set) {
400 hash ^= hash_key_ulong(
401 (void *) condition->domain.type,
402 lttng_ht_seed);
403 }
404 if (condition->threshold_ratio.set) {
405 uint64_t val;
406
407 val = condition->threshold_ratio.value * (double) UINT32_MAX;
408 hash ^= hash_key_u64(&val, lttng_ht_seed);
409 } else if (condition->threshold_bytes.set) {
410 uint64_t val;
411
412 val = condition->threshold_bytes.value;
413 hash ^= hash_key_u64(&val, lttng_ht_seed);
414 }
415 return hash;
416 }
417
418 static
419 unsigned long lttng_condition_session_consumed_size_hash(
420 const struct lttng_condition *_condition)
421 {
422 unsigned long hash;
423 unsigned long condition_type =
424 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE;
425 struct lttng_condition_session_consumed_size *condition;
426 uint64_t val;
427
428 condition = container_of(_condition,
429 struct lttng_condition_session_consumed_size, parent);
430
431 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
432 if (condition->session_name) {
433 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
434 }
435 val = condition->consumed_threshold_bytes.value;
436 hash ^= hash_key_u64(&val, lttng_ht_seed);
437 return hash;
438 }
439
440 static
441 unsigned long lttng_condition_session_rotation_hash(
442 const struct lttng_condition *_condition)
443 {
444 unsigned long hash, condition_type;
445 struct lttng_condition_session_rotation *condition;
446
447 condition = container_of(_condition,
448 struct lttng_condition_session_rotation, parent);
449 condition_type = (unsigned long) condition->parent.type;
450 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
451 assert(condition->session_name);
452 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
453 return hash;
454 }
455
456 /*
457 * The lttng_condition hashing code is kept in this file (rather than
458 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
459 * don't want to link in liblttng-ctl.
460 */
461 static
462 unsigned long lttng_condition_hash(const struct lttng_condition *condition)
463 {
464 switch (condition->type) {
465 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
466 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
467 return lttng_condition_buffer_usage_hash(condition);
468 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
469 return lttng_condition_session_consumed_size_hash(condition);
470 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
471 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
472 return lttng_condition_session_rotation_hash(condition);
473 default:
474 ERR("[notification-thread] Unexpected condition type caught");
475 abort();
476 }
477 }
478
479 static
480 unsigned long hash_channel_key(struct channel_key *key)
481 {
482 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
483 unsigned long domain_hash = hash_key_ulong(
484 (void *) (unsigned long) key->domain, lttng_ht_seed);
485
486 return key_hash ^ domain_hash;
487 }
488
489 static
490 unsigned long hash_client_socket(int socket)
491 {
492 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
493 }
494
495 static
496 unsigned long hash_client_id(notification_client_id id)
497 {
498 return hash_key_u64(&id, lttng_ht_seed);
499 }
500
501 /*
502 * Get the type of object to which a given condition applies. Bindings let
503 * the notification system evaluate a trigger's condition when a given
504 * object's state is updated.
505 *
506 * For instance, a condition bound to a channel will be evaluated everytime
507 * the channel's state is changed by a channel monitoring sample.
508 */
509 static
510 enum lttng_object_type get_condition_binding_object(
511 const struct lttng_condition *condition)
512 {
513 switch (lttng_condition_get_type(condition)) {
514 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
515 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
516 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
517 return LTTNG_OBJECT_TYPE_CHANNEL;
518 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
519 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
520 return LTTNG_OBJECT_TYPE_SESSION;
521 default:
522 return LTTNG_OBJECT_TYPE_UNKNOWN;
523 }
524 }
525
526 static
527 void free_channel_info_rcu(struct rcu_head *node)
528 {
529 free(caa_container_of(node, struct channel_info, rcu_node));
530 }
531
532 static
533 void channel_info_destroy(struct channel_info *channel_info)
534 {
535 if (!channel_info) {
536 return;
537 }
538
539 if (channel_info->session_info) {
540 session_info_remove_channel(channel_info->session_info,
541 channel_info);
542 session_info_put(channel_info->session_info);
543 }
544 if (channel_info->name) {
545 free(channel_info->name);
546 }
547 call_rcu(&channel_info->rcu_node, free_channel_info_rcu);
548 }
549
550 static
551 void free_session_info_rcu(struct rcu_head *node)
552 {
553 free(caa_container_of(node, struct session_info, rcu_node));
554 }
555
556 /* Don't call directly, use the ref-counting mechanism. */
557 static
558 void session_info_destroy(void *_data)
559 {
560 struct session_info *session_info = _data;
561 int ret;
562
563 assert(session_info);
564 if (session_info->channel_infos_ht) {
565 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
566 if (ret) {
567 ERR("[notification-thread] Failed to destroy channel information hash table");
568 }
569 }
570 lttng_session_trigger_list_destroy(session_info->trigger_list);
571
572 rcu_read_lock();
573 cds_lfht_del(session_info->sessions_ht,
574 &session_info->sessions_ht_node);
575 rcu_read_unlock();
576 free(session_info->name);
577 call_rcu(&session_info->rcu_node, free_session_info_rcu);
578 }
579
580 static
581 void session_info_get(struct session_info *session_info)
582 {
583 if (!session_info) {
584 return;
585 }
586 lttng_ref_get(&session_info->ref);
587 }
588
589 static
590 void session_info_put(struct session_info *session_info)
591 {
592 if (!session_info) {
593 return;
594 }
595 lttng_ref_put(&session_info->ref);
596 }
597
598 static
599 struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
600 struct lttng_session_trigger_list *trigger_list,
601 struct cds_lfht *sessions_ht)
602 {
603 struct session_info *session_info;
604
605 assert(name);
606
607 session_info = zmalloc(sizeof(*session_info));
608 if (!session_info) {
609 goto end;
610 }
611 lttng_ref_init(&session_info->ref, session_info_destroy);
612
613 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
614 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
615 if (!session_info->channel_infos_ht) {
616 goto error;
617 }
618
619 cds_lfht_node_init(&session_info->sessions_ht_node);
620 session_info->name = strdup(name);
621 if (!session_info->name) {
622 goto error;
623 }
624 session_info->uid = uid;
625 session_info->gid = gid;
626 session_info->trigger_list = trigger_list;
627 session_info->sessions_ht = sessions_ht;
628 end:
629 return session_info;
630 error:
631 session_info_put(session_info);
632 return NULL;
633 }
634
635 static
636 void session_info_add_channel(struct session_info *session_info,
637 struct channel_info *channel_info)
638 {
639 rcu_read_lock();
640 cds_lfht_add(session_info->channel_infos_ht,
641 hash_channel_key(&channel_info->key),
642 &channel_info->session_info_channels_ht_node);
643 rcu_read_unlock();
644 }
645
646 static
647 void session_info_remove_channel(struct session_info *session_info,
648 struct channel_info *channel_info)
649 {
650 rcu_read_lock();
651 cds_lfht_del(session_info->channel_infos_ht,
652 &channel_info->session_info_channels_ht_node);
653 rcu_read_unlock();
654 }
655
656 static
657 struct channel_info *channel_info_create(const char *channel_name,
658 struct channel_key *channel_key, uint64_t channel_capacity,
659 struct session_info *session_info)
660 {
661 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
662
663 if (!channel_info) {
664 goto end;
665 }
666
667 cds_lfht_node_init(&channel_info->channels_ht_node);
668 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
669 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
670 channel_info->capacity = channel_capacity;
671
672 channel_info->name = strdup(channel_name);
673 if (!channel_info->name) {
674 goto error;
675 }
676
677 /*
678 * Set the references between session and channel infos:
679 * - channel_info holds a strong reference to session_info
680 * - session_info holds a weak reference to channel_info
681 */
682 session_info_get(session_info);
683 session_info_add_channel(session_info, channel_info);
684 channel_info->session_info = session_info;
685 end:
686 return channel_info;
687 error:
688 channel_info_destroy(channel_info);
689 return NULL;
690 }
691
692 /* RCU read lock must be held by the caller. */
693 static
694 struct notification_client_list *get_client_list_from_condition(
695 struct notification_thread_state *state,
696 const struct lttng_condition *condition)
697 {
698 struct cds_lfht_node *node;
699 struct cds_lfht_iter iter;
700
701 cds_lfht_lookup(state->notification_trigger_clients_ht,
702 lttng_condition_hash(condition),
703 match_client_list_condition,
704 condition,
705 &iter);
706 node = cds_lfht_iter_get_node(&iter);
707
708 return node ? caa_container_of(node,
709 struct notification_client_list,
710 notification_trigger_ht_node) : NULL;
711 }
712
713 /* This function must be called with the RCU read lock held. */
714 static
715 int evaluate_channel_condition_for_client(
716 const struct lttng_condition *condition,
717 struct notification_thread_state *state,
718 struct lttng_evaluation **evaluation,
719 uid_t *session_uid, gid_t *session_gid)
720 {
721 int ret;
722 struct cds_lfht_iter iter;
723 struct cds_lfht_node *node;
724 struct channel_info *channel_info = NULL;
725 struct channel_key *channel_key = NULL;
726 struct channel_state_sample *last_sample = NULL;
727 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
728
729 /* Find the channel associated with the condition. */
730 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
731 channel_trigger_list, channel_triggers_ht_node) {
732 struct lttng_trigger_list_element *element;
733
734 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
735 const struct lttng_condition *current_condition =
736 lttng_trigger_get_const_condition(
737 element->trigger);
738
739 assert(current_condition);
740 if (!lttng_condition_is_equal(condition,
741 current_condition)) {
742 continue;
743 }
744
745 /* Found the trigger, save the channel key. */
746 channel_key = &channel_trigger_list->channel_key;
747 break;
748 }
749 if (channel_key) {
750 /* The channel key was found stop iteration. */
751 break;
752 }
753 }
754
755 if (!channel_key){
756 /* No channel found; normal exit. */
757 DBG("[notification-thread] No known channel associated with newly subscribed-to condition");
758 ret = 0;
759 goto end;
760 }
761
762 /* Fetch channel info for the matching channel. */
763 cds_lfht_lookup(state->channels_ht,
764 hash_channel_key(channel_key),
765 match_channel_info,
766 channel_key,
767 &iter);
768 node = cds_lfht_iter_get_node(&iter);
769 assert(node);
770 channel_info = caa_container_of(node, struct channel_info,
771 channels_ht_node);
772
773 /* Retrieve the channel's last sample, if it exists. */
774 cds_lfht_lookup(state->channel_state_ht,
775 hash_channel_key(channel_key),
776 match_channel_state_sample,
777 channel_key,
778 &iter);
779 node = cds_lfht_iter_get_node(&iter);
780 if (node) {
781 last_sample = caa_container_of(node,
782 struct channel_state_sample,
783 channel_state_ht_node);
784 } else {
785 /* Nothing to evaluate, no sample was ever taken. Normal exit */
786 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
787 ret = 0;
788 goto end;
789 }
790
791 ret = evaluate_buffer_condition(condition, evaluation, state,
792 NULL, last_sample,
793 0, channel_info->session_info->consumed_data_size,
794 channel_info);
795 if (ret) {
796 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
797 goto end;
798 }
799
800 *session_uid = channel_info->session_info->uid;
801 *session_gid = channel_info->session_info->gid;
802 end:
803 return ret;
804 }
805
806 static
807 const char *get_condition_session_name(const struct lttng_condition *condition)
808 {
809 const char *session_name = NULL;
810 enum lttng_condition_status status;
811
812 switch (lttng_condition_get_type(condition)) {
813 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
814 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
815 status = lttng_condition_buffer_usage_get_session_name(
816 condition, &session_name);
817 break;
818 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
819 status = lttng_condition_session_consumed_size_get_session_name(
820 condition, &session_name);
821 break;
822 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
823 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
824 status = lttng_condition_session_rotation_get_session_name(
825 condition, &session_name);
826 break;
827 default:
828 abort();
829 }
830 if (status != LTTNG_CONDITION_STATUS_OK) {
831 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
832 goto end;
833 }
834 end:
835 return session_name;
836 }
837
838 /* This function must be called with the RCU read lock held. */
839 static
840 int evaluate_session_condition_for_client(
841 const struct lttng_condition *condition,
842 struct notification_thread_state *state,
843 struct lttng_evaluation **evaluation,
844 uid_t *session_uid, gid_t *session_gid)
845 {
846 int ret;
847 struct cds_lfht_iter iter;
848 struct cds_lfht_node *node;
849 const char *session_name;
850 struct session_info *session_info = NULL;
851
852 session_name = get_condition_session_name(condition);
853
854 /* Find the session associated with the trigger. */
855 cds_lfht_lookup(state->sessions_ht,
856 hash_key_str(session_name, lttng_ht_seed),
857 match_session,
858 session_name,
859 &iter);
860 node = cds_lfht_iter_get_node(&iter);
861 if (!node) {
862 DBG("[notification-thread] No known session matching name \"%s\"",
863 session_name);
864 ret = 0;
865 goto end;
866 }
867
868 session_info = caa_container_of(node, struct session_info,
869 sessions_ht_node);
870 session_info_get(session_info);
871
872 /*
873 * Evaluation is performed in-line here since only one type of
874 * session-bound condition is handled for the moment.
875 */
876 switch (lttng_condition_get_type(condition)) {
877 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
878 if (!session_info->rotation.ongoing) {
879 ret = 0;
880 goto end_session_put;
881 }
882
883 *evaluation = lttng_evaluation_session_rotation_ongoing_create(
884 session_info->rotation.id);
885 if (!*evaluation) {
886 /* Fatal error. */
887 ERR("[notification-thread] Failed to create session rotation ongoing evaluation for session \"%s\"",
888 session_info->name);
889 ret = -1;
890 goto end_session_put;
891 }
892 ret = 0;
893 break;
894 default:
895 ret = 0;
896 goto end_session_put;
897 }
898
899 *session_uid = session_info->uid;
900 *session_gid = session_info->gid;
901
902 end_session_put:
903 session_info_put(session_info);
904 end:
905 return ret;
906 }
907
908 /* This function must be called with the RCU read lock held. */
909 static
910 int evaluate_condition_for_client(const struct lttng_trigger *trigger,
911 const struct lttng_condition *condition,
912 struct notification_client *client,
913 struct notification_thread_state *state)
914 {
915 int ret;
916 struct lttng_evaluation *evaluation = NULL;
917 struct notification_client_list client_list = { 0 };
918 struct notification_client_list_element client_list_element = { 0 };
919 uid_t object_uid = 0;
920 gid_t object_gid = 0;
921
922 assert(trigger);
923 assert(condition);
924 assert(client);
925 assert(state);
926
927 switch (get_condition_binding_object(condition)) {
928 case LTTNG_OBJECT_TYPE_SESSION:
929 ret = evaluate_session_condition_for_client(condition, state,
930 &evaluation, &object_uid, &object_gid);
931 break;
932 case LTTNG_OBJECT_TYPE_CHANNEL:
933 ret = evaluate_channel_condition_for_client(condition, state,
934 &evaluation, &object_uid, &object_gid);
935 break;
936 case LTTNG_OBJECT_TYPE_NONE:
937 ret = 0;
938 goto end;
939 case LTTNG_OBJECT_TYPE_UNKNOWN:
940 default:
941 ret = -1;
942 goto end;
943 }
944 if (ret) {
945 /* Fatal error. */
946 goto end;
947 }
948 if (!evaluation) {
949 /* Evaluation yielded nothing. Normal exit. */
950 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
951 ret = 0;
952 goto end;
953 }
954
955 /*
956 * Create a temporary client list with the client currently
957 * subscribing.
958 */
959 cds_lfht_node_init(&client_list.notification_trigger_ht_node);
960 CDS_INIT_LIST_HEAD(&client_list.list);
961 client_list.trigger = trigger;
962
963 CDS_INIT_LIST_HEAD(&client_list_element.node);
964 client_list_element.client = client;
965 cds_list_add(&client_list_element.node, &client_list.list);
966
967 /* Send evaluation result to the newly-subscribed client. */
968 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
969 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
970 state, object_uid, object_gid);
971
972 end:
973 return ret;
974 }
975
976 static
977 int notification_thread_client_subscribe(struct notification_client *client,
978 struct lttng_condition *condition,
979 struct notification_thread_state *state,
980 enum lttng_notification_channel_status *_status)
981 {
982 int ret = 0;
983 struct notification_client_list *client_list;
984 struct lttng_condition_list_element *condition_list_element = NULL;
985 struct notification_client_list_element *client_list_element = NULL;
986 enum lttng_notification_channel_status status =
987 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
988
989 /*
990 * Ensure that the client has not already subscribed to this condition
991 * before.
992 */
993 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
994 if (lttng_condition_is_equal(condition_list_element->condition,
995 condition)) {
996 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
997 goto end;
998 }
999 }
1000
1001 condition_list_element = zmalloc(sizeof(*condition_list_element));
1002 if (!condition_list_element) {
1003 ret = -1;
1004 goto error;
1005 }
1006 client_list_element = zmalloc(sizeof(*client_list_element));
1007 if (!client_list_element) {
1008 ret = -1;
1009 goto error;
1010 }
1011
1012 rcu_read_lock();
1013
1014 /*
1015 * Add the newly-subscribed condition to the client's subscription list.
1016 */
1017 CDS_INIT_LIST_HEAD(&condition_list_element->node);
1018 condition_list_element->condition = condition;
1019 cds_list_add(&condition_list_element->node, &client->condition_list);
1020
1021 client_list = get_client_list_from_condition(state, condition);
1022 if (!client_list) {
1023 /*
1024 * No notification-emiting trigger registered with this
1025 * condition. We don't evaluate the condition right away
1026 * since this trigger is not registered yet.
1027 */
1028 free(client_list_element);
1029 goto end_unlock;
1030 }
1031
1032 /*
1033 * The condition to which the client just subscribed is evaluated
1034 * at this point so that conditions that are already TRUE result
1035 * in a notification being sent out.
1036 */
1037 if (evaluate_condition_for_client(client_list->trigger, condition,
1038 client, state)) {
1039 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
1040 ret = -1;
1041 free(client_list_element);
1042 goto end_unlock;
1043 }
1044
1045 /*
1046 * Add the client to the list of clients interested in a given trigger
1047 * if a "notification" trigger with a corresponding condition was
1048 * added prior.
1049 */
1050 client_list_element->client = client;
1051 CDS_INIT_LIST_HEAD(&client_list_element->node);
1052 cds_list_add(&client_list_element->node, &client_list->list);
1053 end_unlock:
1054 rcu_read_unlock();
1055 end:
1056 if (_status) {
1057 *_status = status;
1058 }
1059 return ret;
1060 error:
1061 free(condition_list_element);
1062 free(client_list_element);
1063 return ret;
1064 }
1065
1066 static
1067 int notification_thread_client_unsubscribe(
1068 struct notification_client *client,
1069 struct lttng_condition *condition,
1070 struct notification_thread_state *state,
1071 enum lttng_notification_channel_status *_status)
1072 {
1073 struct notification_client_list *client_list;
1074 struct lttng_condition_list_element *condition_list_element,
1075 *condition_tmp;
1076 struct notification_client_list_element *client_list_element,
1077 *client_tmp;
1078 bool condition_found = false;
1079 enum lttng_notification_channel_status status =
1080 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1081
1082 /* Remove the condition from the client's condition list. */
1083 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
1084 &client->condition_list, node) {
1085 if (!lttng_condition_is_equal(condition_list_element->condition,
1086 condition)) {
1087 continue;
1088 }
1089
1090 cds_list_del(&condition_list_element->node);
1091 /*
1092 * The caller may be iterating on the client's conditions to
1093 * tear down a client's connection. In this case, the condition
1094 * will be destroyed at the end.
1095 */
1096 if (condition != condition_list_element->condition) {
1097 lttng_condition_destroy(
1098 condition_list_element->condition);
1099 }
1100 free(condition_list_element);
1101 condition_found = true;
1102 break;
1103 }
1104
1105 if (!condition_found) {
1106 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
1107 goto end;
1108 }
1109
1110 /*
1111 * Remove the client from the list of clients interested the trigger
1112 * matching the condition.
1113 */
1114 rcu_read_lock();
1115 client_list = get_client_list_from_condition(state, condition);
1116 if (!client_list) {
1117 goto end_unlock;
1118 }
1119
1120 cds_list_for_each_entry_safe(client_list_element, client_tmp,
1121 &client_list->list, node) {
1122 if (client_list_element->client->socket != client->socket) {
1123 continue;
1124 }
1125 cds_list_del(&client_list_element->node);
1126 free(client_list_element);
1127 break;
1128 }
1129 end_unlock:
1130 rcu_read_unlock();
1131 end:
1132 lttng_condition_destroy(condition);
1133 if (_status) {
1134 *_status = status;
1135 }
1136 return 0;
1137 }
1138
1139 static
1140 void free_notification_client_rcu(struct rcu_head *node)
1141 {
1142 free(caa_container_of(node, struct notification_client, rcu_node));
1143 }
1144
1145 static
1146 void notification_client_destroy(struct notification_client *client,
1147 struct notification_thread_state *state)
1148 {
1149 struct lttng_condition_list_element *condition_list_element, *tmp;
1150
1151 if (!client) {
1152 return;
1153 }
1154
1155 /* Release all conditions to which the client was subscribed. */
1156 cds_list_for_each_entry_safe(condition_list_element, tmp,
1157 &client->condition_list, node) {
1158 (void) notification_thread_client_unsubscribe(client,
1159 condition_list_element->condition, state, NULL);
1160 }
1161
1162 if (client->socket >= 0) {
1163 (void) lttcomm_close_unix_sock(client->socket);
1164 client->socket = -1;
1165 }
1166 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
1167 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
1168 call_rcu(&client->rcu_node, free_notification_client_rcu);
1169 }
1170
1171 /*
1172 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1173 * client pointer).
1174 */
1175 static
1176 struct notification_client *get_client_from_socket(int socket,
1177 struct notification_thread_state *state)
1178 {
1179 struct cds_lfht_iter iter;
1180 struct cds_lfht_node *node;
1181 struct notification_client *client = NULL;
1182
1183 cds_lfht_lookup(state->client_socket_ht,
1184 hash_client_socket(socket),
1185 match_client_socket,
1186 (void *) (unsigned long) socket,
1187 &iter);
1188 node = cds_lfht_iter_get_node(&iter);
1189 if (!node) {
1190 goto end;
1191 }
1192
1193 client = caa_container_of(node, struct notification_client,
1194 client_socket_ht_node);
1195 end:
1196 return client;
1197 }
1198
1199 /*
1200 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1201 * client pointer).
1202 */
1203 static
1204 struct notification_client *get_client_from_id(notification_client_id id,
1205 struct notification_thread_state *state)
1206 {
1207 struct cds_lfht_iter iter;
1208 struct cds_lfht_node *node;
1209 struct notification_client *client = NULL;
1210
1211 cds_lfht_lookup(state->client_id_ht,
1212 hash_client_id(id),
1213 match_client_id,
1214 &id,
1215 &iter);
1216 node = cds_lfht_iter_get_node(&iter);
1217 if (!node) {
1218 goto end;
1219 }
1220
1221 client = caa_container_of(node, struct notification_client,
1222 client_id_ht_node);
1223 end:
1224 return client;
1225 }
1226
1227 static
1228 bool buffer_usage_condition_applies_to_channel(
1229 const struct lttng_condition *condition,
1230 const struct channel_info *channel_info)
1231 {
1232 enum lttng_condition_status status;
1233 enum lttng_domain_type condition_domain;
1234 const char *condition_session_name = NULL;
1235 const char *condition_channel_name = NULL;
1236
1237 status = lttng_condition_buffer_usage_get_domain_type(condition,
1238 &condition_domain);
1239 assert(status == LTTNG_CONDITION_STATUS_OK);
1240 if (channel_info->key.domain != condition_domain) {
1241 goto fail;
1242 }
1243
1244 status = lttng_condition_buffer_usage_get_session_name(
1245 condition, &condition_session_name);
1246 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1247
1248 status = lttng_condition_buffer_usage_get_channel_name(
1249 condition, &condition_channel_name);
1250 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
1251
1252 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1253 goto fail;
1254 }
1255 if (strcmp(channel_info->name, condition_channel_name)) {
1256 goto fail;
1257 }
1258
1259 return true;
1260 fail:
1261 return false;
1262 }
1263
1264 static
1265 bool session_consumed_size_condition_applies_to_channel(
1266 const struct lttng_condition *condition,
1267 const struct channel_info *channel_info)
1268 {
1269 enum lttng_condition_status status;
1270 const char *condition_session_name = NULL;
1271
1272 status = lttng_condition_session_consumed_size_get_session_name(
1273 condition, &condition_session_name);
1274 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1275
1276 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1277 goto fail;
1278 }
1279
1280 return true;
1281 fail:
1282 return false;
1283 }
1284
1285 static
1286 bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
1287 const struct channel_info *channel_info)
1288 {
1289 const struct lttng_condition *condition;
1290 bool trigger_applies;
1291
1292 condition = lttng_trigger_get_const_condition(trigger);
1293 if (!condition) {
1294 goto fail;
1295 }
1296
1297 switch (lttng_condition_get_type(condition)) {
1298 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1299 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1300 trigger_applies = buffer_usage_condition_applies_to_channel(
1301 condition, channel_info);
1302 break;
1303 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1304 trigger_applies = session_consumed_size_condition_applies_to_channel(
1305 condition, channel_info);
1306 break;
1307 default:
1308 goto fail;
1309 }
1310
1311 return trigger_applies;
1312 fail:
1313 return false;
1314 }
1315
1316 static
1317 bool trigger_applies_to_client(struct lttng_trigger *trigger,
1318 struct notification_client *client)
1319 {
1320 bool applies = false;
1321 struct lttng_condition_list_element *condition_list_element;
1322
1323 cds_list_for_each_entry(condition_list_element, &client->condition_list,
1324 node) {
1325 applies = lttng_condition_is_equal(
1326 condition_list_element->condition,
1327 lttng_trigger_get_condition(trigger));
1328 if (applies) {
1329 break;
1330 }
1331 }
1332 return applies;
1333 }
1334
1335 /* Must be called with RCU read lock held. */
1336 static
1337 struct lttng_session_trigger_list *get_session_trigger_list(
1338 struct notification_thread_state *state,
1339 const char *session_name)
1340 {
1341 struct lttng_session_trigger_list *list = NULL;
1342 struct cds_lfht_node *node;
1343 struct cds_lfht_iter iter;
1344
1345 cds_lfht_lookup(state->session_triggers_ht,
1346 hash_key_str(session_name, lttng_ht_seed),
1347 match_session_trigger_list,
1348 session_name,
1349 &iter);
1350 node = cds_lfht_iter_get_node(&iter);
1351 if (!node) {
1352 /*
1353 * Not an error, the list of triggers applying to that session
1354 * will be initialized when the session is created.
1355 */
1356 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1357 session_name);
1358 goto end;
1359 }
1360
1361 list = caa_container_of(node,
1362 struct lttng_session_trigger_list,
1363 session_triggers_ht_node);
1364 end:
1365 return list;
1366 }
1367
1368 /*
1369 * Allocate an empty lttng_session_trigger_list for the session named
1370 * 'session_name'.
1371 *
1372 * No ownership of 'session_name' is assumed by the session trigger list.
1373 * It is the caller's responsability to ensure the session name is alive
1374 * for as long as this list is.
1375 */
1376 static
1377 struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1378 const char *session_name,
1379 struct cds_lfht *session_triggers_ht)
1380 {
1381 struct lttng_session_trigger_list *list;
1382
1383 list = zmalloc(sizeof(*list));
1384 if (!list) {
1385 goto end;
1386 }
1387 list->session_name = session_name;
1388 CDS_INIT_LIST_HEAD(&list->list);
1389 cds_lfht_node_init(&list->session_triggers_ht_node);
1390 list->session_triggers_ht = session_triggers_ht;
1391
1392 rcu_read_lock();
1393 /* Publish the list through the session_triggers_ht. */
1394 cds_lfht_add(session_triggers_ht,
1395 hash_key_str(session_name, lttng_ht_seed),
1396 &list->session_triggers_ht_node);
1397 rcu_read_unlock();
1398 end:
1399 return list;
1400 }
1401
1402 static
1403 void free_session_trigger_list_rcu(struct rcu_head *node)
1404 {
1405 free(caa_container_of(node, struct lttng_session_trigger_list,
1406 rcu_node));
1407 }
1408
1409 static
1410 void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1411 {
1412 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1413
1414 /* Empty the list element by element, and then free the list itself. */
1415 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1416 &list->list, node) {
1417 cds_list_del(&trigger_list_element->node);
1418 free(trigger_list_element);
1419 }
1420 rcu_read_lock();
1421 /* Unpublish the list from the session_triggers_ht. */
1422 cds_lfht_del(list->session_triggers_ht,
1423 &list->session_triggers_ht_node);
1424 rcu_read_unlock();
1425 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1426 }
1427
1428 static
1429 int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
1430 const struct lttng_trigger *trigger)
1431 {
1432 int ret = 0;
1433 struct lttng_trigger_list_element *new_element =
1434 zmalloc(sizeof(*new_element));
1435
1436 if (!new_element) {
1437 ret = -1;
1438 goto end;
1439 }
1440 CDS_INIT_LIST_HEAD(&new_element->node);
1441 new_element->trigger = trigger;
1442 cds_list_add(&new_element->node, &list->list);
1443 end:
1444 return ret;
1445 }
1446
1447 static
1448 bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1449 const char *session_name)
1450 {
1451 bool applies = false;
1452 const struct lttng_condition *condition;
1453
1454 condition = lttng_trigger_get_const_condition(trigger);
1455 switch (lttng_condition_get_type(condition)) {
1456 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1457 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1458 {
1459 enum lttng_condition_status condition_status;
1460 const char *condition_session_name;
1461
1462 condition_status = lttng_condition_session_rotation_get_session_name(
1463 condition, &condition_session_name);
1464 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1465 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1466 goto end;
1467 }
1468
1469 assert(condition_session_name);
1470 applies = !strcmp(condition_session_name, session_name);
1471 break;
1472 }
1473 default:
1474 goto end;
1475 }
1476 end:
1477 return applies;
1478 }
1479
1480 /*
1481 * Allocate and initialize an lttng_session_trigger_list which contains
1482 * all triggers that apply to the session named 'session_name'.
1483 *
1484 * No ownership of 'session_name' is assumed by the session trigger list.
1485 * It is the caller's responsability to ensure the session name is alive
1486 * for as long as this list is.
1487 */
1488 static
1489 struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1490 const struct notification_thread_state *state,
1491 const char *session_name)
1492 {
1493 int trigger_count = 0;
1494 struct lttng_session_trigger_list *session_trigger_list = NULL;
1495 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1496 struct cds_lfht_iter iter;
1497
1498 session_trigger_list = lttng_session_trigger_list_create(session_name,
1499 state->session_triggers_ht);
1500
1501 /* Add all triggers applying to the session named 'session_name'. */
1502 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1503 node) {
1504 int ret;
1505
1506 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1507 session_name)) {
1508 continue;
1509 }
1510
1511 ret = lttng_session_trigger_list_add(session_trigger_list,
1512 trigger_ht_element->trigger);
1513 if (ret) {
1514 goto error;
1515 }
1516
1517 trigger_count++;
1518 }
1519
1520 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1521 trigger_count);
1522 return session_trigger_list;
1523 error:
1524 lttng_session_trigger_list_destroy(session_trigger_list);
1525 return NULL;
1526 }
1527
1528 static
1529 struct session_info *find_or_create_session_info(
1530 struct notification_thread_state *state,
1531 const char *name, uid_t uid, gid_t gid)
1532 {
1533 struct session_info *session = NULL;
1534 struct cds_lfht_node *node;
1535 struct cds_lfht_iter iter;
1536 struct lttng_session_trigger_list *trigger_list;
1537
1538 rcu_read_lock();
1539 cds_lfht_lookup(state->sessions_ht,
1540 hash_key_str(name, lttng_ht_seed),
1541 match_session,
1542 name,
1543 &iter);
1544 node = cds_lfht_iter_get_node(&iter);
1545 if (node) {
1546 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1547 name, uid, gid);
1548 session = caa_container_of(node, struct session_info,
1549 sessions_ht_node);
1550 assert(session->uid == uid);
1551 assert(session->gid == gid);
1552 session_info_get(session);
1553 goto end;
1554 }
1555
1556 trigger_list = lttng_session_trigger_list_build(state, name);
1557 if (!trigger_list) {
1558 goto error;
1559 }
1560
1561 session = session_info_create(name, uid, gid, trigger_list,
1562 state->sessions_ht);
1563 if (!session) {
1564 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1565 name, uid, gid);
1566 lttng_session_trigger_list_destroy(trigger_list);
1567 goto error;
1568 }
1569 trigger_list = NULL;
1570
1571 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
1572 &session->sessions_ht_node);
1573 end:
1574 rcu_read_unlock();
1575 return session;
1576 error:
1577 rcu_read_unlock();
1578 session_info_put(session);
1579 return NULL;
1580 }
1581
1582 static
1583 int handle_notification_thread_command_add_channel(
1584 struct notification_thread_state *state,
1585 const char *session_name, uid_t session_uid, gid_t session_gid,
1586 const char *channel_name, enum lttng_domain_type channel_domain,
1587 uint64_t channel_key_int, uint64_t channel_capacity,
1588 enum lttng_error_code *cmd_result)
1589 {
1590 struct cds_list_head trigger_list;
1591 struct channel_info *new_channel_info = NULL;
1592 struct channel_key channel_key = {
1593 .key = channel_key_int,
1594 .domain = channel_domain,
1595 };
1596 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1597 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1598 int trigger_count = 0;
1599 struct cds_lfht_iter iter;
1600 struct session_info *session_info = NULL;
1601
1602 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
1603 channel_name, session_name, channel_key_int,
1604 channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1605
1606 CDS_INIT_LIST_HEAD(&trigger_list);
1607
1608 session_info = find_or_create_session_info(state, session_name,
1609 session_uid, session_gid);
1610 if (!session_info) {
1611 /* Allocation error or an internal error occurred. */
1612 goto error;
1613 }
1614
1615 new_channel_info = channel_info_create(channel_name, &channel_key,
1616 channel_capacity, session_info);
1617 if (!new_channel_info) {
1618 goto error;
1619 }
1620
1621 rcu_read_lock();
1622 /* Build a list of all triggers applying to the new channel. */
1623 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1624 node) {
1625 struct lttng_trigger_list_element *new_element;
1626
1627 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
1628 new_channel_info)) {
1629 continue;
1630 }
1631
1632 new_element = zmalloc(sizeof(*new_element));
1633 if (!new_element) {
1634 rcu_read_unlock();
1635 goto error;
1636 }
1637 CDS_INIT_LIST_HEAD(&new_element->node);
1638 new_element->trigger = trigger_ht_element->trigger;
1639 cds_list_add(&new_element->node, &trigger_list);
1640 trigger_count++;
1641 }
1642 rcu_read_unlock();
1643
1644 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1645 trigger_count);
1646 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1647 if (!channel_trigger_list) {
1648 goto error;
1649 }
1650 channel_trigger_list->channel_key = new_channel_info->key;
1651 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1652 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1653 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1654
1655 rcu_read_lock();
1656 /* Add channel to the channel_ht which owns the channel_infos. */
1657 cds_lfht_add(state->channels_ht,
1658 hash_channel_key(&new_channel_info->key),
1659 &new_channel_info->channels_ht_node);
1660 /*
1661 * Add the list of triggers associated with this channel to the
1662 * channel_triggers_ht.
1663 */
1664 cds_lfht_add(state->channel_triggers_ht,
1665 hash_channel_key(&new_channel_info->key),
1666 &channel_trigger_list->channel_triggers_ht_node);
1667 rcu_read_unlock();
1668 session_info_put(session_info);
1669 *cmd_result = LTTNG_OK;
1670 return 0;
1671 error:
1672 channel_info_destroy(new_channel_info);
1673 session_info_put(session_info);
1674 return 1;
1675 }
1676
1677 static
1678 void free_channel_trigger_list_rcu(struct rcu_head *node)
1679 {
1680 free(caa_container_of(node, struct lttng_channel_trigger_list,
1681 rcu_node));
1682 }
1683
1684 static
1685 void free_channel_state_sample_rcu(struct rcu_head *node)
1686 {
1687 free(caa_container_of(node, struct channel_state_sample,
1688 rcu_node));
1689 }
1690
1691 static
1692 int handle_notification_thread_command_remove_channel(
1693 struct notification_thread_state *state,
1694 uint64_t channel_key, enum lttng_domain_type domain,
1695 enum lttng_error_code *cmd_result)
1696 {
1697 struct cds_lfht_node *node;
1698 struct cds_lfht_iter iter;
1699 struct lttng_channel_trigger_list *trigger_list;
1700 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1701 struct channel_key key = { .key = channel_key, .domain = domain };
1702 struct channel_info *channel_info;
1703
1704 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
1705 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1706
1707 rcu_read_lock();
1708
1709 cds_lfht_lookup(state->channel_triggers_ht,
1710 hash_channel_key(&key),
1711 match_channel_trigger_list,
1712 &key,
1713 &iter);
1714 node = cds_lfht_iter_get_node(&iter);
1715 /*
1716 * There is a severe internal error if we are being asked to remove a
1717 * channel that doesn't exist.
1718 */
1719 if (!node) {
1720 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1721 goto end;
1722 }
1723
1724 /* Free the list of triggers associated with this channel. */
1725 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1726 channel_triggers_ht_node);
1727 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1728 &trigger_list->list, node) {
1729 cds_list_del(&trigger_list_element->node);
1730 free(trigger_list_element);
1731 }
1732 cds_lfht_del(state->channel_triggers_ht, node);
1733 call_rcu(&trigger_list->rcu_node, free_channel_trigger_list_rcu);
1734
1735 /* Free sampled channel state. */
1736 cds_lfht_lookup(state->channel_state_ht,
1737 hash_channel_key(&key),
1738 match_channel_state_sample,
1739 &key,
1740 &iter);
1741 node = cds_lfht_iter_get_node(&iter);
1742 /*
1743 * This is expected to be NULL if the channel is destroyed before we
1744 * received a sample.
1745 */
1746 if (node) {
1747 struct channel_state_sample *sample = caa_container_of(node,
1748 struct channel_state_sample,
1749 channel_state_ht_node);
1750
1751 cds_lfht_del(state->channel_state_ht, node);
1752 call_rcu(&sample->rcu_node, free_channel_state_sample_rcu);
1753 }
1754
1755 /* Remove the channel from the channels_ht and free it. */
1756 cds_lfht_lookup(state->channels_ht,
1757 hash_channel_key(&key),
1758 match_channel_info,
1759 &key,
1760 &iter);
1761 node = cds_lfht_iter_get_node(&iter);
1762 assert(node);
1763 channel_info = caa_container_of(node, struct channel_info,
1764 channels_ht_node);
1765 cds_lfht_del(state->channels_ht, node);
1766 channel_info_destroy(channel_info);
1767 end:
1768 rcu_read_unlock();
1769 *cmd_result = LTTNG_OK;
1770 return 0;
1771 }
1772
1773 static
1774 int handle_notification_thread_command_session_rotation(
1775 struct notification_thread_state *state,
1776 enum notification_thread_command_type cmd_type,
1777 const char *session_name, uid_t session_uid, gid_t session_gid,
1778 uint64_t trace_archive_chunk_id,
1779 struct lttng_trace_archive_location *location,
1780 enum lttng_error_code *_cmd_result)
1781 {
1782 int ret = 0;
1783 enum lttng_error_code cmd_result = LTTNG_OK;
1784 struct lttng_session_trigger_list *trigger_list;
1785 struct lttng_trigger_list_element *trigger_list_element;
1786 struct session_info *session_info;
1787
1788 rcu_read_lock();
1789
1790 session_info = find_or_create_session_info(state, session_name,
1791 session_uid, session_gid);
1792 if (!session_info) {
1793 /* Allocation error or an internal error occurred. */
1794 ret = -1;
1795 cmd_result = LTTNG_ERR_NOMEM;
1796 goto end;
1797 }
1798
1799 session_info->rotation.ongoing =
1800 cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
1801 session_info->rotation.id = trace_archive_chunk_id;
1802 trigger_list = get_session_trigger_list(state, session_name);
1803 if (!trigger_list) {
1804 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1805 session_name);
1806 goto end;
1807 }
1808
1809 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
1810 node) {
1811 const struct lttng_condition *condition;
1812 const struct lttng_action *action;
1813 const struct lttng_trigger *trigger;
1814 struct notification_client_list *client_list;
1815 struct lttng_evaluation *evaluation = NULL;
1816 enum lttng_condition_type condition_type;
1817
1818 trigger = trigger_list_element->trigger;
1819 condition = lttng_trigger_get_const_condition(trigger);
1820 assert(condition);
1821 condition_type = lttng_condition_get_type(condition);
1822
1823 if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING &&
1824 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1825 continue;
1826 } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED &&
1827 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) {
1828 continue;
1829 }
1830
1831 action = lttng_trigger_get_const_action(trigger);
1832
1833 /* Notify actions are the only type currently supported. */
1834 assert(lttng_action_get_type_const(action) ==
1835 LTTNG_ACTION_TYPE_NOTIFY);
1836
1837 client_list = get_client_list_from_condition(state, condition);
1838 assert(client_list);
1839
1840 if (cds_list_empty(&client_list->list)) {
1841 /*
1842 * No clients interested in the evaluation's result,
1843 * skip it.
1844 */
1845 continue;
1846 }
1847
1848 if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1849 evaluation = lttng_evaluation_session_rotation_ongoing_create(
1850 trace_archive_chunk_id);
1851 } else {
1852 evaluation = lttng_evaluation_session_rotation_completed_create(
1853 trace_archive_chunk_id, location);
1854 }
1855
1856 if (!evaluation) {
1857 /* Internal error */
1858 ret = -1;
1859 cmd_result = LTTNG_ERR_UNK;
1860 goto end;
1861 }
1862
1863 /* Dispatch evaluation result to all clients. */
1864 ret = send_evaluation_to_clients(trigger_list_element->trigger,
1865 evaluation, client_list, state,
1866 session_info->uid,
1867 session_info->gid);
1868 lttng_evaluation_destroy(evaluation);
1869 if (caa_unlikely(ret)) {
1870 goto end;
1871 }
1872 }
1873 end:
1874 session_info_put(session_info);
1875 *_cmd_result = cmd_result;
1876 rcu_read_unlock();
1877 return ret;
1878 }
1879
1880 static
1881 int condition_is_supported(struct lttng_condition *condition)
1882 {
1883 int ret;
1884
1885 switch (lttng_condition_get_type(condition)) {
1886 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1887 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1888 {
1889 enum lttng_domain_type domain;
1890
1891 ret = lttng_condition_buffer_usage_get_domain_type(condition,
1892 &domain);
1893 if (ret) {
1894 ret = -1;
1895 goto end;
1896 }
1897
1898 if (domain != LTTNG_DOMAIN_KERNEL) {
1899 ret = 1;
1900 goto end;
1901 }
1902
1903 /*
1904 * Older kernel tracers don't expose the API to monitor their
1905 * buffers. Therefore, we reject triggers that require that
1906 * mechanism to be available to be evaluated.
1907 */
1908 ret = kernel_supports_ring_buffer_snapshot_sample_positions();
1909 break;
1910 }
1911 default:
1912 ret = 1;
1913 }
1914 end:
1915 return ret;
1916 }
1917
1918 /* Must be called with RCU read lock held. */
1919 static
1920 int bind_trigger_to_matching_session(const struct lttng_trigger *trigger,
1921 struct notification_thread_state *state)
1922 {
1923 int ret = 0;
1924 const struct lttng_condition *condition;
1925 const char *session_name;
1926 struct lttng_session_trigger_list *trigger_list;
1927
1928 condition = lttng_trigger_get_const_condition(trigger);
1929 switch (lttng_condition_get_type(condition)) {
1930 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1931 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1932 {
1933 enum lttng_condition_status status;
1934
1935 status = lttng_condition_session_rotation_get_session_name(
1936 condition, &session_name);
1937 if (status != LTTNG_CONDITION_STATUS_OK) {
1938 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
1939 ret = -1;
1940 goto end;
1941 }
1942 break;
1943 }
1944 default:
1945 ret = -1;
1946 goto end;
1947 }
1948
1949 trigger_list = get_session_trigger_list(state, session_name);
1950 if (!trigger_list) {
1951 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
1952 session_name);
1953 goto end;
1954
1955 }
1956
1957 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
1958 session_name);
1959 ret = lttng_session_trigger_list_add(trigger_list, trigger);
1960 end:
1961 return ret;
1962 }
1963
1964 /* Must be called with RCU read lock held. */
1965 static
1966 int bind_trigger_to_matching_channels(const struct lttng_trigger *trigger,
1967 struct notification_thread_state *state)
1968 {
1969 int ret = 0;
1970 struct cds_lfht_node *node;
1971 struct cds_lfht_iter iter;
1972 struct channel_info *channel;
1973
1974 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
1975 channels_ht_node) {
1976 struct lttng_trigger_list_element *trigger_list_element;
1977 struct lttng_channel_trigger_list *trigger_list;
1978 struct cds_lfht_iter lookup_iter;
1979
1980 if (!trigger_applies_to_channel(trigger, channel)) {
1981 continue;
1982 }
1983
1984 cds_lfht_lookup(state->channel_triggers_ht,
1985 hash_channel_key(&channel->key),
1986 match_channel_trigger_list,
1987 &channel->key,
1988 &lookup_iter);
1989 node = cds_lfht_iter_get_node(&lookup_iter);
1990 assert(node);
1991 trigger_list = caa_container_of(node,
1992 struct lttng_channel_trigger_list,
1993 channel_triggers_ht_node);
1994
1995 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
1996 if (!trigger_list_element) {
1997 ret = -1;
1998 goto end;
1999 }
2000 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
2001 trigger_list_element->trigger = trigger;
2002 cds_list_add(&trigger_list_element->node, &trigger_list->list);
2003 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
2004 channel->name);
2005 }
2006 end:
2007 return ret;
2008 }
2009
2010 /*
2011 * FIXME A client's credentials are not checked when registering a trigger, nor
2012 * are they stored alongside with the trigger.
2013 *
2014 * The effects of this are benign since:
2015 * - The client will succeed in registering the trigger, as it is valid,
2016 * - The trigger will, internally, be bound to the channel/session,
2017 * - The notifications will not be sent since the client's credentials
2018 * are checked against the channel at that moment.
2019 *
2020 * If this function returns a non-zero value, it means something is
2021 * fundamentally broken and the whole subsystem/thread will be torn down.
2022 *
2023 * If a non-fatal error occurs, just set the cmd_result to the appropriate
2024 * error code.
2025 */
2026 static
2027 int handle_notification_thread_command_register_trigger(
2028 struct notification_thread_state *state,
2029 struct lttng_trigger *trigger,
2030 enum lttng_error_code *cmd_result)
2031 {
2032 int ret = 0;
2033 struct lttng_condition *condition;
2034 struct notification_client *client;
2035 struct notification_client_list *client_list = NULL;
2036 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2037 struct notification_client_list_element *client_list_element, *tmp;
2038 struct cds_lfht_node *node;
2039 struct cds_lfht_iter iter;
2040 bool free_trigger = true;
2041
2042 rcu_read_lock();
2043
2044 condition = lttng_trigger_get_condition(trigger);
2045 assert(condition);
2046
2047 ret = condition_is_supported(condition);
2048 if (ret < 0) {
2049 goto error;
2050 } else if (ret == 0) {
2051 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
2052 goto error;
2053 } else {
2054 /* Feature is supported, continue. */
2055 ret = 0;
2056 }
2057
2058 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
2059 if (!trigger_ht_element) {
2060 ret = -1;
2061 goto error;
2062 }
2063
2064 /* Add trigger to the trigger_ht. */
2065 cds_lfht_node_init(&trigger_ht_element->node);
2066 trigger_ht_element->trigger = trigger;
2067
2068 node = cds_lfht_add_unique(state->triggers_ht,
2069 lttng_condition_hash(condition),
2070 match_condition,
2071 condition,
2072 &trigger_ht_element->node);
2073 if (node != &trigger_ht_element->node) {
2074 /* Not a fatal error, simply report it to the client. */
2075 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2076 goto error_free_ht_element;
2077 }
2078
2079 /*
2080 * Ownership of the trigger and of its wrapper was transfered to
2081 * the triggers_ht.
2082 */
2083 trigger_ht_element = NULL;
2084 free_trigger = false;
2085
2086 /*
2087 * The rest only applies to triggers that have a "notify" action.
2088 * It is not skipped as this is the only action type currently
2089 * supported.
2090 */
2091 client_list = zmalloc(sizeof(*client_list));
2092 if (!client_list) {
2093 ret = -1;
2094 goto error_free_ht_element;
2095 }
2096 cds_lfht_node_init(&client_list->notification_trigger_ht_node);
2097 CDS_INIT_LIST_HEAD(&client_list->list);
2098 client_list->trigger = trigger;
2099
2100 /* Build a list of clients to which this new trigger applies. */
2101 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2102 client_socket_ht_node) {
2103 if (!trigger_applies_to_client(trigger, client)) {
2104 continue;
2105 }
2106
2107 client_list_element = zmalloc(sizeof(*client_list_element));
2108 if (!client_list_element) {
2109 ret = -1;
2110 goto error_free_client_list;
2111 }
2112 CDS_INIT_LIST_HEAD(&client_list_element->node);
2113 client_list_element->client = client;
2114 cds_list_add(&client_list_element->node, &client_list->list);
2115 }
2116
2117 cds_lfht_add(state->notification_trigger_clients_ht,
2118 lttng_condition_hash(condition),
2119 &client_list->notification_trigger_ht_node);
2120
2121 switch (get_condition_binding_object(condition)) {
2122 case LTTNG_OBJECT_TYPE_SESSION:
2123 /* Add the trigger to the list if it matches a known session. */
2124 ret = bind_trigger_to_matching_session(trigger, state);
2125 if (ret) {
2126 goto error_free_client_list;
2127 }
2128 break;
2129 case LTTNG_OBJECT_TYPE_CHANNEL:
2130 /*
2131 * Add the trigger to list of triggers bound to the channels
2132 * currently known.
2133 */
2134 ret = bind_trigger_to_matching_channels(trigger, state);
2135 if (ret) {
2136 goto error_free_client_list;
2137 }
2138 break;
2139 case LTTNG_OBJECT_TYPE_NONE:
2140 break;
2141 default:
2142 ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered");
2143 ret = -1;
2144 goto error_free_client_list;
2145 }
2146
2147 /*
2148 * Since there is nothing preventing clients from subscribing to a
2149 * condition before the corresponding trigger is registered, we have
2150 * to evaluate this new condition right away.
2151 *
2152 * At some point, we were waiting for the next "evaluation" (e.g. on
2153 * reception of a channel sample) to evaluate this new condition, but
2154 * that was broken.
2155 *
2156 * The reason it was broken is that waiting for the next sample
2157 * does not allow us to properly handle transitions for edge-triggered
2158 * conditions.
2159 *
2160 * Consider this example: when we handle a new channel sample, we
2161 * evaluate each conditions twice: once with the previous state, and
2162 * again with the newest state. We then use those two results to
2163 * determine whether a state change happened: a condition was false and
2164 * became true. If a state change happened, we have to notify clients.
2165 *
2166 * Now, if a client subscribes to a given notification and registers
2167 * a trigger *after* that subscription, we have to make sure the
2168 * condition is evaluated at this point while considering only the
2169 * current state. Otherwise, the next evaluation cycle may only see
2170 * that the evaluations remain the same (true for samples n-1 and n) and
2171 * the client will never know that the condition has been met.
2172 */
2173 cds_list_for_each_entry_safe(client_list_element, tmp,
2174 &client_list->list, node) {
2175 ret = evaluate_condition_for_client(trigger, condition,
2176 client_list_element->client, state);
2177 if (ret) {
2178 goto error_free_client_list;
2179 }
2180 }
2181
2182 /*
2183 * Client list ownership transferred to the
2184 * notification_trigger_clients_ht.
2185 */
2186 client_list = NULL;
2187
2188 *cmd_result = LTTNG_OK;
2189 error_free_client_list:
2190 if (client_list) {
2191 cds_list_for_each_entry_safe(client_list_element, tmp,
2192 &client_list->list, node) {
2193 free(client_list_element);
2194 }
2195 free(client_list);
2196 }
2197 error_free_ht_element:
2198 free(trigger_ht_element);
2199 error:
2200 if (free_trigger) {
2201 lttng_trigger_destroy(trigger);
2202 }
2203 rcu_read_unlock();
2204 return ret;
2205 }
2206
2207 static
2208 void free_notification_client_list_rcu(struct rcu_head *node)
2209 {
2210 free(caa_container_of(node, struct notification_client_list,
2211 rcu_node));
2212 }
2213
2214 static
2215 void free_lttng_trigger_ht_element_rcu(struct rcu_head *node)
2216 {
2217 free(caa_container_of(node, struct lttng_trigger_ht_element,
2218 rcu_node));
2219 }
2220
2221 static
2222 int handle_notification_thread_command_unregister_trigger(
2223 struct notification_thread_state *state,
2224 struct lttng_trigger *trigger,
2225 enum lttng_error_code *_cmd_reply)
2226 {
2227 struct cds_lfht_iter iter;
2228 struct cds_lfht_node *triggers_ht_node;
2229 struct lttng_channel_trigger_list *trigger_list;
2230 struct notification_client_list *client_list;
2231 struct notification_client_list_element *client_list_element, *tmp;
2232 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2233 struct lttng_condition *condition = lttng_trigger_get_condition(
2234 trigger);
2235 enum lttng_error_code cmd_reply;
2236
2237 rcu_read_lock();
2238
2239 cds_lfht_lookup(state->triggers_ht,
2240 lttng_condition_hash(condition),
2241 match_condition,
2242 condition,
2243 &iter);
2244 triggers_ht_node = cds_lfht_iter_get_node(&iter);
2245 if (!triggers_ht_node) {
2246 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
2247 goto end;
2248 } else {
2249 cmd_reply = LTTNG_OK;
2250 }
2251
2252 /* Remove trigger from channel_triggers_ht. */
2253 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
2254 channel_triggers_ht_node) {
2255 struct lttng_trigger_list_element *trigger_element, *tmp;
2256
2257 cds_list_for_each_entry_safe(trigger_element, tmp,
2258 &trigger_list->list, node) {
2259 const struct lttng_condition *current_condition =
2260 lttng_trigger_get_const_condition(
2261 trigger_element->trigger);
2262
2263 assert(current_condition);
2264 if (!lttng_condition_is_equal(condition,
2265 current_condition)) {
2266 continue;
2267 }
2268
2269 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2270 cds_list_del(&trigger_element->node);
2271 /* A trigger can only appear once per channel */
2272 break;
2273 }
2274 }
2275
2276 /*
2277 * Remove and release the client list from
2278 * notification_trigger_clients_ht.
2279 */
2280 client_list = get_client_list_from_condition(state, condition);
2281 assert(client_list);
2282
2283 cds_list_for_each_entry_safe(client_list_element, tmp,
2284 &client_list->list, node) {
2285 free(client_list_element);
2286 }
2287 cds_lfht_del(state->notification_trigger_clients_ht,
2288 &client_list->notification_trigger_ht_node);
2289 call_rcu(&client_list->rcu_node, free_notification_client_list_rcu);
2290
2291 /* Remove trigger from triggers_ht. */
2292 trigger_ht_element = caa_container_of(triggers_ht_node,
2293 struct lttng_trigger_ht_element, node);
2294 cds_lfht_del(state->triggers_ht, triggers_ht_node);
2295
2296 /* Release the ownership of the trigger. */
2297 lttng_trigger_destroy(trigger_ht_element->trigger);
2298 call_rcu(&trigger_ht_element->rcu_node, free_lttng_trigger_ht_element_rcu);
2299 end:
2300 rcu_read_unlock();
2301 if (_cmd_reply) {
2302 *_cmd_reply = cmd_reply;
2303 }
2304 return 0;
2305 }
2306
2307 /* Returns 0 on success, 1 on exit requested, negative value on error. */
2308 int handle_notification_thread_command(
2309 struct notification_thread_handle *handle,
2310 struct notification_thread_state *state)
2311 {
2312 int ret;
2313 uint64_t counter;
2314 struct notification_thread_command *cmd;
2315
2316 /* Read the event pipe to put it back into a quiescent state. */
2317 ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
2318 sizeof(counter));
2319 if (ret != sizeof(counter)) {
2320 goto error;
2321 }
2322
2323 pthread_mutex_lock(&handle->cmd_queue.lock);
2324 cmd = cds_list_first_entry(&handle->cmd_queue.list,
2325 struct notification_thread_command, cmd_list_node);
2326 switch (cmd->type) {
2327 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
2328 DBG("[notification-thread] Received register trigger command");
2329 ret = handle_notification_thread_command_register_trigger(
2330 state, cmd->parameters.trigger,
2331 &cmd->reply_code);
2332 break;
2333 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
2334 DBG("[notification-thread] Received unregister trigger command");
2335 ret = handle_notification_thread_command_unregister_trigger(
2336 state, cmd->parameters.trigger,
2337 &cmd->reply_code);
2338 break;
2339 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
2340 DBG("[notification-thread] Received add channel command");
2341 ret = handle_notification_thread_command_add_channel(
2342 state,
2343 cmd->parameters.add_channel.session.name,
2344 cmd->parameters.add_channel.session.uid,
2345 cmd->parameters.add_channel.session.gid,
2346 cmd->parameters.add_channel.channel.name,
2347 cmd->parameters.add_channel.channel.domain,
2348 cmd->parameters.add_channel.channel.key,
2349 cmd->parameters.add_channel.channel.capacity,
2350 &cmd->reply_code);
2351 break;
2352 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
2353 DBG("[notification-thread] Received remove channel command");
2354 ret = handle_notification_thread_command_remove_channel(
2355 state, cmd->parameters.remove_channel.key,
2356 cmd->parameters.remove_channel.domain,
2357 &cmd->reply_code);
2358 break;
2359 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
2360 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
2361 DBG("[notification-thread] Received session rotation %s command",
2362 cmd->type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING ?
2363 "ongoing" : "completed");
2364 ret = handle_notification_thread_command_session_rotation(
2365 state,
2366 cmd->type,
2367 cmd->parameters.session_rotation.session_name,
2368 cmd->parameters.session_rotation.uid,
2369 cmd->parameters.session_rotation.gid,
2370 cmd->parameters.session_rotation.trace_archive_chunk_id,
2371 cmd->parameters.session_rotation.location,
2372 &cmd->reply_code);
2373 break;
2374 case NOTIFICATION_COMMAND_TYPE_QUIT:
2375 DBG("[notification-thread] Received quit command");
2376 cmd->reply_code = LTTNG_OK;
2377 ret = 1;
2378 goto end;
2379 default:
2380 ERR("[notification-thread] Unknown internal command received");
2381 goto error_unlock;
2382 }
2383
2384 if (ret) {
2385 goto error_unlock;
2386 }
2387 end:
2388 cds_list_del(&cmd->cmd_list_node);
2389 lttng_waiter_wake_up(&cmd->reply_waiter);
2390 pthread_mutex_unlock(&handle->cmd_queue.lock);
2391 return ret;
2392 error_unlock:
2393 /* Wake-up and return a fatal error to the calling thread. */
2394 lttng_waiter_wake_up(&cmd->reply_waiter);
2395 pthread_mutex_unlock(&handle->cmd_queue.lock);
2396 cmd->reply_code = LTTNG_ERR_FATAL;
2397 error:
2398 /* Indicate a fatal error to the caller. */
2399 return -1;
2400 }
2401
2402 static
2403 int socket_set_non_blocking(int socket)
2404 {
2405 int ret, flags;
2406
2407 /* Set the pipe as non-blocking. */
2408 ret = fcntl(socket, F_GETFL, 0);
2409 if (ret == -1) {
2410 PERROR("fcntl get socket flags");
2411 goto end;
2412 }
2413 flags = ret;
2414
2415 ret = fcntl(socket, F_SETFL, flags | O_NONBLOCK);
2416 if (ret == -1) {
2417 PERROR("fcntl set O_NONBLOCK socket flag");
2418 goto end;
2419 }
2420 DBG("Client socket (fd = %i) set as non-blocking", socket);
2421 end:
2422 return ret;
2423 }
2424
2425 static
2426 int client_reset_inbound_state(struct notification_client *client)
2427 {
2428 int ret;
2429
2430 ret = lttng_dynamic_buffer_set_size(
2431 &client->communication.inbound.buffer, 0);
2432 assert(!ret);
2433
2434 client->communication.inbound.bytes_to_receive =
2435 sizeof(struct lttng_notification_channel_message);
2436 client->communication.inbound.msg_type =
2437 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
2438 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
2439 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
2440 ret = lttng_dynamic_buffer_set_size(
2441 &client->communication.inbound.buffer,
2442 client->communication.inbound.bytes_to_receive);
2443 return ret;
2444 }
2445
2446 int handle_notification_thread_client_connect(
2447 struct notification_thread_state *state)
2448 {
2449 int ret;
2450 struct notification_client *client;
2451
2452 DBG("[notification-thread] Handling new notification channel client connection");
2453
2454 client = zmalloc(sizeof(*client));
2455 if (!client) {
2456 /* Fatal error. */
2457 ret = -1;
2458 goto error;
2459 }
2460 client->id = state->next_notification_client_id++;
2461 CDS_INIT_LIST_HEAD(&client->condition_list);
2462 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
2463 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
2464 client->communication.inbound.expect_creds = true;
2465 ret = client_reset_inbound_state(client);
2466 if (ret) {
2467 ERR("[notification-thread] Failed to reset client communication's inbound state");
2468 ret = 0;
2469 goto error;
2470 }
2471
2472 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
2473 if (ret < 0) {
2474 ERR("[notification-thread] Failed to accept new notification channel client connection");
2475 ret = 0;
2476 goto error;
2477 }
2478
2479 client->socket = ret;
2480
2481 ret = socket_set_non_blocking(client->socket);
2482 if (ret) {
2483 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
2484 goto error;
2485 }
2486
2487 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
2488 if (ret < 0) {
2489 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
2490 ret = 0;
2491 goto error;
2492 }
2493
2494 ret = lttng_poll_add(&state->events, client->socket,
2495 LPOLLIN | LPOLLERR |
2496 LPOLLHUP | LPOLLRDHUP);
2497 if (ret < 0) {
2498 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
2499 ret = 0;
2500 goto error;
2501 }
2502 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
2503 client->socket);
2504
2505 rcu_read_lock();
2506 cds_lfht_add(state->client_socket_ht,
2507 hash_client_socket(client->socket),
2508 &client->client_socket_ht_node);
2509 cds_lfht_add(state->client_id_ht,
2510 hash_client_id(client->id),
2511 &client->client_id_ht_node);
2512 rcu_read_unlock();
2513
2514 return ret;
2515 error:
2516 notification_client_destroy(client, state);
2517 return ret;
2518 }
2519
2520 int handle_notification_thread_client_disconnect(
2521 int client_socket,
2522 struct notification_thread_state *state)
2523 {
2524 int ret = 0;
2525 struct notification_client *client;
2526
2527 rcu_read_lock();
2528 DBG("[notification-thread] Closing client connection (socket fd = %i)",
2529 client_socket);
2530 client = get_client_from_socket(client_socket, state);
2531 if (!client) {
2532 /* Internal state corruption, fatal error. */
2533 ERR("[notification-thread] Unable to find client (socket fd = %i)",
2534 client_socket);
2535 ret = -1;
2536 goto end;
2537 }
2538
2539 ret = lttng_poll_del(&state->events, client_socket);
2540 if (ret) {
2541 ERR("[notification-thread] Failed to remove client socket from poll set");
2542 }
2543 cds_lfht_del(state->client_socket_ht,
2544 &client->client_socket_ht_node);
2545 cds_lfht_del(state->client_id_ht,
2546 &client->client_id_ht_node);
2547 notification_client_destroy(client, state);
2548 end:
2549 rcu_read_unlock();
2550 return ret;
2551 }
2552
2553 int handle_notification_thread_client_disconnect_all(
2554 struct notification_thread_state *state)
2555 {
2556 struct cds_lfht_iter iter;
2557 struct notification_client *client;
2558 bool error_encoutered = false;
2559
2560 rcu_read_lock();
2561 DBG("[notification-thread] Closing all client connections");
2562 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2563 client_socket_ht_node) {
2564 int ret;
2565
2566 ret = handle_notification_thread_client_disconnect(
2567 client->socket, state);
2568 if (ret) {
2569 error_encoutered = true;
2570 }
2571 }
2572 rcu_read_unlock();
2573 return error_encoutered ? 1 : 0;
2574 }
2575
2576 int handle_notification_thread_trigger_unregister_all(
2577 struct notification_thread_state *state)
2578 {
2579 bool error_occurred = false;
2580 struct cds_lfht_iter iter;
2581 struct lttng_trigger_ht_element *trigger_ht_element;
2582
2583 rcu_read_lock();
2584 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
2585 node) {
2586 int ret = handle_notification_thread_command_unregister_trigger(
2587 state, trigger_ht_element->trigger, NULL);
2588 if (ret) {
2589 error_occurred = true;
2590 }
2591 }
2592 rcu_read_unlock();
2593 return error_occurred ? -1 : 0;
2594 }
2595
2596 static
2597 int client_flush_outgoing_queue(struct notification_client *client,
2598 struct notification_thread_state *state)
2599 {
2600 ssize_t ret;
2601 size_t to_send_count;
2602
2603 assert(client->communication.outbound.buffer.size != 0);
2604 to_send_count = client->communication.outbound.buffer.size;
2605 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
2606 client->socket);
2607
2608 ret = lttcomm_send_unix_sock_non_block(client->socket,
2609 client->communication.outbound.buffer.data,
2610 to_send_count);
2611 if ((ret >= 0 && ret < to_send_count)) {
2612 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
2613 client->socket);
2614 to_send_count -= max(ret, 0);
2615
2616 memcpy(client->communication.outbound.buffer.data,
2617 client->communication.outbound.buffer.data +
2618 client->communication.outbound.buffer.size - to_send_count,
2619 to_send_count);
2620 ret = lttng_dynamic_buffer_set_size(
2621 &client->communication.outbound.buffer,
2622 to_send_count);
2623 if (ret) {
2624 goto error;
2625 }
2626
2627 /*
2628 * We want to be notified whenever there is buffer space
2629 * available to send the rest of the payload.
2630 */
2631 ret = lttng_poll_mod(&state->events, client->socket,
2632 CLIENT_POLL_MASK_IN_OUT);
2633 if (ret) {
2634 goto error;
2635 }
2636 } else if (ret < 0) {
2637 /* Generic error, disconnect the client. */
2638 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
2639 client->socket);
2640 ret = handle_notification_thread_client_disconnect(
2641 client->socket, state);
2642 if (ret) {
2643 goto error;
2644 }
2645 } else {
2646 /* No error and flushed the queue completely. */
2647 ret = lttng_dynamic_buffer_set_size(
2648 &client->communication.outbound.buffer, 0);
2649 if (ret) {
2650 goto error;
2651 }
2652 ret = lttng_poll_mod(&state->events, client->socket,
2653 CLIENT_POLL_MASK_IN);
2654 if (ret) {
2655 goto error;
2656 }
2657
2658 client->communication.outbound.queued_command_reply = false;
2659 client->communication.outbound.dropped_notification = false;
2660 }
2661
2662 return 0;
2663 error:
2664 return -1;
2665 }
2666
2667 static
2668 int client_send_command_reply(struct notification_client *client,
2669 struct notification_thread_state *state,
2670 enum lttng_notification_channel_status status)
2671 {
2672 int ret;
2673 struct lttng_notification_channel_command_reply reply = {
2674 .status = (int8_t) status,
2675 };
2676 struct lttng_notification_channel_message msg = {
2677 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
2678 .size = sizeof(reply),
2679 };
2680 char buffer[sizeof(msg) + sizeof(reply)];
2681
2682 if (client->communication.outbound.queued_command_reply) {
2683 /* Protocol error. */
2684 goto error;
2685 }
2686
2687 memcpy(buffer, &msg, sizeof(msg));
2688 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
2689 DBG("[notification-thread] Send command reply (%i)", (int) status);
2690
2691 /* Enqueue buffer to outgoing queue and flush it. */
2692 ret = lttng_dynamic_buffer_append(
2693 &client->communication.outbound.buffer,
2694 buffer, sizeof(buffer));
2695 if (ret) {
2696 goto error;
2697 }
2698
2699 ret = client_flush_outgoing_queue(client, state);
2700 if (ret) {
2701 goto error;
2702 }
2703
2704 if (client->communication.outbound.buffer.size != 0) {
2705 /* Queue could not be emptied. */
2706 client->communication.outbound.queued_command_reply = true;
2707 }
2708
2709 return 0;
2710 error:
2711 return -1;
2712 }
2713
2714 static
2715 int client_dispatch_message(struct notification_client *client,
2716 struct notification_thread_state *state)
2717 {
2718 int ret = 0;
2719
2720 if (client->communication.inbound.msg_type !=
2721 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
2722 client->communication.inbound.msg_type !=
2723 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
2724 !client->validated) {
2725 WARN("[notification-thread] client attempted a command before handshake");
2726 ret = -1;
2727 goto end;
2728 }
2729
2730 switch (client->communication.inbound.msg_type) {
2731 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
2732 {
2733 /*
2734 * Receiving message header. The function will be called again
2735 * once the rest of the message as been received and can be
2736 * interpreted.
2737 */
2738 const struct lttng_notification_channel_message *msg;
2739
2740 assert(sizeof(*msg) ==
2741 client->communication.inbound.buffer.size);
2742 msg = (const struct lttng_notification_channel_message *)
2743 client->communication.inbound.buffer.data;
2744
2745 if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
2746 ERR("[notification-thread] Invalid notification channel message: length = %u", msg->size);
2747 ret = -1;
2748 goto end;
2749 }
2750
2751 switch (msg->type) {
2752 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2753 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2754 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2755 break;
2756 default:
2757 ret = -1;
2758 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
2759 goto end;
2760 }
2761
2762 client->communication.inbound.bytes_to_receive = msg->size;
2763 client->communication.inbound.msg_type =
2764 (enum lttng_notification_channel_message_type) msg->type;
2765 ret = lttng_dynamic_buffer_set_size(
2766 &client->communication.inbound.buffer, msg->size);
2767 if (ret) {
2768 goto end;
2769 }
2770 break;
2771 }
2772 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2773 {
2774 struct lttng_notification_channel_command_handshake *handshake_client;
2775 struct lttng_notification_channel_command_handshake handshake_reply = {
2776 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
2777 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
2778 };
2779 struct lttng_notification_channel_message msg_header = {
2780 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
2781 .size = sizeof(handshake_reply),
2782 };
2783 enum lttng_notification_channel_status status =
2784 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2785 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
2786
2787 memcpy(send_buffer, &msg_header, sizeof(msg_header));
2788 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
2789 sizeof(handshake_reply));
2790
2791 handshake_client =
2792 (struct lttng_notification_channel_command_handshake *)
2793 client->communication.inbound.buffer.data;
2794 client->major = handshake_client->major;
2795 client->minor = handshake_client->minor;
2796 if (!client->communication.inbound.creds_received) {
2797 ERR("[notification-thread] No credentials received from client");
2798 ret = -1;
2799 goto end;
2800 }
2801
2802 client->uid = LTTNG_SOCK_GET_UID_CRED(
2803 &client->communication.inbound.creds);
2804 client->gid = LTTNG_SOCK_GET_GID_CRED(
2805 &client->communication.inbound.creds);
2806 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
2807 client->uid, client->gid, (int) client->major,
2808 (int) client->minor);
2809
2810 if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
2811 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
2812 }
2813
2814 ret = lttng_dynamic_buffer_append(&client->communication.outbound.buffer,
2815 send_buffer, sizeof(send_buffer));
2816 if (ret) {
2817 ERR("[notification-thread] Failed to send protocol version to notification channel client");
2818 goto end;
2819 }
2820
2821 ret = client_flush_outgoing_queue(client, state);
2822 if (ret) {
2823 goto end;
2824 }
2825
2826 ret = client_send_command_reply(client, state, status);
2827 if (ret) {
2828 ERR("[notification-thread] Failed to send reply to notification channel client");
2829 goto end;
2830 }
2831
2832 /* Set reception state to receive the next message header. */
2833 ret = client_reset_inbound_state(client);
2834 if (ret) {
2835 ERR("[notification-thread] Failed to reset client communication's inbound state");
2836 goto end;
2837 }
2838 client->validated = true;
2839 break;
2840 }
2841 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2842 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2843 {
2844 struct lttng_condition *condition;
2845 enum lttng_notification_channel_status status =
2846 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2847 struct lttng_payload_view condition_view =
2848 lttng_payload_view_from_dynamic_buffer(
2849 &client->communication.inbound.buffer,
2850 0, -1);
2851 size_t expected_condition_size =
2852 client->communication.inbound.buffer.size;
2853
2854 ret = lttng_condition_create_from_payload(&condition_view,
2855 &condition);
2856 if (ret != expected_condition_size) {
2857 ERR("[notification-thread] Malformed condition received from client");
2858 goto end;
2859 }
2860
2861 if (client->communication.inbound.msg_type ==
2862 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
2863 ret = notification_thread_client_subscribe(client,
2864 condition, state, &status);
2865 } else {
2866 ret = notification_thread_client_unsubscribe(client,
2867 condition, state, &status);
2868 }
2869 if (ret) {
2870 goto end;
2871 }
2872
2873 ret = client_send_command_reply(client, state, status);
2874 if (ret) {
2875 ERR("[notification-thread] Failed to send reply to notification channel client");
2876 goto end;
2877 }
2878
2879 /* Set reception state to receive the next message header. */
2880 ret = client_reset_inbound_state(client);
2881 if (ret) {
2882 ERR("[notification-thread] Failed to reset client communication's inbound state");
2883 goto end;
2884 }
2885 break;
2886 }
2887 default:
2888 abort();
2889 }
2890 end:
2891 return ret;
2892 }
2893
2894 /* Incoming data from client. */
2895 int handle_notification_thread_client_in(
2896 struct notification_thread_state *state, int socket)
2897 {
2898 int ret = 0;
2899 struct notification_client *client;
2900 ssize_t recv_ret;
2901 size_t offset;
2902
2903 client = get_client_from_socket(socket, state);
2904 if (!client) {
2905 /* Internal error, abort. */
2906 ret = -1;
2907 goto end;
2908 }
2909
2910 offset = client->communication.inbound.buffer.size -
2911 client->communication.inbound.bytes_to_receive;
2912 if (client->communication.inbound.expect_creds) {
2913 recv_ret = lttcomm_recv_creds_unix_sock(socket,
2914 client->communication.inbound.buffer.data + offset,
2915 client->communication.inbound.bytes_to_receive,
2916 &client->communication.inbound.creds);
2917 if (recv_ret > 0) {
2918 client->communication.inbound.expect_creds = false;
2919 client->communication.inbound.creds_received = true;
2920 }
2921 } else {
2922 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
2923 client->communication.inbound.buffer.data + offset,
2924 client->communication.inbound.bytes_to_receive);
2925 }
2926 if (recv_ret < 0) {
2927 goto error_disconnect_client;
2928 }
2929
2930 client->communication.inbound.bytes_to_receive -= recv_ret;
2931 if (client->communication.inbound.bytes_to_receive == 0) {
2932 ret = client_dispatch_message(client, state);
2933 if (ret) {
2934 /*
2935 * Only returns an error if this client must be
2936 * disconnected.
2937 */
2938 goto error_disconnect_client;
2939 }
2940 } else {
2941 goto end;
2942 }
2943 end:
2944 return ret;
2945 error_disconnect_client:
2946 ret = handle_notification_thread_client_disconnect(socket, state);
2947 return ret;
2948 }
2949
2950 /* Client ready to receive outgoing data. */
2951 int handle_notification_thread_client_out(
2952 struct notification_thread_state *state, int socket)
2953 {
2954 int ret;
2955 struct notification_client *client;
2956
2957 client = get_client_from_socket(socket, state);
2958 if (!client) {
2959 /* Internal error, abort. */
2960 ret = -1;
2961 goto end;
2962 }
2963
2964 ret = client_flush_outgoing_queue(client, state);
2965 if (ret) {
2966 goto end;
2967 }
2968 end:
2969 return ret;
2970 }
2971
2972 static
2973 bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
2974 const struct channel_state_sample *sample,
2975 uint64_t buffer_capacity)
2976 {
2977 bool result = false;
2978 uint64_t threshold;
2979 enum lttng_condition_type condition_type;
2980 const struct lttng_condition_buffer_usage *use_condition = container_of(
2981 condition, struct lttng_condition_buffer_usage,
2982 parent);
2983
2984 if (use_condition->threshold_bytes.set) {
2985 threshold = use_condition->threshold_bytes.value;
2986 } else {
2987 /*
2988 * Threshold was expressed as a ratio.
2989 *
2990 * TODO the threshold (in bytes) of conditions expressed
2991 * as a ratio of total buffer size could be cached to
2992 * forego this double-multiplication or it could be performed
2993 * as fixed-point math.
2994 *
2995 * Note that caching should accommodates the case where the
2996 * condition applies to multiple channels (i.e. don't assume
2997 * that all channels matching my_chann* have the same size...)
2998 */
2999 threshold = (uint64_t) (use_condition->threshold_ratio.value *
3000 (double) buffer_capacity);
3001 }
3002
3003 condition_type = lttng_condition_get_type(condition);
3004 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
3005 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3006 threshold, sample->highest_usage);
3007
3008 /*
3009 * The low condition should only be triggered once _all_ of the
3010 * streams in a channel have gone below the "low" threshold.
3011 */
3012 if (sample->highest_usage <= threshold) {
3013 result = true;
3014 }
3015 } else {
3016 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3017 threshold, sample->highest_usage);
3018
3019 /*
3020 * For high buffer usage scenarios, we want to trigger whenever
3021 * _any_ of the streams has reached the "high" threshold.
3022 */
3023 if (sample->highest_usage >= threshold) {
3024 result = true;
3025 }
3026 }
3027
3028 return result;
3029 }
3030
3031 static
3032 bool evaluate_session_consumed_size_condition(
3033 const struct lttng_condition *condition,
3034 uint64_t session_consumed_size)
3035 {
3036 uint64_t threshold;
3037 const struct lttng_condition_session_consumed_size *size_condition =
3038 container_of(condition,
3039 struct lttng_condition_session_consumed_size,
3040 parent);
3041
3042 threshold = size_condition->consumed_threshold_bytes.value;
3043 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
3044 threshold, session_consumed_size);
3045 return session_consumed_size >= threshold;
3046 }
3047
3048 static
3049 int evaluate_buffer_condition(const struct lttng_condition *condition,
3050 struct lttng_evaluation **evaluation,
3051 const struct notification_thread_state *state,
3052 const struct channel_state_sample *previous_sample,
3053 const struct channel_state_sample *latest_sample,
3054 uint64_t previous_session_consumed_total,
3055 uint64_t latest_session_consumed_total,
3056 struct channel_info *channel_info)
3057 {
3058 int ret = 0;
3059 enum lttng_condition_type condition_type;
3060 const bool previous_sample_available = !!previous_sample;
3061 bool previous_sample_result = false;
3062 bool latest_sample_result;
3063
3064 condition_type = lttng_condition_get_type(condition);
3065
3066 switch (condition_type) {
3067 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3068 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3069 if (caa_likely(previous_sample_available)) {
3070 previous_sample_result =
3071 evaluate_buffer_usage_condition(condition,
3072 previous_sample, channel_info->capacity);
3073 }
3074 latest_sample_result = evaluate_buffer_usage_condition(
3075 condition, latest_sample,
3076 channel_info->capacity);
3077 break;
3078 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3079 if (caa_likely(previous_sample_available)) {
3080 previous_sample_result =
3081 evaluate_session_consumed_size_condition(
3082 condition,
3083 previous_session_consumed_total);
3084 }
3085 latest_sample_result =
3086 evaluate_session_consumed_size_condition(
3087 condition,
3088 latest_session_consumed_total);
3089 break;
3090 default:
3091 /* Unknown condition type; internal error. */
3092 abort();
3093 }
3094
3095 if (!latest_sample_result ||
3096 (previous_sample_result == latest_sample_result)) {
3097 /*
3098 * Only trigger on a condition evaluation transition.
3099 *
3100 * NOTE: This edge-triggered logic may not be appropriate for
3101 * future condition types.
3102 */
3103 goto end;
3104 }
3105
3106 if (!evaluation || !latest_sample_result) {
3107 goto end;
3108 }
3109
3110 switch (condition_type) {
3111 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3112 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3113 *evaluation = lttng_evaluation_buffer_usage_create(
3114 condition_type,
3115 latest_sample->highest_usage,
3116 channel_info->capacity);
3117 break;
3118 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3119 *evaluation = lttng_evaluation_session_consumed_size_create(
3120 latest_session_consumed_total);
3121 break;
3122 default:
3123 abort();
3124 }
3125
3126 if (!*evaluation) {
3127 ret = -1;
3128 goto end;
3129 }
3130 end:
3131 return ret;
3132 }
3133
3134 static
3135 int client_enqueue_dropped_notification(struct notification_client *client)
3136 {
3137 int ret;
3138 struct lttng_notification_channel_message msg = {
3139 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
3140 .size = 0,
3141 };
3142
3143 ret = lttng_dynamic_buffer_append(
3144 &client->communication.outbound.buffer, &msg,
3145 sizeof(msg));
3146 return ret;
3147 }
3148
3149 /*
3150 * Permission checks relative to notification channel clients are performed
3151 * here. Notice how object, client, and trigger credentials are involved in
3152 * this check.
3153 *
3154 * The `object` credentials are the credentials associated with the "subject"
3155 * of a condition. For instance, a `rotation completed` condition applies
3156 * to a session. When that condition is met, it will produce an evaluation
3157 * against a session. Hence, in this case, the `object` credentials are the
3158 * credentials of the "subject" session.
3159 *
3160 * The `trigger` credentials are the credentials of the user that registered the
3161 * trigger.
3162 *
3163 * The `client` credentials are the credentials of the user that created a given
3164 * notification channel.
3165 *
3166 * In terms of visibility, it is expected that non-privilieged users can only
3167 * register triggers against "their" objects (their own sessions and
3168 * applications they are allowed to interact with). They can then open a
3169 * notification channel and subscribe to notifications associated with those
3170 * triggers.
3171 *
3172 * As for privilieged users, they can register triggers against the objects of
3173 * other users. They can then subscribe to the notifications associated to their
3174 * triggers. Privilieged users _can't_ subscribe to the notifications of
3175 * triggers owned by other users; they must create their own triggers.
3176 *
3177 * This is more a concern of usability than security. It would be difficult for
3178 * a root user reliably subscribe to a specific set of conditions without
3179 * interference from external users (those could, for instance, unregister
3180 * their triggers).
3181 */
3182 static
3183 int send_evaluation_to_clients(const struct lttng_trigger *trigger,
3184 const struct lttng_evaluation *evaluation,
3185 struct notification_client_list* client_list,
3186 struct notification_thread_state *state,
3187 uid_t object_uid, gid_t object_gid)
3188 {
3189 int ret = 0;
3190 struct lttng_payload msg_payload;
3191 struct notification_client_list_element *client_list_element, *tmp;
3192 const struct lttng_notification notification = {
3193 .condition = (struct lttng_condition *) lttng_trigger_get_const_condition(trigger),
3194 .evaluation = (struct lttng_evaluation *) evaluation,
3195 };
3196 struct lttng_notification_channel_message msg_header = {
3197 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
3198 };
3199 const struct lttng_credentials *trigger_creds = lttng_trigger_get_credentials(trigger);
3200
3201 lttng_payload_init(&msg_payload);
3202
3203 ret = lttng_dynamic_buffer_append(&msg_payload.buffer, &msg_header,
3204 sizeof(msg_header));
3205 if (ret) {
3206 goto end;
3207 }
3208
3209 ret = lttng_notification_serialize(&notification, &msg_payload);
3210 if (ret) {
3211 ERR("[notification-thread] Failed to serialize notification");
3212 ret = -1;
3213 goto end;
3214 }
3215
3216 /* Update payload size. */
3217 ((struct lttng_notification_channel_message * ) msg_payload.buffer.data)->size =
3218 (uint32_t) (msg_payload.buffer.size - sizeof(msg_header));
3219
3220 cds_list_for_each_entry_safe(client_list_element, tmp,
3221 &client_list->list, node) {
3222 struct notification_client *client =
3223 client_list_element->client;
3224
3225 if (client->uid != object_uid && client->gid != object_gid &&
3226 client->uid != 0) {
3227 /* Client is not allowed to monitor this channel. */
3228 DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
3229 continue;
3230 }
3231
3232 if (client->uid != trigger_creds->uid && client->gid != trigger_creds->gid) {
3233 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this trigger");
3234 continue;
3235 }
3236
3237 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
3238 client->socket, msg_payload.buffer.size);
3239 if (client->communication.outbound.buffer.size) {
3240 /*
3241 * Outgoing data is already buffered for this client;
3242 * drop the notification and enqueue a "dropped
3243 * notification" message if this is the first dropped
3244 * notification since the socket spilled-over to the
3245 * queue.
3246 */
3247 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
3248 client->socket);
3249 if (!client->communication.outbound.dropped_notification) {
3250 client->communication.outbound.dropped_notification = true;
3251 ret = client_enqueue_dropped_notification(
3252 client);
3253 if (ret) {
3254 goto end;
3255 }
3256 }
3257 continue;
3258 }
3259
3260 ret = lttng_dynamic_buffer_append_buffer(
3261 &client->communication.outbound.buffer,
3262 &msg_payload.buffer);
3263 if (ret) {
3264 goto end;
3265 }
3266
3267 ret = client_flush_outgoing_queue(client, state);
3268 if (ret) {
3269 goto end;
3270 }
3271 }
3272 ret = 0;
3273 end:
3274 lttng_payload_reset(&msg_payload);
3275 return ret;
3276 }
3277
3278 int handle_notification_thread_channel_sample(
3279 struct notification_thread_state *state, int pipe,
3280 enum lttng_domain_type domain)
3281 {
3282 int ret = 0;
3283 struct lttcomm_consumer_channel_monitor_msg sample_msg;
3284 struct channel_info *channel_info;
3285 struct cds_lfht_node *node;
3286 struct cds_lfht_iter iter;
3287 struct lttng_channel_trigger_list *trigger_list;
3288 struct lttng_trigger_list_element *trigger_list_element;
3289 bool previous_sample_available = false;
3290 struct channel_state_sample previous_sample, latest_sample;
3291 uint64_t previous_session_consumed_total, latest_session_consumed_total;
3292
3293 /*
3294 * The monitoring pipe only holds messages smaller than PIPE_BUF,
3295 * ensuring that read/write of sampling messages are atomic.
3296 */
3297 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
3298 if (ret != sizeof(sample_msg)) {
3299 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
3300 pipe);
3301 ret = -1;
3302 goto end;
3303 }
3304
3305 ret = 0;
3306 latest_sample.key.key = sample_msg.key;
3307 latest_sample.key.domain = domain;
3308 latest_sample.highest_usage = sample_msg.highest;
3309 latest_sample.lowest_usage = sample_msg.lowest;
3310 latest_sample.channel_total_consumed = sample_msg.total_consumed;
3311
3312 rcu_read_lock();
3313
3314 /* Retrieve the channel's informations */
3315 cds_lfht_lookup(state->channels_ht,
3316 hash_channel_key(&latest_sample.key),
3317 match_channel_info,
3318 &latest_sample.key,
3319 &iter);
3320 node = cds_lfht_iter_get_node(&iter);
3321 if (caa_unlikely(!node)) {
3322 /*
3323 * Not an error since the consumer can push a sample to the pipe
3324 * and the rest of the session daemon could notify us of the
3325 * channel's destruction before we get a chance to process that
3326 * sample.
3327 */
3328 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
3329 latest_sample.key.key,
3330 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
3331 "user space");
3332 goto end_unlock;
3333 }
3334 channel_info = caa_container_of(node, struct channel_info,
3335 channels_ht_node);
3336 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
3337 channel_info->name,
3338 latest_sample.key.key,
3339 channel_info->session_info->name,
3340 latest_sample.highest_usage,
3341 latest_sample.lowest_usage,
3342 latest_sample.channel_total_consumed);
3343
3344 previous_session_consumed_total =
3345 channel_info->session_info->consumed_data_size;
3346
3347 /* Retrieve the channel's last sample, if it exists, and update it. */
3348 cds_lfht_lookup(state->channel_state_ht,
3349 hash_channel_key(&latest_sample.key),
3350 match_channel_state_sample,
3351 &latest_sample.key,
3352 &iter);
3353 node = cds_lfht_iter_get_node(&iter);
3354 if (caa_likely(node)) {
3355 struct channel_state_sample *stored_sample;
3356
3357 /* Update the sample stored. */
3358 stored_sample = caa_container_of(node,
3359 struct channel_state_sample,
3360 channel_state_ht_node);
3361
3362 memcpy(&previous_sample, stored_sample,
3363 sizeof(previous_sample));
3364 stored_sample->highest_usage = latest_sample.highest_usage;
3365 stored_sample->lowest_usage = latest_sample.lowest_usage;
3366 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
3367 previous_sample_available = true;
3368
3369 latest_session_consumed_total =
3370 previous_session_consumed_total +
3371 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
3372 } else {
3373 /*
3374 * This is the channel's first sample, allocate space for and
3375 * store the new sample.
3376 */
3377 struct channel_state_sample *stored_sample;
3378
3379 stored_sample = zmalloc(sizeof(*stored_sample));
3380 if (!stored_sample) {
3381 ret = -1;
3382 goto end_unlock;
3383 }
3384
3385 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
3386 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
3387 cds_lfht_add(state->channel_state_ht,
3388 hash_channel_key(&stored_sample->key),
3389 &stored_sample->channel_state_ht_node);
3390
3391 latest_session_consumed_total =
3392 previous_session_consumed_total +
3393 latest_sample.channel_total_consumed;
3394 }
3395
3396 channel_info->session_info->consumed_data_size =
3397 latest_session_consumed_total;
3398
3399 /* Find triggers associated with this channel. */
3400 cds_lfht_lookup(state->channel_triggers_ht,
3401 hash_channel_key(&latest_sample.key),
3402 match_channel_trigger_list,
3403 &latest_sample.key,
3404 &iter);
3405 node = cds_lfht_iter_get_node(&iter);
3406 if (caa_likely(!node)) {
3407 goto end_unlock;
3408 }
3409
3410 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
3411 channel_triggers_ht_node);
3412 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
3413 node) {
3414 const struct lttng_condition *condition;
3415 const struct lttng_action *action;
3416 const struct lttng_trigger *trigger;
3417 struct notification_client_list *client_list;
3418 struct lttng_evaluation *evaluation = NULL;
3419
3420 trigger = trigger_list_element->trigger;
3421 condition = lttng_trigger_get_const_condition(trigger);
3422 assert(condition);
3423 action = lttng_trigger_get_const_action(trigger);
3424
3425 /* Notify actions are the only type currently supported. */
3426 assert(lttng_action_get_type_const(action) ==
3427 LTTNG_ACTION_TYPE_NOTIFY);
3428
3429 /*
3430 * Check if any client is subscribed to the result of this
3431 * evaluation.
3432 */
3433 client_list = get_client_list_from_condition(state, condition);
3434 assert(client_list);
3435 if (cds_list_empty(&client_list->list)) {
3436 /*
3437 * No clients interested in the evaluation's result,
3438 * skip it.
3439 */
3440 continue;
3441 }
3442
3443 ret = evaluate_buffer_condition(condition, &evaluation, state,
3444 previous_sample_available ? &previous_sample : NULL,
3445 &latest_sample,
3446 previous_session_consumed_total,
3447 latest_session_consumed_total,
3448 channel_info);
3449 if (caa_unlikely(ret)) {
3450 goto end_unlock;
3451 }
3452
3453 if (caa_likely(!evaluation)) {
3454 continue;
3455 }
3456
3457 /* Dispatch evaluation result to all clients. */
3458 ret = send_evaluation_to_clients(trigger_list_element->trigger,
3459 evaluation, client_list, state,
3460 channel_info->session_info->uid,
3461 channel_info->session_info->gid);
3462 lttng_evaluation_destroy(evaluation);
3463 if (caa_unlikely(ret)) {
3464 goto end_unlock;
3465 }
3466 }
3467 end_unlock:
3468 rcu_read_unlock();
3469 end:
3470 return ret;
3471 }
This page took 0.20017 seconds and 4 git commands to generate.