Fix: remove session_info from sessions_ht on destruction
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
1 /*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <urcu.h>
20 #include <urcu/rculfhash.h>
21
22 #include <common/defaults.h>
23 #include <common/error.h>
24 #include <common/futex.h>
25 #include <common/unix.h>
26 #include <common/dynamic-buffer.h>
27 #include <common/hashtable/utils.h>
28 #include <common/sessiond-comm/sessiond-comm.h>
29 #include <common/macros.h>
30 #include <lttng/condition/condition.h>
31 #include <lttng/action/action-internal.h>
32 #include <lttng/notification/notification-internal.h>
33 #include <lttng/condition/condition-internal.h>
34 #include <lttng/condition/buffer-usage-internal.h>
35 #include <lttng/condition/session-consumed-size-internal.h>
36 #include <lttng/condition/session-rotation-internal.h>
37 #include <lttng/notification/channel-internal.h>
38
39 #include <time.h>
40 #include <unistd.h>
41 #include <assert.h>
42 #include <inttypes.h>
43 #include <fcntl.h>
44
45 #include "notification-thread.h"
46 #include "notification-thread-events.h"
47 #include "notification-thread-commands.h"
48 #include "lttng-sessiond.h"
49 #include "kernel.h"
50
51 #define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
52 #define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
53
54 enum lttng_object_type {
55 LTTNG_OBJECT_TYPE_UNKNOWN,
56 LTTNG_OBJECT_TYPE_NONE,
57 LTTNG_OBJECT_TYPE_CHANNEL,
58 LTTNG_OBJECT_TYPE_SESSION,
59 };
60
61 struct lttng_trigger_list_element {
62 /* No ownership of the trigger object is assumed. */
63 const struct lttng_trigger *trigger;
64 struct cds_list_head node;
65 };
66
67 struct lttng_channel_trigger_list {
68 struct channel_key channel_key;
69 /* List of struct lttng_trigger_list_element. */
70 struct cds_list_head list;
71 /* Node in the channel_triggers_ht */
72 struct cds_lfht_node channel_triggers_ht_node;
73 };
74
75 /*
76 * List of triggers applying to a given session.
77 *
78 * See:
79 * - lttng_session_trigger_list_create()
80 * - lttng_session_trigger_list_build()
81 * - lttng_session_trigger_list_destroy()
82 * - lttng_session_trigger_list_add()
83 */
84 struct lttng_session_trigger_list {
85 /*
86 * Not owned by this; points to the session_info structure's
87 * session name.
88 */
89 const char *session_name;
90 /* List of struct lttng_trigger_list_element. */
91 struct cds_list_head list;
92 /* Node in the session_triggers_ht */
93 struct cds_lfht_node session_triggers_ht_node;
94 /*
95 * Weak reference to the notification system's session triggers
96 * hashtable.
97 *
98 * The session trigger list structure structure is owned by
99 * the session's session_info.
100 *
101 * The session_info is kept alive the the channel_infos holding a
102 * reference to it (reference counting). When those channels are
103 * destroyed (at runtime or on teardown), the reference they hold
104 * to the session_info are released. On destruction of session_info,
105 * session_info_destroy() will remove the list of triggers applying
106 * to this session from the notification system's state.
107 *
108 * This implies that the session_triggers_ht must be destroyed
109 * after the channels.
110 */
111 struct cds_lfht *session_triggers_ht;
112 /* Used for delayed RCU reclaim. */
113 struct rcu_head rcu_node;
114 };
115
116 struct lttng_trigger_ht_element {
117 struct lttng_trigger *trigger;
118 struct cds_lfht_node node;
119 };
120
121 struct lttng_condition_list_element {
122 struct lttng_condition *condition;
123 struct cds_list_head node;
124 };
125
126 struct notification_client_list_element {
127 struct notification_client *client;
128 struct cds_list_head node;
129 };
130
131 struct notification_client_list {
132 struct lttng_trigger *trigger;
133 struct cds_list_head list;
134 struct cds_lfht_node notification_trigger_ht_node;
135 };
136
137 struct notification_client {
138 int socket;
139 /* Client protocol version. */
140 uint8_t major, minor;
141 uid_t uid;
142 gid_t gid;
143 /*
144 * Indicates if the credentials and versions of the client have been
145 * checked.
146 */
147 bool validated;
148 /*
149 * Conditions to which the client's notification channel is subscribed.
150 * List of struct lttng_condition_list_node. The condition member is
151 * owned by the client.
152 */
153 struct cds_list_head condition_list;
154 struct cds_lfht_node client_socket_ht_node;
155 struct {
156 struct {
157 /*
158 * During the reception of a message, the reception
159 * buffers' "size" is set to contain the current
160 * message's complete payload.
161 */
162 struct lttng_dynamic_buffer buffer;
163 /* Bytes left to receive for the current message. */
164 size_t bytes_to_receive;
165 /* Type of the message being received. */
166 enum lttng_notification_channel_message_type msg_type;
167 /*
168 * Indicates whether or not credentials are expected
169 * from the client.
170 */
171 bool expect_creds;
172 /*
173 * Indicates whether or not credentials were received
174 * from the client.
175 */
176 bool creds_received;
177 /* Only used during credentials reception. */
178 lttng_sock_cred creds;
179 } inbound;
180 struct {
181 /*
182 * Indicates whether or not a notification addressed to
183 * this client was dropped because a command reply was
184 * already buffered.
185 *
186 * A notification is dropped whenever the buffer is not
187 * empty.
188 */
189 bool dropped_notification;
190 /*
191 * Indicates whether or not a command reply is already
192 * buffered. In this case, it means that the client is
193 * not consuming command replies before emitting a new
194 * one. This could be caused by a protocol error or a
195 * misbehaving/malicious client.
196 */
197 bool queued_command_reply;
198 struct lttng_dynamic_buffer buffer;
199 } outbound;
200 } communication;
201 };
202
203 struct channel_state_sample {
204 struct channel_key key;
205 struct cds_lfht_node channel_state_ht_node;
206 uint64_t highest_usage;
207 uint64_t lowest_usage;
208 uint64_t channel_total_consumed;
209 };
210
211 static unsigned long hash_channel_key(struct channel_key *key);
212 static int evaluate_condition(const struct lttng_condition *condition,
213 struct lttng_evaluation **evaluation,
214 const struct notification_thread_state *state,
215 const struct channel_state_sample *previous_sample,
216 const struct channel_state_sample *latest_sample,
217 uint64_t previous_session_consumed_total,
218 uint64_t latest_session_consumed_total,
219 struct channel_info *channel_info);
220 static
221 int send_evaluation_to_clients(const struct lttng_trigger *trigger,
222 const struct lttng_evaluation *evaluation,
223 struct notification_client_list *client_list,
224 struct notification_thread_state *state,
225 uid_t channel_uid, gid_t channel_gid);
226
227
228 /* session_info API */
229 static
230 void session_info_destroy(void *_data);
231 static
232 void session_info_get(struct session_info *session_info);
233 static
234 void session_info_put(struct session_info *session_info);
235 static
236 struct session_info *session_info_create(const char *name,
237 uid_t uid, gid_t gid,
238 struct lttng_session_trigger_list *trigger_list,
239 struct cds_lfht *sessions_ht);
240 static
241 void session_info_add_channel(struct session_info *session_info,
242 struct channel_info *channel_info);
243 static
244 void session_info_remove_channel(struct session_info *session_info,
245 struct channel_info *channel_info);
246
247 /* lttng_session_trigger_list API */
248 static
249 struct lttng_session_trigger_list *lttng_session_trigger_list_create(
250 const char *session_name,
251 struct cds_lfht *session_triggers_ht);
252 static
253 struct lttng_session_trigger_list *lttng_session_trigger_list_build(
254 const struct notification_thread_state *state,
255 const char *session_name);
256 static
257 void lttng_session_trigger_list_destroy(
258 struct lttng_session_trigger_list *list);
259 static
260 int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
261 const struct lttng_trigger *trigger);
262
263
264 static
265 int match_client(struct cds_lfht_node *node, const void *key)
266 {
267 /* This double-cast is intended to supress pointer-to-cast warning. */
268 int socket = (int) (intptr_t) key;
269 struct notification_client *client;
270
271 client = caa_container_of(node, struct notification_client,
272 client_socket_ht_node);
273
274 return !!(client->socket == socket);
275 }
276
277 static
278 int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
279 {
280 struct channel_key *channel_key = (struct channel_key *) key;
281 struct lttng_channel_trigger_list *trigger_list;
282
283 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
284 channel_triggers_ht_node);
285
286 return !!((channel_key->key == trigger_list->channel_key.key) &&
287 (channel_key->domain == trigger_list->channel_key.domain));
288 }
289
290 static
291 int match_session_trigger_list(struct cds_lfht_node *node, const void *key)
292 {
293 const char *session_name = (const char *) key;
294 struct lttng_session_trigger_list *trigger_list;
295
296 trigger_list = caa_container_of(node, struct lttng_session_trigger_list,
297 session_triggers_ht_node);
298
299 return !!(strcmp(trigger_list->session_name, session_name) == 0);
300 }
301
302 static
303 int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
304 {
305 struct channel_key *channel_key = (struct channel_key *) key;
306 struct channel_state_sample *sample;
307
308 sample = caa_container_of(node, struct channel_state_sample,
309 channel_state_ht_node);
310
311 return !!((channel_key->key == sample->key.key) &&
312 (channel_key->domain == sample->key.domain));
313 }
314
315 static
316 int match_channel_info(struct cds_lfht_node *node, const void *key)
317 {
318 struct channel_key *channel_key = (struct channel_key *) key;
319 struct channel_info *channel_info;
320
321 channel_info = caa_container_of(node, struct channel_info,
322 channels_ht_node);
323
324 return !!((channel_key->key == channel_info->key.key) &&
325 (channel_key->domain == channel_info->key.domain));
326 }
327
328 static
329 int match_condition(struct cds_lfht_node *node, const void *key)
330 {
331 struct lttng_condition *condition_key = (struct lttng_condition *) key;
332 struct lttng_trigger_ht_element *trigger;
333 struct lttng_condition *condition;
334
335 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
336 node);
337 condition = lttng_trigger_get_condition(trigger->trigger);
338 assert(condition);
339
340 return !!lttng_condition_is_equal(condition_key, condition);
341 }
342
343 static
344 int match_client_list(struct cds_lfht_node *node, const void *key)
345 {
346 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
347 struct notification_client_list *client_list;
348 struct lttng_condition *condition;
349 struct lttng_condition *condition_key = lttng_trigger_get_condition(
350 trigger_key);
351
352 assert(condition_key);
353
354 client_list = caa_container_of(node, struct notification_client_list,
355 notification_trigger_ht_node);
356 condition = lttng_trigger_get_condition(client_list->trigger);
357
358 return !!lttng_condition_is_equal(condition_key, condition);
359 }
360
361 static
362 int match_client_list_condition(struct cds_lfht_node *node, const void *key)
363 {
364 struct lttng_condition *condition_key = (struct lttng_condition *) key;
365 struct notification_client_list *client_list;
366 struct lttng_condition *condition;
367
368 assert(condition_key);
369
370 client_list = caa_container_of(node, struct notification_client_list,
371 notification_trigger_ht_node);
372 condition = lttng_trigger_get_condition(client_list->trigger);
373
374 return !!lttng_condition_is_equal(condition_key, condition);
375 }
376
377 static
378 unsigned long lttng_condition_buffer_usage_hash(
379 const struct lttng_condition *_condition)
380 {
381 unsigned long hash;
382 unsigned long condition_type;
383 struct lttng_condition_buffer_usage *condition;
384
385 condition = container_of(_condition,
386 struct lttng_condition_buffer_usage, parent);
387
388 condition_type = (unsigned long) condition->parent.type;
389 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
390 if (condition->session_name) {
391 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
392 }
393 if (condition->channel_name) {
394 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
395 }
396 if (condition->domain.set) {
397 hash ^= hash_key_ulong(
398 (void *) condition->domain.type,
399 lttng_ht_seed);
400 }
401 if (condition->threshold_ratio.set) {
402 uint64_t val;
403
404 val = condition->threshold_ratio.value * (double) UINT32_MAX;
405 hash ^= hash_key_u64(&val, lttng_ht_seed);
406 } else if (condition->threshold_bytes.set) {
407 uint64_t val;
408
409 val = condition->threshold_bytes.value;
410 hash ^= hash_key_u64(&val, lttng_ht_seed);
411 }
412 return hash;
413 }
414
415 static
416 unsigned long lttng_condition_session_consumed_size_hash(
417 const struct lttng_condition *_condition)
418 {
419 unsigned long hash;
420 unsigned long condition_type =
421 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE;
422 struct lttng_condition_session_consumed_size *condition;
423 uint64_t val;
424
425 condition = container_of(_condition,
426 struct lttng_condition_session_consumed_size, parent);
427
428 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
429 if (condition->session_name) {
430 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
431 }
432 val = condition->consumed_threshold_bytes.value;
433 hash ^= hash_key_u64(&val, lttng_ht_seed);
434 return hash;
435 }
436
437 static
438 unsigned long lttng_condition_session_rotation_hash(
439 const struct lttng_condition *_condition)
440 {
441 unsigned long hash, condition_type;
442 struct lttng_condition_session_rotation *condition;
443
444 condition = container_of(_condition,
445 struct lttng_condition_session_rotation, parent);
446 condition_type = (unsigned long) condition->parent.type;
447 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
448 assert(condition->session_name);
449 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
450 return hash;
451 }
452
453 /*
454 * The lttng_condition hashing code is kept in this file (rather than
455 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
456 * don't want to link in liblttng-ctl.
457 */
458 static
459 unsigned long lttng_condition_hash(const struct lttng_condition *condition)
460 {
461 switch (condition->type) {
462 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
463 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
464 return lttng_condition_buffer_usage_hash(condition);
465 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
466 return lttng_condition_session_consumed_size_hash(condition);
467 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
468 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
469 return lttng_condition_session_rotation_hash(condition);
470 default:
471 ERR("[notification-thread] Unexpected condition type caught");
472 abort();
473 }
474 }
475
476 static
477 unsigned long hash_channel_key(struct channel_key *key)
478 {
479 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
480 unsigned long domain_hash = hash_key_ulong(
481 (void *) (unsigned long) key->domain, lttng_ht_seed);
482
483 return key_hash ^ domain_hash;
484 }
485
486 static
487 void channel_info_destroy(struct channel_info *channel_info)
488 {
489 if (!channel_info) {
490 return;
491 }
492
493 if (channel_info->session_info) {
494 session_info_remove_channel(channel_info->session_info,
495 channel_info);
496 session_info_put(channel_info->session_info);
497 }
498 if (channel_info->name) {
499 free(channel_info->name);
500 }
501 free(channel_info);
502 }
503
504 /* Don't call directly, use the ref-counting mechanism. */
505 static
506 void session_info_destroy(void *_data)
507 {
508 struct session_info *session_info = _data;
509 int ret;
510
511 assert(session_info);
512 if (session_info->channel_infos_ht) {
513 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
514 if (ret) {
515 ERR("[notification-thread] Failed to destroy channel information hash table");
516 }
517 }
518 lttng_session_trigger_list_destroy(session_info->trigger_list);
519
520 rcu_read_lock();
521 cds_lfht_del(session_info->sessions_ht,
522 &session_info->sessions_ht_node);
523 rcu_read_unlock();
524 free(session_info->name);
525 free(session_info);
526 }
527
528 static
529 void session_info_get(struct session_info *session_info)
530 {
531 if (!session_info) {
532 return;
533 }
534 lttng_ref_get(&session_info->ref);
535 }
536
537 static
538 void session_info_put(struct session_info *session_info)
539 {
540 if (!session_info) {
541 return;
542 }
543 lttng_ref_put(&session_info->ref);
544 }
545
546 static
547 struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
548 struct lttng_session_trigger_list *trigger_list,
549 struct cds_lfht *sessions_ht)
550 {
551 struct session_info *session_info;
552
553 assert(name);
554
555 session_info = zmalloc(sizeof(*session_info));
556 if (!session_info) {
557 goto end;
558 }
559 lttng_ref_init(&session_info->ref, session_info_destroy);
560
561 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
562 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
563 if (!session_info->channel_infos_ht) {
564 goto error;
565 }
566
567 cds_lfht_node_init(&session_info->sessions_ht_node);
568 session_info->name = strdup(name);
569 if (!session_info->name) {
570 goto error;
571 }
572 session_info->uid = uid;
573 session_info->gid = gid;
574 session_info->trigger_list = trigger_list;
575 session_info->sessions_ht = sessions_ht;
576 end:
577 return session_info;
578 error:
579 session_info_put(session_info);
580 return NULL;
581 }
582
583 static
584 void session_info_add_channel(struct session_info *session_info,
585 struct channel_info *channel_info)
586 {
587 rcu_read_lock();
588 cds_lfht_add(session_info->channel_infos_ht,
589 hash_channel_key(&channel_info->key),
590 &channel_info->session_info_channels_ht_node);
591 rcu_read_unlock();
592 }
593
594 static
595 void session_info_remove_channel(struct session_info *session_info,
596 struct channel_info *channel_info)
597 {
598 rcu_read_lock();
599 cds_lfht_del(session_info->channel_infos_ht,
600 &channel_info->session_info_channels_ht_node);
601 rcu_read_unlock();
602 }
603
604 static
605 struct channel_info *channel_info_create(const char *channel_name,
606 struct channel_key *channel_key, uint64_t channel_capacity,
607 struct session_info *session_info)
608 {
609 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
610
611 if (!channel_info) {
612 goto end;
613 }
614
615 cds_lfht_node_init(&channel_info->channels_ht_node);
616 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
617 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
618 channel_info->capacity = channel_capacity;
619
620 channel_info->name = strdup(channel_name);
621 if (!channel_info->name) {
622 goto error;
623 }
624
625 /*
626 * Set the references between session and channel infos:
627 * - channel_info holds a strong reference to session_info
628 * - session_info holds a weak reference to channel_info
629 */
630 session_info_get(session_info);
631 session_info_add_channel(session_info, channel_info);
632 channel_info->session_info = session_info;
633 end:
634 return channel_info;
635 error:
636 channel_info_destroy(channel_info);
637 return NULL;
638 }
639
640 /* This function must be called with the RCU read lock held. */
641 static
642 int evaluate_condition_for_client(struct lttng_trigger *trigger,
643 struct lttng_condition *condition,
644 struct notification_client *client,
645 struct notification_thread_state *state)
646 {
647 int ret;
648 struct cds_lfht_iter iter;
649 struct cds_lfht_node *node;
650 struct channel_info *channel_info = NULL;
651 struct channel_key *channel_key = NULL;
652 struct channel_state_sample *last_sample = NULL;
653 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
654 struct lttng_evaluation *evaluation = NULL;
655 struct notification_client_list client_list = { 0 };
656 struct notification_client_list_element client_list_element = { 0 };
657
658 assert(trigger);
659 assert(condition);
660 assert(client);
661 assert(state);
662
663 /* Find the channel associated with the trigger. */
664 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
665 channel_trigger_list , channel_triggers_ht_node) {
666 struct lttng_trigger_list_element *element;
667
668 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
669 const struct lttng_condition *current_condition =
670 lttng_trigger_get_const_condition(
671 element->trigger);
672
673 assert(current_condition);
674 if (!lttng_condition_is_equal(condition,
675 current_condition)) {
676 continue;
677 }
678
679 /* Found the trigger, save the channel key. */
680 channel_key = &channel_trigger_list->channel_key;
681 break;
682 }
683 if (channel_key) {
684 /* The channel key was found stop iteration. */
685 break;
686 }
687 }
688
689 if (!channel_key){
690 /* No channel found; normal exit. */
691 DBG("[notification-thread] No channel associated with newly subscribed-to condition");
692 ret = 0;
693 goto end;
694 }
695
696 /* Fetch channel info for the matching channel. */
697 cds_lfht_lookup(state->channels_ht,
698 hash_channel_key(channel_key),
699 match_channel_info,
700 channel_key,
701 &iter);
702 node = cds_lfht_iter_get_node(&iter);
703 assert(node);
704 channel_info = caa_container_of(node, struct channel_info,
705 channels_ht_node);
706
707 /* Retrieve the channel's last sample, if it exists. */
708 cds_lfht_lookup(state->channel_state_ht,
709 hash_channel_key(channel_key),
710 match_channel_state_sample,
711 channel_key,
712 &iter);
713 node = cds_lfht_iter_get_node(&iter);
714 if (node) {
715 last_sample = caa_container_of(node,
716 struct channel_state_sample,
717 channel_state_ht_node);
718 } else {
719 /* Nothing to evaluate, no sample was ever taken. Normal exit */
720 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
721 ret = 0;
722 goto end;
723 }
724
725 ret = evaluate_condition(condition, &evaluation, state,
726 NULL, last_sample,
727 0, channel_info->session_info->consumed_data_size,
728 channel_info);
729 if (ret) {
730 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
731 goto end;
732 }
733
734 if (!evaluation) {
735 /* Evaluation yielded nothing. Normal exit. */
736 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
737 ret = 0;
738 goto end;
739 }
740
741 /*
742 * Create a temporary client list with the client currently
743 * subscribing.
744 */
745 cds_lfht_node_init(&client_list.notification_trigger_ht_node);
746 CDS_INIT_LIST_HEAD(&client_list.list);
747 client_list.trigger = trigger;
748
749 CDS_INIT_LIST_HEAD(&client_list_element.node);
750 client_list_element.client = client;
751 cds_list_add(&client_list_element.node, &client_list.list);
752
753 /* Send evaluation result to the newly-subscribed client. */
754 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
755 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
756 state, channel_info->session_info->uid,
757 channel_info->session_info->gid);
758
759 end:
760 return ret;
761 }
762
763 static
764 int notification_thread_client_subscribe(struct notification_client *client,
765 struct lttng_condition *condition,
766 struct notification_thread_state *state,
767 enum lttng_notification_channel_status *_status)
768 {
769 int ret = 0;
770 struct cds_lfht_iter iter;
771 struct cds_lfht_node *node;
772 struct notification_client_list *client_list;
773 struct lttng_condition_list_element *condition_list_element = NULL;
774 struct notification_client_list_element *client_list_element = NULL;
775 enum lttng_notification_channel_status status =
776 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
777
778 /*
779 * Ensure that the client has not already subscribed to this condition
780 * before.
781 */
782 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
783 if (lttng_condition_is_equal(condition_list_element->condition,
784 condition)) {
785 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
786 goto end;
787 }
788 }
789
790 condition_list_element = zmalloc(sizeof(*condition_list_element));
791 if (!condition_list_element) {
792 ret = -1;
793 goto error;
794 }
795 client_list_element = zmalloc(sizeof(*client_list_element));
796 if (!client_list_element) {
797 ret = -1;
798 goto error;
799 }
800
801 rcu_read_lock();
802
803 /*
804 * Add the newly-subscribed condition to the client's subscription list.
805 */
806 CDS_INIT_LIST_HEAD(&condition_list_element->node);
807 condition_list_element->condition = condition;
808 cds_list_add(&condition_list_element->node, &client->condition_list);
809
810 cds_lfht_lookup(state->notification_trigger_clients_ht,
811 lttng_condition_hash(condition),
812 match_client_list_condition,
813 condition,
814 &iter);
815 node = cds_lfht_iter_get_node(&iter);
816 if (!node) {
817 /*
818 * No notification-emiting trigger registered with this
819 * condition. We don't evaluate the condition right away
820 * since this trigger is not registered yet.
821 */
822 free(client_list_element);
823 goto end_unlock;
824 }
825
826 client_list = caa_container_of(node, struct notification_client_list,
827 notification_trigger_ht_node);
828 /*
829 * The condition to which the client just subscribed is evaluated
830 * at this point so that conditions that are already TRUE result
831 * in a notification being sent out.
832 */
833 if (evaluate_condition_for_client(client_list->trigger, condition,
834 client, state)) {
835 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
836 ret = -1;
837 free(client_list_element);
838 goto end_unlock;
839 }
840
841 /*
842 * Add the client to the list of clients interested in a given trigger
843 * if a "notification" trigger with a corresponding condition was
844 * added prior.
845 */
846 client_list_element->client = client;
847 CDS_INIT_LIST_HEAD(&client_list_element->node);
848 cds_list_add(&client_list_element->node, &client_list->list);
849 end_unlock:
850 rcu_read_unlock();
851 end:
852 if (_status) {
853 *_status = status;
854 }
855 return ret;
856 error:
857 free(condition_list_element);
858 free(client_list_element);
859 return ret;
860 }
861
862 static
863 int notification_thread_client_unsubscribe(
864 struct notification_client *client,
865 struct lttng_condition *condition,
866 struct notification_thread_state *state,
867 enum lttng_notification_channel_status *_status)
868 {
869 struct cds_lfht_iter iter;
870 struct cds_lfht_node *node;
871 struct notification_client_list *client_list;
872 struct lttng_condition_list_element *condition_list_element,
873 *condition_tmp;
874 struct notification_client_list_element *client_list_element,
875 *client_tmp;
876 bool condition_found = false;
877 enum lttng_notification_channel_status status =
878 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
879
880 /* Remove the condition from the client's condition list. */
881 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
882 &client->condition_list, node) {
883 if (!lttng_condition_is_equal(condition_list_element->condition,
884 condition)) {
885 continue;
886 }
887
888 cds_list_del(&condition_list_element->node);
889 /*
890 * The caller may be iterating on the client's conditions to
891 * tear down a client's connection. In this case, the condition
892 * will be destroyed at the end.
893 */
894 if (condition != condition_list_element->condition) {
895 lttng_condition_destroy(
896 condition_list_element->condition);
897 }
898 free(condition_list_element);
899 condition_found = true;
900 break;
901 }
902
903 if (!condition_found) {
904 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
905 goto end;
906 }
907
908 /*
909 * Remove the client from the list of clients interested the trigger
910 * matching the condition.
911 */
912 rcu_read_lock();
913 cds_lfht_lookup(state->notification_trigger_clients_ht,
914 lttng_condition_hash(condition),
915 match_client_list_condition,
916 condition,
917 &iter);
918 node = cds_lfht_iter_get_node(&iter);
919 if (!node) {
920 goto end_unlock;
921 }
922
923 client_list = caa_container_of(node, struct notification_client_list,
924 notification_trigger_ht_node);
925 cds_list_for_each_entry_safe(client_list_element, client_tmp,
926 &client_list->list, node) {
927 if (client_list_element->client->socket != client->socket) {
928 continue;
929 }
930 cds_list_del(&client_list_element->node);
931 free(client_list_element);
932 break;
933 }
934 end_unlock:
935 rcu_read_unlock();
936 end:
937 lttng_condition_destroy(condition);
938 if (_status) {
939 *_status = status;
940 }
941 return 0;
942 }
943
944 static
945 void notification_client_destroy(struct notification_client *client,
946 struct notification_thread_state *state)
947 {
948 struct lttng_condition_list_element *condition_list_element, *tmp;
949
950 if (!client) {
951 return;
952 }
953
954 /* Release all conditions to which the client was subscribed. */
955 cds_list_for_each_entry_safe(condition_list_element, tmp,
956 &client->condition_list, node) {
957 (void) notification_thread_client_unsubscribe(client,
958 condition_list_element->condition, state, NULL);
959 }
960
961 if (client->socket >= 0) {
962 (void) lttcomm_close_unix_sock(client->socket);
963 }
964 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
965 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
966 free(client);
967 }
968
969 /*
970 * Call with rcu_read_lock held (and hold for the lifetime of the returned
971 * client pointer).
972 */
973 static
974 struct notification_client *get_client_from_socket(int socket,
975 struct notification_thread_state *state)
976 {
977 struct cds_lfht_iter iter;
978 struct cds_lfht_node *node;
979 struct notification_client *client = NULL;
980
981 cds_lfht_lookup(state->client_socket_ht,
982 hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed),
983 match_client,
984 (void *) (unsigned long) socket,
985 &iter);
986 node = cds_lfht_iter_get_node(&iter);
987 if (!node) {
988 goto end;
989 }
990
991 client = caa_container_of(node, struct notification_client,
992 client_socket_ht_node);
993 end:
994 return client;
995 }
996
997 static
998 bool buffer_usage_condition_applies_to_channel(
999 const struct lttng_condition *condition,
1000 const struct channel_info *channel_info)
1001 {
1002 enum lttng_condition_status status;
1003 enum lttng_domain_type condition_domain;
1004 const char *condition_session_name = NULL;
1005 const char *condition_channel_name = NULL;
1006
1007 status = lttng_condition_buffer_usage_get_domain_type(condition,
1008 &condition_domain);
1009 assert(status == LTTNG_CONDITION_STATUS_OK);
1010 if (channel_info->key.domain != condition_domain) {
1011 goto fail;
1012 }
1013
1014 status = lttng_condition_buffer_usage_get_session_name(
1015 condition, &condition_session_name);
1016 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1017
1018 status = lttng_condition_buffer_usage_get_channel_name(
1019 condition, &condition_channel_name);
1020 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
1021
1022 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1023 goto fail;
1024 }
1025 if (strcmp(channel_info->name, condition_channel_name)) {
1026 goto fail;
1027 }
1028
1029 return true;
1030 fail:
1031 return false;
1032 }
1033
1034 static
1035 bool session_consumed_size_condition_applies_to_channel(
1036 const struct lttng_condition *condition,
1037 const struct channel_info *channel_info)
1038 {
1039 enum lttng_condition_status status;
1040 const char *condition_session_name = NULL;
1041
1042 status = lttng_condition_session_consumed_size_get_session_name(
1043 condition, &condition_session_name);
1044 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1045
1046 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1047 goto fail;
1048 }
1049
1050 return true;
1051 fail:
1052 return false;
1053 }
1054
1055 /*
1056 * Get the type of object to which a given trigger applies. Binding lets
1057 * the notification system evaluate a trigger's condition when a given
1058 * object's state is updated.
1059 *
1060 * For instance, a condition bound to a channel will be evaluated everytime
1061 * the channel's state is changed by a channel monitoring sample.
1062 */
1063 static
1064 enum lttng_object_type get_trigger_binding_object(
1065 const struct lttng_trigger *trigger)
1066 {
1067 const struct lttng_condition *condition;
1068
1069 condition = lttng_trigger_get_const_condition(trigger);
1070 switch (lttng_condition_get_type(condition)) {
1071 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1072 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1073 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1074 return LTTNG_OBJECT_TYPE_CHANNEL;
1075 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1076 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1077 return LTTNG_OBJECT_TYPE_SESSION;
1078 default:
1079 return LTTNG_OBJECT_TYPE_UNKNOWN;
1080 }
1081 }
1082
1083 static
1084 bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
1085 const struct channel_info *channel_info)
1086 {
1087 const struct lttng_condition *condition;
1088 bool trigger_applies;
1089
1090 condition = lttng_trigger_get_const_condition(trigger);
1091 if (!condition) {
1092 goto fail;
1093 }
1094
1095 switch (lttng_condition_get_type(condition)) {
1096 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1097 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1098 trigger_applies = buffer_usage_condition_applies_to_channel(
1099 condition, channel_info);
1100 break;
1101 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1102 trigger_applies = session_consumed_size_condition_applies_to_channel(
1103 condition, channel_info);
1104 break;
1105 default:
1106 goto fail;
1107 }
1108
1109 return trigger_applies;
1110 fail:
1111 return false;
1112 }
1113
1114 static
1115 bool trigger_applies_to_client(struct lttng_trigger *trigger,
1116 struct notification_client *client)
1117 {
1118 bool applies = false;
1119 struct lttng_condition_list_element *condition_list_element;
1120
1121 cds_list_for_each_entry(condition_list_element, &client->condition_list,
1122 node) {
1123 applies = lttng_condition_is_equal(
1124 condition_list_element->condition,
1125 lttng_trigger_get_condition(trigger));
1126 if (applies) {
1127 break;
1128 }
1129 }
1130 return applies;
1131 }
1132
1133 static
1134 int match_session(struct cds_lfht_node *node, const void *key)
1135 {
1136 const char *name = key;
1137 struct session_info *session_info = caa_container_of(
1138 node, struct session_info, sessions_ht_node);
1139
1140 return !strcmp(session_info->name, name);
1141 }
1142
1143 /*
1144 * Allocate an empty lttng_session_trigger_list for the session named
1145 * 'session_name'.
1146 *
1147 * No ownership of 'session_name' is assumed by the session trigger list.
1148 * It is the caller's responsability to ensure the session name is alive
1149 * for as long as this list is.
1150 */
1151 static
1152 struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1153 const char *session_name,
1154 struct cds_lfht *session_triggers_ht)
1155 {
1156 struct lttng_session_trigger_list *list;
1157
1158 list = zmalloc(sizeof(*list));
1159 if (!list) {
1160 goto end;
1161 }
1162 list->session_name = session_name;
1163 CDS_INIT_LIST_HEAD(&list->list);
1164 cds_lfht_node_init(&list->session_triggers_ht_node);
1165 list->session_triggers_ht = session_triggers_ht;
1166
1167 rcu_read_lock();
1168 /* Publish the list through the session_triggers_ht. */
1169 cds_lfht_add(session_triggers_ht,
1170 hash_key_str(session_name, lttng_ht_seed),
1171 &list->session_triggers_ht_node);
1172 rcu_read_unlock();
1173 end:
1174 return list;
1175 }
1176
1177 static
1178 void free_session_trigger_list_rcu(struct rcu_head *node)
1179 {
1180 free(caa_container_of(node, struct lttng_session_trigger_list,
1181 rcu_node));
1182 }
1183
1184 static
1185 void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1186 {
1187 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1188
1189 /* Empty the list element by element, and then free the list itself. */
1190 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1191 &list->list, node) {
1192 cds_list_del(&trigger_list_element->node);
1193 free(trigger_list_element);
1194 }
1195 rcu_read_lock();
1196 /* Unpublish the list from the session_triggers_ht. */
1197 cds_lfht_del(list->session_triggers_ht,
1198 &list->session_triggers_ht_node);
1199 rcu_read_unlock();
1200 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1201 }
1202
1203 static
1204 int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
1205 const struct lttng_trigger *trigger)
1206 {
1207 int ret = 0;
1208 struct lttng_trigger_list_element *new_element =
1209 zmalloc(sizeof(*new_element));
1210
1211 if (!new_element) {
1212 ret = -1;
1213 goto end;
1214 }
1215 CDS_INIT_LIST_HEAD(&new_element->node);
1216 new_element->trigger = trigger;
1217 cds_list_add(&new_element->node, &list->list);
1218 end:
1219 return ret;
1220 }
1221
1222 static
1223 bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1224 const char *session_name)
1225 {
1226 bool applies = false;
1227 const struct lttng_condition *condition;
1228
1229 condition = lttng_trigger_get_const_condition(trigger);
1230 switch (lttng_condition_get_type(condition)) {
1231 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1232 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1233 {
1234 enum lttng_condition_status condition_status;
1235 const char *condition_session_name;
1236
1237 condition_status = lttng_condition_session_rotation_get_session_name(
1238 condition, &condition_session_name);
1239 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1240 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1241 goto end;
1242 }
1243
1244 assert(condition_session_name);
1245 applies = !strcmp(condition_session_name, session_name);
1246 break;
1247 }
1248 default:
1249 goto end;
1250 }
1251 end:
1252 return applies;
1253 }
1254
1255 /*
1256 * Allocate and initialize an lttng_session_trigger_list which contains
1257 * all triggers that apply to the session named 'session_name'.
1258 *
1259 * No ownership of 'session_name' is assumed by the session trigger list.
1260 * It is the caller's responsability to ensure the session name is alive
1261 * for as long as this list is.
1262 */
1263 static
1264 struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1265 const struct notification_thread_state *state,
1266 const char *session_name)
1267 {
1268 int trigger_count = 0;
1269 struct lttng_session_trigger_list *session_trigger_list = NULL;
1270 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1271 struct cds_lfht_iter iter;
1272
1273 session_trigger_list = lttng_session_trigger_list_create(session_name,
1274 state->session_triggers_ht);
1275
1276 /* Add all triggers applying to the session named 'session_name'. */
1277 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1278 node) {
1279 int ret;
1280
1281 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1282 session_name)) {
1283 continue;
1284 }
1285
1286 ret = lttng_session_trigger_list_add(session_trigger_list,
1287 trigger_ht_element->trigger);
1288 if (ret) {
1289 goto error;
1290 }
1291
1292 trigger_count++;
1293 }
1294
1295 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1296 trigger_count);
1297 return session_trigger_list;
1298 error:
1299 lttng_session_trigger_list_destroy(session_trigger_list);
1300 return NULL;
1301 }
1302
1303 static
1304 struct session_info *find_or_create_session_info(
1305 struct notification_thread_state *state,
1306 const char *name, uid_t uid, gid_t gid)
1307 {
1308 struct session_info *session = NULL;
1309 struct cds_lfht_node *node;
1310 struct cds_lfht_iter iter;
1311 struct lttng_session_trigger_list *trigger_list;
1312
1313 rcu_read_lock();
1314 cds_lfht_lookup(state->sessions_ht,
1315 hash_key_str(name, lttng_ht_seed),
1316 match_session,
1317 name,
1318 &iter);
1319 node = cds_lfht_iter_get_node(&iter);
1320 if (node) {
1321 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1322 name, uid, gid);
1323 session = caa_container_of(node, struct session_info,
1324 sessions_ht_node);
1325 assert(session->uid == uid);
1326 assert(session->gid == gid);
1327 session_info_get(session);
1328 goto end;
1329 }
1330
1331 trigger_list = lttng_session_trigger_list_build(state, name);
1332 if (!trigger_list) {
1333 goto error;
1334 }
1335
1336 session = session_info_create(name, uid, gid, trigger_list,
1337 state->sessions_ht);
1338 if (!session) {
1339 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1340 name, uid, gid);
1341 goto error;
1342 }
1343 trigger_list = NULL;
1344
1345 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
1346 &session->sessions_ht_node);
1347 end:
1348 rcu_read_unlock();
1349 return session;
1350 error:
1351 rcu_read_unlock();
1352 session_info_put(session);
1353 return NULL;
1354 }
1355
1356 static
1357 int handle_notification_thread_command_add_channel(
1358 struct notification_thread_state *state,
1359 const char *session_name, uid_t session_uid, gid_t session_gid,
1360 const char *channel_name, enum lttng_domain_type channel_domain,
1361 uint64_t channel_key_int, uint64_t channel_capacity,
1362 enum lttng_error_code *cmd_result)
1363 {
1364 struct cds_list_head trigger_list;
1365 struct channel_info *new_channel_info = NULL;
1366 struct channel_key channel_key = {
1367 .key = channel_key_int,
1368 .domain = channel_domain,
1369 };
1370 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1371 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1372 int trigger_count = 0;
1373 struct cds_lfht_iter iter;
1374 struct session_info *session_info = NULL;
1375
1376 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
1377 channel_name, session_name, channel_key_int,
1378 channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1379
1380 CDS_INIT_LIST_HEAD(&trigger_list);
1381
1382 session_info = find_or_create_session_info(state, session_name,
1383 session_uid, session_gid);
1384 if (!session_info) {
1385 /* Allocation error or an internal error occured. */
1386 goto error;
1387 }
1388
1389 new_channel_info = channel_info_create(channel_name, &channel_key,
1390 channel_capacity, session_info);
1391 if (!new_channel_info) {
1392 goto error;
1393 }
1394
1395 /* Build a list of all triggers applying to the new channel. */
1396 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1397 node) {
1398 struct lttng_trigger_list_element *new_element;
1399
1400 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
1401 new_channel_info)) {
1402 continue;
1403 }
1404
1405 new_element = zmalloc(sizeof(*new_element));
1406 if (!new_element) {
1407 goto error;
1408 }
1409 CDS_INIT_LIST_HEAD(&new_element->node);
1410 new_element->trigger = trigger_ht_element->trigger;
1411 cds_list_add(&new_element->node, &trigger_list);
1412 trigger_count++;
1413 }
1414
1415 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1416 trigger_count);
1417 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1418 if (!channel_trigger_list) {
1419 goto error;
1420 }
1421 channel_trigger_list->channel_key = new_channel_info->key;
1422 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1423 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1424 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1425
1426 rcu_read_lock();
1427 /* Add channel to the channel_ht which owns the channel_infos. */
1428 cds_lfht_add(state->channels_ht,
1429 hash_channel_key(&new_channel_info->key),
1430 &new_channel_info->channels_ht_node);
1431 /*
1432 * Add the list of triggers associated with this channel to the
1433 * channel_triggers_ht.
1434 */
1435 cds_lfht_add(state->channel_triggers_ht,
1436 hash_channel_key(&new_channel_info->key),
1437 &channel_trigger_list->channel_triggers_ht_node);
1438 rcu_read_unlock();
1439 session_info_put(session_info);
1440 *cmd_result = LTTNG_OK;
1441 return 0;
1442 error:
1443 channel_info_destroy(new_channel_info);
1444 session_info_put(session_info);
1445 return 1;
1446 }
1447
1448 static
1449 int handle_notification_thread_command_remove_channel(
1450 struct notification_thread_state *state,
1451 uint64_t channel_key, enum lttng_domain_type domain,
1452 enum lttng_error_code *cmd_result)
1453 {
1454 struct cds_lfht_node *node;
1455 struct cds_lfht_iter iter;
1456 struct lttng_channel_trigger_list *trigger_list;
1457 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1458 struct channel_key key = { .key = channel_key, .domain = domain };
1459 struct channel_info *channel_info;
1460
1461 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
1462 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1463
1464 rcu_read_lock();
1465
1466 cds_lfht_lookup(state->channel_triggers_ht,
1467 hash_channel_key(&key),
1468 match_channel_trigger_list,
1469 &key,
1470 &iter);
1471 node = cds_lfht_iter_get_node(&iter);
1472 /*
1473 * There is a severe internal error if we are being asked to remove a
1474 * channel that doesn't exist.
1475 */
1476 if (!node) {
1477 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1478 goto end;
1479 }
1480
1481 /* Free the list of triggers associated with this channel. */
1482 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1483 channel_triggers_ht_node);
1484 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1485 &trigger_list->list, node) {
1486 cds_list_del(&trigger_list_element->node);
1487 free(trigger_list_element);
1488 }
1489 cds_lfht_del(state->channel_triggers_ht, node);
1490 free(trigger_list);
1491
1492 /* Free sampled channel state. */
1493 cds_lfht_lookup(state->channel_state_ht,
1494 hash_channel_key(&key),
1495 match_channel_state_sample,
1496 &key,
1497 &iter);
1498 node = cds_lfht_iter_get_node(&iter);
1499 /*
1500 * This is expected to be NULL if the channel is destroyed before we
1501 * received a sample.
1502 */
1503 if (node) {
1504 struct channel_state_sample *sample = caa_container_of(node,
1505 struct channel_state_sample,
1506 channel_state_ht_node);
1507
1508 cds_lfht_del(state->channel_state_ht, node);
1509 free(sample);
1510 }
1511
1512 /* Remove the channel from the channels_ht and free it. */
1513 cds_lfht_lookup(state->channels_ht,
1514 hash_channel_key(&key),
1515 match_channel_info,
1516 &key,
1517 &iter);
1518 node = cds_lfht_iter_get_node(&iter);
1519 assert(node);
1520 channel_info = caa_container_of(node, struct channel_info,
1521 channels_ht_node);
1522 cds_lfht_del(state->channels_ht, node);
1523 channel_info_destroy(channel_info);
1524 end:
1525 rcu_read_unlock();
1526 *cmd_result = LTTNG_OK;
1527 return 0;
1528 }
1529
1530 static
1531 int condition_is_supported(struct lttng_condition *condition)
1532 {
1533 int ret;
1534
1535 switch (lttng_condition_get_type(condition)) {
1536 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1537 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1538 {
1539 enum lttng_domain_type domain;
1540
1541 ret = lttng_condition_buffer_usage_get_domain_type(condition,
1542 &domain);
1543 if (ret) {
1544 ret = -1;
1545 goto end;
1546 }
1547
1548 if (domain != LTTNG_DOMAIN_KERNEL) {
1549 ret = 1;
1550 goto end;
1551 }
1552
1553 /*
1554 * Older kernel tracers don't expose the API to monitor their
1555 * buffers. Therefore, we reject triggers that require that
1556 * mechanism to be available to be evaluated.
1557 */
1558 ret = kernel_supports_ring_buffer_snapshot_sample_positions(
1559 kernel_tracer_fd);
1560 break;
1561 }
1562 default:
1563 ret = 1;
1564 }
1565 end:
1566 return ret;
1567 }
1568
1569 /* Must be called with RCU read lock held. */
1570 static
1571 int bind_trigger_to_matching_session(const struct lttng_trigger *trigger,
1572 struct notification_thread_state *state)
1573 {
1574 int ret = 0;
1575 struct cds_lfht_node *node;
1576 struct cds_lfht_iter iter;
1577 const struct lttng_condition *condition;
1578 const char *session_name;
1579 struct lttng_session_trigger_list *trigger_list;
1580
1581 condition = lttng_trigger_get_const_condition(trigger);
1582 switch (lttng_condition_get_type(condition)) {
1583 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1584 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1585 {
1586 enum lttng_condition_status status;
1587
1588 status = lttng_condition_session_rotation_get_session_name(
1589 condition, &session_name);
1590 if (status != LTTNG_CONDITION_STATUS_OK) {
1591 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
1592 ret = -1;
1593 goto end;
1594 }
1595 break;
1596 }
1597 default:
1598 ret = -1;
1599 goto end;
1600 }
1601
1602 cds_lfht_lookup(state->session_triggers_ht,
1603 hash_key_str(session_name, lttng_ht_seed),
1604 match_session_trigger_list,
1605 session_name,
1606 &iter);
1607 node = cds_lfht_iter_get_node(&iter);
1608 if (!node) {
1609 /*
1610 * Not an error, the list of triggers applying to that session
1611 * will be initialized when the session is created.
1612 */
1613 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
1614 session_name);
1615 goto end;
1616 }
1617
1618 trigger_list = caa_container_of(node,
1619 struct lttng_session_trigger_list,
1620 session_triggers_ht_node);
1621
1622 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
1623 session_name);
1624 ret = lttng_session_trigger_list_add(trigger_list, trigger);
1625 end:
1626 return ret;
1627 }
1628
1629 /* Must be called with RCU read lock held. */
1630 static
1631 int bind_trigger_to_matching_channels(const struct lttng_trigger *trigger,
1632 struct notification_thread_state *state)
1633 {
1634 int ret = 0;
1635 struct cds_lfht_node *node;
1636 struct cds_lfht_iter iter;
1637 struct channel_info *channel;
1638
1639 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
1640 channels_ht_node) {
1641 struct lttng_trigger_list_element *trigger_list_element;
1642 struct lttng_channel_trigger_list *trigger_list;
1643
1644 if (!trigger_applies_to_channel(trigger, channel)) {
1645 continue;
1646 }
1647
1648 cds_lfht_lookup(state->channel_triggers_ht,
1649 hash_channel_key(&channel->key),
1650 match_channel_trigger_list,
1651 &channel->key,
1652 &iter);
1653 node = cds_lfht_iter_get_node(&iter);
1654 assert(node);
1655 trigger_list = caa_container_of(node,
1656 struct lttng_channel_trigger_list,
1657 channel_triggers_ht_node);
1658
1659 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
1660 if (!trigger_list_element) {
1661 ret = -1;
1662 goto end;
1663 }
1664 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
1665 trigger_list_element->trigger = trigger;
1666 cds_list_add(&trigger_list_element->node, &trigger_list->list);
1667 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
1668 channel->name);
1669 }
1670 end:
1671 return ret;
1672 }
1673
1674 /*
1675 * FIXME A client's credentials are not checked when registering a trigger, nor
1676 * are they stored alongside with the trigger.
1677 *
1678 * The effects of this are benign since:
1679 * - The client will succeed in registering the trigger, as it is valid,
1680 * - The trigger will, internally, be bound to the channel/session,
1681 * - The notifications will not be sent since the client's credentials
1682 * are checked against the channel at that moment.
1683 *
1684 * If this function returns a non-zero value, it means something is
1685 * fundamentally broken and the whole subsystem/thread will be torn down.
1686 *
1687 * If a non-fatal error occurs, just set the cmd_result to the appropriate
1688 * error code.
1689 */
1690 static
1691 int handle_notification_thread_command_register_trigger(
1692 struct notification_thread_state *state,
1693 struct lttng_trigger *trigger,
1694 enum lttng_error_code *cmd_result)
1695 {
1696 int ret = 0;
1697 struct lttng_condition *condition;
1698 struct notification_client *client;
1699 struct notification_client_list *client_list = NULL;
1700 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1701 struct notification_client_list_element *client_list_element, *tmp;
1702 struct cds_lfht_node *node;
1703 struct cds_lfht_iter iter;
1704 bool free_trigger = true;
1705
1706 rcu_read_lock();
1707
1708 condition = lttng_trigger_get_condition(trigger);
1709 assert(condition);
1710
1711 ret = condition_is_supported(condition);
1712 if (ret < 0) {
1713 goto error;
1714 } else if (ret == 0) {
1715 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
1716 goto error;
1717 } else {
1718 /* Feature is supported, continue. */
1719 ret = 0;
1720 }
1721
1722 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
1723 if (!trigger_ht_element) {
1724 ret = -1;
1725 goto error;
1726 }
1727
1728 /* Add trigger to the trigger_ht. */
1729 cds_lfht_node_init(&trigger_ht_element->node);
1730 trigger_ht_element->trigger = trigger;
1731
1732 node = cds_lfht_add_unique(state->triggers_ht,
1733 lttng_condition_hash(condition),
1734 match_condition,
1735 condition,
1736 &trigger_ht_element->node);
1737 if (node != &trigger_ht_element->node) {
1738 /* Not a fatal error, simply report it to the client. */
1739 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
1740 goto error_free_ht_element;
1741 }
1742
1743 /*
1744 * Ownership of the trigger and of its wrapper was transfered to
1745 * the triggers_ht.
1746 */
1747 trigger_ht_element = NULL;
1748 free_trigger = false;
1749
1750 /*
1751 * The rest only applies to triggers that have a "notify" action.
1752 * It is not skipped as this is the only action type currently
1753 * supported.
1754 */
1755 client_list = zmalloc(sizeof(*client_list));
1756 if (!client_list) {
1757 ret = -1;
1758 goto error_free_ht_element;
1759 }
1760 cds_lfht_node_init(&client_list->notification_trigger_ht_node);
1761 CDS_INIT_LIST_HEAD(&client_list->list);
1762 client_list->trigger = trigger;
1763
1764 /* Build a list of clients to which this new trigger applies. */
1765 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1766 client_socket_ht_node) {
1767 if (!trigger_applies_to_client(trigger, client)) {
1768 continue;
1769 }
1770
1771 client_list_element = zmalloc(sizeof(*client_list_element));
1772 if (!client_list_element) {
1773 ret = -1;
1774 goto error_free_client_list;
1775 }
1776 CDS_INIT_LIST_HEAD(&client_list_element->node);
1777 client_list_element->client = client;
1778 cds_list_add(&client_list_element->node, &client_list->list);
1779 }
1780
1781 cds_lfht_add(state->notification_trigger_clients_ht,
1782 lttng_condition_hash(condition),
1783 &client_list->notification_trigger_ht_node);
1784
1785 switch (get_trigger_binding_object(trigger)) {
1786 case LTTNG_OBJECT_TYPE_SESSION:
1787 /* Add the trigger to the list if it matches a known session. */
1788 ret = bind_trigger_to_matching_session(trigger, state);
1789 if (ret) {
1790 goto error_free_client_list;
1791 }
1792 case LTTNG_OBJECT_TYPE_CHANNEL:
1793 /*
1794 * Add the trigger to list of triggers bound to the channels
1795 * currently known.
1796 */
1797 ret = bind_trigger_to_matching_channels(trigger, state);
1798 if (ret) {
1799 goto error_free_client_list;
1800 }
1801 break;
1802 case LTTNG_OBJECT_TYPE_NONE:
1803 break;
1804 default:
1805 ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered");
1806 ret = -1;
1807 goto error_free_client_list;
1808 }
1809
1810 /*
1811 * Since there is nothing preventing clients from subscribing to a
1812 * condition before the corresponding trigger is registered, we have
1813 * to evaluate this new condition right away.
1814 *
1815 * At some point, we were waiting for the next "evaluation" (e.g. on
1816 * reception of a channel sample) to evaluate this new condition, but
1817 * that was broken.
1818 *
1819 * The reason it was broken is that waiting for the next sample
1820 * does not allow us to properly handle transitions for edge-triggered
1821 * conditions.
1822 *
1823 * Consider this example: when we handle a new channel sample, we
1824 * evaluate each conditions twice: once with the previous state, and
1825 * again with the newest state. We then use those two results to
1826 * determine whether a state change happened: a condition was false and
1827 * became true. If a state change happened, we have to notify clients.
1828 *
1829 * Now, if a client subscribes to a given notification and registers
1830 * a trigger *after* that subscription, we have to make sure the
1831 * condition is evaluated at this point while considering only the
1832 * current state. Otherwise, the next evaluation cycle may only see
1833 * that the evaluations remain the same (true for samples n-1 and n) and
1834 * the client will never know that the condition has been met.
1835 */
1836 cds_list_for_each_entry_safe(client_list_element, tmp,
1837 &client_list->list, node) {
1838 ret = evaluate_condition_for_client(trigger, condition,
1839 client_list_element->client, state);
1840 if (ret) {
1841 goto error_free_client_list;
1842 }
1843 }
1844
1845 /*
1846 * Client list ownership transferred to the
1847 * notification_trigger_clients_ht.
1848 */
1849 client_list = NULL;
1850
1851 *cmd_result = LTTNG_OK;
1852 error_free_client_list:
1853 if (client_list) {
1854 cds_list_for_each_entry_safe(client_list_element, tmp,
1855 &client_list->list, node) {
1856 free(client_list_element);
1857 }
1858 free(client_list);
1859 }
1860 error_free_ht_element:
1861 free(trigger_ht_element);
1862 error:
1863 if (free_trigger) {
1864 struct lttng_action *action = lttng_trigger_get_action(trigger);
1865
1866 lttng_condition_destroy(condition);
1867 lttng_action_destroy(action);
1868 lttng_trigger_destroy(trigger);
1869 }
1870 rcu_read_unlock();
1871 return ret;
1872 }
1873
1874 static
1875 int handle_notification_thread_command_unregister_trigger(
1876 struct notification_thread_state *state,
1877 struct lttng_trigger *trigger,
1878 enum lttng_error_code *_cmd_reply)
1879 {
1880 struct cds_lfht_iter iter;
1881 struct cds_lfht_node *node, *triggers_ht_node;
1882 struct lttng_channel_trigger_list *trigger_list;
1883 struct notification_client_list *client_list;
1884 struct notification_client_list_element *client_list_element, *tmp;
1885 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1886 struct lttng_condition *condition = lttng_trigger_get_condition(
1887 trigger);
1888 struct lttng_action *action;
1889 enum lttng_error_code cmd_reply;
1890
1891 rcu_read_lock();
1892
1893 cds_lfht_lookup(state->triggers_ht,
1894 lttng_condition_hash(condition),
1895 match_condition,
1896 condition,
1897 &iter);
1898 triggers_ht_node = cds_lfht_iter_get_node(&iter);
1899 if (!triggers_ht_node) {
1900 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
1901 goto end;
1902 } else {
1903 cmd_reply = LTTNG_OK;
1904 }
1905
1906 /* Remove trigger from channel_triggers_ht. */
1907 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
1908 channel_triggers_ht_node) {
1909 struct lttng_trigger_list_element *trigger_element, *tmp;
1910
1911 cds_list_for_each_entry_safe(trigger_element, tmp,
1912 &trigger_list->list, node) {
1913 const struct lttng_condition *current_condition =
1914 lttng_trigger_get_const_condition(
1915 trigger_element->trigger);
1916
1917 assert(current_condition);
1918 if (!lttng_condition_is_equal(condition,
1919 current_condition)) {
1920 continue;
1921 }
1922
1923 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
1924 cds_list_del(&trigger_element->node);
1925 /* A trigger can only appear once per channel */
1926 break;
1927 }
1928 }
1929
1930 /*
1931 * Remove and release the client list from
1932 * notification_trigger_clients_ht.
1933 */
1934 cds_lfht_lookup(state->notification_trigger_clients_ht,
1935 lttng_condition_hash(condition),
1936 match_client_list,
1937 trigger,
1938 &iter);
1939 node = cds_lfht_iter_get_node(&iter);
1940 assert(node);
1941 client_list = caa_container_of(node, struct notification_client_list,
1942 notification_trigger_ht_node);
1943 cds_list_for_each_entry_safe(client_list_element, tmp,
1944 &client_list->list, node) {
1945 free(client_list_element);
1946 }
1947 cds_lfht_del(state->notification_trigger_clients_ht, node);
1948 free(client_list);
1949
1950 /* Remove trigger from triggers_ht. */
1951 trigger_ht_element = caa_container_of(triggers_ht_node,
1952 struct lttng_trigger_ht_element, node);
1953 cds_lfht_del(state->triggers_ht, triggers_ht_node);
1954
1955 condition = lttng_trigger_get_condition(trigger_ht_element->trigger);
1956 lttng_condition_destroy(condition);
1957 action = lttng_trigger_get_action(trigger_ht_element->trigger);
1958 lttng_action_destroy(action);
1959 lttng_trigger_destroy(trigger_ht_element->trigger);
1960 free(trigger_ht_element);
1961 end:
1962 rcu_read_unlock();
1963 if (_cmd_reply) {
1964 *_cmd_reply = cmd_reply;
1965 }
1966 return 0;
1967 }
1968
1969 /* Returns 0 on success, 1 on exit requested, negative value on error. */
1970 int handle_notification_thread_command(
1971 struct notification_thread_handle *handle,
1972 struct notification_thread_state *state)
1973 {
1974 int ret;
1975 uint64_t counter;
1976 struct notification_thread_command *cmd;
1977
1978 /* Read the event pipe to put it back into a quiescent state. */
1979 ret = read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
1980 sizeof(counter));
1981 if (ret == -1) {
1982 goto error;
1983 }
1984
1985 pthread_mutex_lock(&handle->cmd_queue.lock);
1986 cmd = cds_list_first_entry(&handle->cmd_queue.list,
1987 struct notification_thread_command, cmd_list_node);
1988 switch (cmd->type) {
1989 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
1990 DBG("[notification-thread] Received register trigger command");
1991 ret = handle_notification_thread_command_register_trigger(
1992 state, cmd->parameters.trigger,
1993 &cmd->reply_code);
1994 break;
1995 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
1996 DBG("[notification-thread] Received unregister trigger command");
1997 ret = handle_notification_thread_command_unregister_trigger(
1998 state, cmd->parameters.trigger,
1999 &cmd->reply_code);
2000 break;
2001 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
2002 DBG("[notification-thread] Received add channel command");
2003 ret = handle_notification_thread_command_add_channel(
2004 state,
2005 cmd->parameters.add_channel.session.name,
2006 cmd->parameters.add_channel.session.uid,
2007 cmd->parameters.add_channel.session.gid,
2008 cmd->parameters.add_channel.channel.name,
2009 cmd->parameters.add_channel.channel.domain,
2010 cmd->parameters.add_channel.channel.key,
2011 cmd->parameters.add_channel.channel.capacity,
2012 &cmd->reply_code);
2013 break;
2014 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
2015 DBG("[notification-thread] Received remove channel command");
2016 ret = handle_notification_thread_command_remove_channel(
2017 state, cmd->parameters.remove_channel.key,
2018 cmd->parameters.remove_channel.domain,
2019 &cmd->reply_code);
2020 break;
2021 case NOTIFICATION_COMMAND_TYPE_QUIT:
2022 DBG("[notification-thread] Received quit command");
2023 cmd->reply_code = LTTNG_OK;
2024 ret = 1;
2025 goto end;
2026 default:
2027 ERR("[notification-thread] Unknown internal command received");
2028 goto error_unlock;
2029 }
2030
2031 if (ret) {
2032 goto error_unlock;
2033 }
2034 end:
2035 cds_list_del(&cmd->cmd_list_node);
2036 lttng_waiter_wake_up(&cmd->reply_waiter);
2037 pthread_mutex_unlock(&handle->cmd_queue.lock);
2038 return ret;
2039 error_unlock:
2040 /* Wake-up and return a fatal error to the calling thread. */
2041 lttng_waiter_wake_up(&cmd->reply_waiter);
2042 pthread_mutex_unlock(&handle->cmd_queue.lock);
2043 cmd->reply_code = LTTNG_ERR_FATAL;
2044 error:
2045 /* Indicate a fatal error to the caller. */
2046 return -1;
2047 }
2048
2049 static
2050 unsigned long hash_client_socket(int socket)
2051 {
2052 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
2053 }
2054
2055 static
2056 int socket_set_non_blocking(int socket)
2057 {
2058 int ret, flags;
2059
2060 /* Set the pipe as non-blocking. */
2061 ret = fcntl(socket, F_GETFL, 0);
2062 if (ret == -1) {
2063 PERROR("fcntl get socket flags");
2064 goto end;
2065 }
2066 flags = ret;
2067
2068 ret = fcntl(socket, F_SETFL, flags | O_NONBLOCK);
2069 if (ret == -1) {
2070 PERROR("fcntl set O_NONBLOCK socket flag");
2071 goto end;
2072 }
2073 DBG("Client socket (fd = %i) set as non-blocking", socket);
2074 end:
2075 return ret;
2076 }
2077
2078 static
2079 int client_reset_inbound_state(struct notification_client *client)
2080 {
2081 int ret;
2082
2083 ret = lttng_dynamic_buffer_set_size(
2084 &client->communication.inbound.buffer, 0);
2085 assert(!ret);
2086
2087 client->communication.inbound.bytes_to_receive =
2088 sizeof(struct lttng_notification_channel_message);
2089 client->communication.inbound.msg_type =
2090 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
2091 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
2092 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
2093 ret = lttng_dynamic_buffer_set_size(
2094 &client->communication.inbound.buffer,
2095 client->communication.inbound.bytes_to_receive);
2096 return ret;
2097 }
2098
2099 int handle_notification_thread_client_connect(
2100 struct notification_thread_state *state)
2101 {
2102 int ret;
2103 struct notification_client *client;
2104
2105 DBG("[notification-thread] Handling new notification channel client connection");
2106
2107 client = zmalloc(sizeof(*client));
2108 if (!client) {
2109 /* Fatal error. */
2110 ret = -1;
2111 goto error;
2112 }
2113 CDS_INIT_LIST_HEAD(&client->condition_list);
2114 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
2115 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
2116 client->communication.inbound.expect_creds = true;
2117 ret = client_reset_inbound_state(client);
2118 if (ret) {
2119 ERR("[notification-thread] Failed to reset client communication's inbound state");
2120 ret = 0;
2121 goto error;
2122 }
2123
2124 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
2125 if (ret < 0) {
2126 ERR("[notification-thread] Failed to accept new notification channel client connection");
2127 ret = 0;
2128 goto error;
2129 }
2130
2131 client->socket = ret;
2132
2133 ret = socket_set_non_blocking(client->socket);
2134 if (ret) {
2135 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
2136 goto error;
2137 }
2138
2139 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
2140 if (ret < 0) {
2141 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
2142 ret = 0;
2143 goto error;
2144 }
2145
2146 ret = lttng_poll_add(&state->events, client->socket,
2147 LPOLLIN | LPOLLERR |
2148 LPOLLHUP | LPOLLRDHUP);
2149 if (ret < 0) {
2150 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
2151 ret = 0;
2152 goto error;
2153 }
2154 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
2155 client->socket);
2156
2157 rcu_read_lock();
2158 cds_lfht_add(state->client_socket_ht,
2159 hash_client_socket(client->socket),
2160 &client->client_socket_ht_node);
2161 rcu_read_unlock();
2162
2163 return ret;
2164 error:
2165 notification_client_destroy(client, state);
2166 return ret;
2167 }
2168
2169 int handle_notification_thread_client_disconnect(
2170 int client_socket,
2171 struct notification_thread_state *state)
2172 {
2173 int ret = 0;
2174 struct notification_client *client;
2175
2176 rcu_read_lock();
2177 DBG("[notification-thread] Closing client connection (socket fd = %i)",
2178 client_socket);
2179 client = get_client_from_socket(client_socket, state);
2180 if (!client) {
2181 /* Internal state corruption, fatal error. */
2182 ERR("[notification-thread] Unable to find client (socket fd = %i)",
2183 client_socket);
2184 ret = -1;
2185 goto end;
2186 }
2187
2188 ret = lttng_poll_del(&state->events, client_socket);
2189 if (ret) {
2190 ERR("[notification-thread] Failed to remove client socket from poll set");
2191 }
2192 cds_lfht_del(state->client_socket_ht,
2193 &client->client_socket_ht_node);
2194 notification_client_destroy(client, state);
2195 end:
2196 rcu_read_unlock();
2197 return ret;
2198 }
2199
2200 int handle_notification_thread_client_disconnect_all(
2201 struct notification_thread_state *state)
2202 {
2203 struct cds_lfht_iter iter;
2204 struct notification_client *client;
2205 bool error_encoutered = false;
2206
2207 rcu_read_lock();
2208 DBG("[notification-thread] Closing all client connections");
2209 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2210 client_socket_ht_node) {
2211 int ret;
2212
2213 ret = handle_notification_thread_client_disconnect(
2214 client->socket, state);
2215 if (ret) {
2216 error_encoutered = true;
2217 }
2218 }
2219 rcu_read_unlock();
2220 return error_encoutered ? 1 : 0;
2221 }
2222
2223 int handle_notification_thread_trigger_unregister_all(
2224 struct notification_thread_state *state)
2225 {
2226 bool error_occurred = false;
2227 struct cds_lfht_iter iter;
2228 struct lttng_trigger_ht_element *trigger_ht_element;
2229
2230 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
2231 node) {
2232 int ret = handle_notification_thread_command_unregister_trigger(
2233 state, trigger_ht_element->trigger, NULL);
2234 if (ret) {
2235 error_occurred = true;
2236 }
2237 }
2238 return error_occurred ? -1 : 0;
2239 }
2240
2241 static
2242 int client_flush_outgoing_queue(struct notification_client *client,
2243 struct notification_thread_state *state)
2244 {
2245 ssize_t ret;
2246 size_t to_send_count;
2247
2248 assert(client->communication.outbound.buffer.size != 0);
2249 to_send_count = client->communication.outbound.buffer.size;
2250 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
2251 client->socket);
2252
2253 ret = lttcomm_send_unix_sock_non_block(client->socket,
2254 client->communication.outbound.buffer.data,
2255 to_send_count);
2256 if ((ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) ||
2257 (ret > 0 && ret < to_send_count)) {
2258 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
2259 client->socket);
2260 to_send_count -= max(ret, 0);
2261
2262 memcpy(client->communication.outbound.buffer.data,
2263 client->communication.outbound.buffer.data +
2264 client->communication.outbound.buffer.size - to_send_count,
2265 to_send_count);
2266 ret = lttng_dynamic_buffer_set_size(
2267 &client->communication.outbound.buffer,
2268 to_send_count);
2269 if (ret) {
2270 goto error;
2271 }
2272
2273 /*
2274 * We want to be notified whenever there is buffer space
2275 * available to send the rest of the payload.
2276 */
2277 ret = lttng_poll_mod(&state->events, client->socket,
2278 CLIENT_POLL_MASK_IN_OUT);
2279 if (ret) {
2280 goto error;
2281 }
2282 } else if (ret < 0) {
2283 /* Generic error, disconnect the client. */
2284 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
2285 client->socket);
2286 ret = handle_notification_thread_client_disconnect(
2287 client->socket, state);
2288 if (ret) {
2289 goto error;
2290 }
2291 } else {
2292 /* No error and flushed the queue completely. */
2293 ret = lttng_dynamic_buffer_set_size(
2294 &client->communication.outbound.buffer, 0);
2295 if (ret) {
2296 goto error;
2297 }
2298 ret = lttng_poll_mod(&state->events, client->socket,
2299 CLIENT_POLL_MASK_IN);
2300 if (ret) {
2301 goto error;
2302 }
2303
2304 client->communication.outbound.queued_command_reply = false;
2305 client->communication.outbound.dropped_notification = false;
2306 }
2307
2308 return 0;
2309 error:
2310 return -1;
2311 }
2312
2313 static
2314 int client_send_command_reply(struct notification_client *client,
2315 struct notification_thread_state *state,
2316 enum lttng_notification_channel_status status)
2317 {
2318 int ret;
2319 struct lttng_notification_channel_command_reply reply = {
2320 .status = (int8_t) status,
2321 };
2322 struct lttng_notification_channel_message msg = {
2323 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
2324 .size = sizeof(reply),
2325 };
2326 char buffer[sizeof(msg) + sizeof(reply)];
2327
2328 if (client->communication.outbound.queued_command_reply) {
2329 /* Protocol error. */
2330 goto error;
2331 }
2332
2333 memcpy(buffer, &msg, sizeof(msg));
2334 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
2335 DBG("[notification-thread] Send command reply (%i)", (int) status);
2336
2337 /* Enqueue buffer to outgoing queue and flush it. */
2338 ret = lttng_dynamic_buffer_append(
2339 &client->communication.outbound.buffer,
2340 buffer, sizeof(buffer));
2341 if (ret) {
2342 goto error;
2343 }
2344
2345 ret = client_flush_outgoing_queue(client, state);
2346 if (ret) {
2347 goto error;
2348 }
2349
2350 if (client->communication.outbound.buffer.size != 0) {
2351 /* Queue could not be emptied. */
2352 client->communication.outbound.queued_command_reply = true;
2353 }
2354
2355 return 0;
2356 error:
2357 return -1;
2358 }
2359
2360 static
2361 int client_dispatch_message(struct notification_client *client,
2362 struct notification_thread_state *state)
2363 {
2364 int ret = 0;
2365
2366 if (client->communication.inbound.msg_type !=
2367 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
2368 client->communication.inbound.msg_type !=
2369 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
2370 !client->validated) {
2371 WARN("[notification-thread] client attempted a command before handshake");
2372 ret = -1;
2373 goto end;
2374 }
2375
2376 switch (client->communication.inbound.msg_type) {
2377 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
2378 {
2379 /*
2380 * Receiving message header. The function will be called again
2381 * once the rest of the message as been received and can be
2382 * interpreted.
2383 */
2384 const struct lttng_notification_channel_message *msg;
2385
2386 assert(sizeof(*msg) ==
2387 client->communication.inbound.buffer.size);
2388 msg = (const struct lttng_notification_channel_message *)
2389 client->communication.inbound.buffer.data;
2390
2391 if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
2392 ERR("[notification-thread] Invalid notification channel message: length = %u", msg->size);
2393 ret = -1;
2394 goto end;
2395 }
2396
2397 switch (msg->type) {
2398 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2399 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2400 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2401 break;
2402 default:
2403 ret = -1;
2404 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
2405 goto end;
2406 }
2407
2408 client->communication.inbound.bytes_to_receive = msg->size;
2409 client->communication.inbound.msg_type =
2410 (enum lttng_notification_channel_message_type) msg->type;
2411 ret = lttng_dynamic_buffer_set_size(
2412 &client->communication.inbound.buffer, msg->size);
2413 if (ret) {
2414 goto end;
2415 }
2416 break;
2417 }
2418 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2419 {
2420 struct lttng_notification_channel_command_handshake *handshake_client;
2421 struct lttng_notification_channel_command_handshake handshake_reply = {
2422 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
2423 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
2424 };
2425 struct lttng_notification_channel_message msg_header = {
2426 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
2427 .size = sizeof(handshake_reply),
2428 };
2429 enum lttng_notification_channel_status status =
2430 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2431 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
2432
2433 memcpy(send_buffer, &msg_header, sizeof(msg_header));
2434 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
2435 sizeof(handshake_reply));
2436
2437 handshake_client =
2438 (struct lttng_notification_channel_command_handshake *)
2439 client->communication.inbound.buffer.data;
2440 client->major = handshake_client->major;
2441 client->minor = handshake_client->minor;
2442 if (!client->communication.inbound.creds_received) {
2443 ERR("[notification-thread] No credentials received from client");
2444 ret = -1;
2445 goto end;
2446 }
2447
2448 client->uid = LTTNG_SOCK_GET_UID_CRED(
2449 &client->communication.inbound.creds);
2450 client->gid = LTTNG_SOCK_GET_GID_CRED(
2451 &client->communication.inbound.creds);
2452 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
2453 client->uid, client->gid, (int) client->major,
2454 (int) client->minor);
2455
2456 if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
2457 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
2458 }
2459
2460 ret = lttng_dynamic_buffer_append(&client->communication.outbound.buffer,
2461 send_buffer, sizeof(send_buffer));
2462 if (ret) {
2463 ERR("[notification-thread] Failed to send protocol version to notification channel client");
2464 goto end;
2465 }
2466
2467 ret = client_flush_outgoing_queue(client, state);
2468 if (ret) {
2469 goto end;
2470 }
2471
2472 ret = client_send_command_reply(client, state, status);
2473 if (ret) {
2474 ERR("[notification-thread] Failed to send reply to notification channel client");
2475 goto end;
2476 }
2477
2478 /* Set reception state to receive the next message header. */
2479 ret = client_reset_inbound_state(client);
2480 if (ret) {
2481 ERR("[notification-thread] Failed to reset client communication's inbound state");
2482 goto end;
2483 }
2484 client->validated = true;
2485 break;
2486 }
2487 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2488 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2489 {
2490 struct lttng_condition *condition;
2491 enum lttng_notification_channel_status status =
2492 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2493 const struct lttng_buffer_view condition_view =
2494 lttng_buffer_view_from_dynamic_buffer(
2495 &client->communication.inbound.buffer,
2496 0, -1);
2497 size_t expected_condition_size =
2498 client->communication.inbound.buffer.size;
2499
2500 ret = lttng_condition_create_from_buffer(&condition_view,
2501 &condition);
2502 if (ret != expected_condition_size) {
2503 ERR("[notification-thread] Malformed condition received from client");
2504 goto end;
2505 }
2506
2507 if (client->communication.inbound.msg_type ==
2508 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
2509 ret = notification_thread_client_subscribe(client,
2510 condition, state, &status);
2511 } else {
2512 ret = notification_thread_client_unsubscribe(client,
2513 condition, state, &status);
2514 }
2515 if (ret) {
2516 goto end;
2517 }
2518
2519 ret = client_send_command_reply(client, state, status);
2520 if (ret) {
2521 ERR("[notification-thread] Failed to send reply to notification channel client");
2522 goto end;
2523 }
2524
2525 /* Set reception state to receive the next message header. */
2526 ret = client_reset_inbound_state(client);
2527 if (ret) {
2528 ERR("[notification-thread] Failed to reset client communication's inbound state");
2529 goto end;
2530 }
2531 break;
2532 }
2533 default:
2534 abort();
2535 }
2536 end:
2537 return ret;
2538 }
2539
2540 /* Incoming data from client. */
2541 int handle_notification_thread_client_in(
2542 struct notification_thread_state *state, int socket)
2543 {
2544 int ret = 0;
2545 struct notification_client *client;
2546 ssize_t recv_ret;
2547 size_t offset;
2548
2549 client = get_client_from_socket(socket, state);
2550 if (!client) {
2551 /* Internal error, abort. */
2552 ret = -1;
2553 goto end;
2554 }
2555
2556 offset = client->communication.inbound.buffer.size -
2557 client->communication.inbound.bytes_to_receive;
2558 if (client->communication.inbound.expect_creds) {
2559 recv_ret = lttcomm_recv_creds_unix_sock(socket,
2560 client->communication.inbound.buffer.data + offset,
2561 client->communication.inbound.bytes_to_receive,
2562 &client->communication.inbound.creds);
2563 if (recv_ret > 0) {
2564 client->communication.inbound.expect_creds = false;
2565 client->communication.inbound.creds_received = true;
2566 }
2567 } else {
2568 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
2569 client->communication.inbound.buffer.data + offset,
2570 client->communication.inbound.bytes_to_receive);
2571 }
2572 if (recv_ret < 0) {
2573 goto error_disconnect_client;
2574 }
2575
2576 client->communication.inbound.bytes_to_receive -= recv_ret;
2577 if (client->communication.inbound.bytes_to_receive == 0) {
2578 ret = client_dispatch_message(client, state);
2579 if (ret) {
2580 /*
2581 * Only returns an error if this client must be
2582 * disconnected.
2583 */
2584 goto error_disconnect_client;
2585 }
2586 } else {
2587 goto end;
2588 }
2589 end:
2590 return ret;
2591 error_disconnect_client:
2592 ret = handle_notification_thread_client_disconnect(socket, state);
2593 return ret;
2594 }
2595
2596 /* Client ready to receive outgoing data. */
2597 int handle_notification_thread_client_out(
2598 struct notification_thread_state *state, int socket)
2599 {
2600 int ret;
2601 struct notification_client *client;
2602
2603 client = get_client_from_socket(socket, state);
2604 if (!client) {
2605 /* Internal error, abort. */
2606 ret = -1;
2607 goto end;
2608 }
2609
2610 ret = client_flush_outgoing_queue(client, state);
2611 if (ret) {
2612 goto end;
2613 }
2614 end:
2615 return ret;
2616 }
2617
2618 static
2619 bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
2620 const struct channel_state_sample *sample,
2621 uint64_t buffer_capacity)
2622 {
2623 bool result = false;
2624 uint64_t threshold;
2625 enum lttng_condition_type condition_type;
2626 const struct lttng_condition_buffer_usage *use_condition = container_of(
2627 condition, struct lttng_condition_buffer_usage,
2628 parent);
2629
2630 if (use_condition->threshold_bytes.set) {
2631 threshold = use_condition->threshold_bytes.value;
2632 } else {
2633 /*
2634 * Threshold was expressed as a ratio.
2635 *
2636 * TODO the threshold (in bytes) of conditions expressed
2637 * as a ratio of total buffer size could be cached to
2638 * forego this double-multiplication or it could be performed
2639 * as fixed-point math.
2640 *
2641 * Note that caching should accomodate the case where the
2642 * condition applies to multiple channels (i.e. don't assume
2643 * that all channels matching my_chann* have the same size...)
2644 */
2645 threshold = (uint64_t) (use_condition->threshold_ratio.value *
2646 (double) buffer_capacity);
2647 }
2648
2649 condition_type = lttng_condition_get_type(condition);
2650 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
2651 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2652 threshold, sample->highest_usage);
2653
2654 /*
2655 * The low condition should only be triggered once _all_ of the
2656 * streams in a channel have gone below the "low" threshold.
2657 */
2658 if (sample->highest_usage <= threshold) {
2659 result = true;
2660 }
2661 } else {
2662 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2663 threshold, sample->highest_usage);
2664
2665 /*
2666 * For high buffer usage scenarios, we want to trigger whenever
2667 * _any_ of the streams has reached the "high" threshold.
2668 */
2669 if (sample->highest_usage >= threshold) {
2670 result = true;
2671 }
2672 }
2673
2674 return result;
2675 }
2676
2677 static
2678 bool evaluate_session_consumed_size_condition(
2679 const struct lttng_condition *condition,
2680 uint64_t session_consumed_size)
2681 {
2682 uint64_t threshold;
2683 const struct lttng_condition_session_consumed_size *size_condition =
2684 container_of(condition,
2685 struct lttng_condition_session_consumed_size,
2686 parent);
2687
2688 threshold = size_condition->consumed_threshold_bytes.value;
2689 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
2690 threshold, session_consumed_size);
2691 return session_consumed_size >= threshold;
2692 }
2693
2694 static
2695 int evaluate_condition(const struct lttng_condition *condition,
2696 struct lttng_evaluation **evaluation,
2697 const struct notification_thread_state *state,
2698 const struct channel_state_sample *previous_sample,
2699 const struct channel_state_sample *latest_sample,
2700 uint64_t previous_session_consumed_total,
2701 uint64_t latest_session_consumed_total,
2702 struct channel_info *channel_info)
2703 {
2704 int ret = 0;
2705 enum lttng_condition_type condition_type;
2706 const bool previous_sample_available = !!previous_sample;
2707 bool previous_sample_result = false;
2708 bool latest_sample_result;
2709
2710 condition_type = lttng_condition_get_type(condition);
2711
2712 switch (condition_type) {
2713 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
2714 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
2715 if (caa_likely(previous_sample_available)) {
2716 previous_sample_result =
2717 evaluate_buffer_usage_condition(condition,
2718 previous_sample, channel_info->capacity);
2719 }
2720 latest_sample_result = evaluate_buffer_usage_condition(
2721 condition, latest_sample,
2722 channel_info->capacity);
2723 break;
2724 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
2725 if (caa_likely(previous_sample_available)) {
2726 previous_sample_result =
2727 evaluate_session_consumed_size_condition(
2728 condition,
2729 previous_session_consumed_total);
2730 }
2731 latest_sample_result =
2732 evaluate_session_consumed_size_condition(
2733 condition,
2734 latest_session_consumed_total);
2735 break;
2736 default:
2737 /* Unknown condition type; internal error. */
2738 abort();
2739 }
2740
2741 if (!latest_sample_result ||
2742 (previous_sample_result == latest_sample_result)) {
2743 /*
2744 * Only trigger on a condition evaluation transition.
2745 *
2746 * NOTE: This edge-triggered logic may not be appropriate for
2747 * future condition types.
2748 */
2749 goto end;
2750 }
2751
2752 if (!evaluation || !latest_sample_result) {
2753 goto end;
2754 }
2755
2756 switch (condition_type) {
2757 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
2758 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
2759 *evaluation = lttng_evaluation_buffer_usage_create(
2760 condition_type,
2761 latest_sample->highest_usage,
2762 channel_info->capacity);
2763 break;
2764 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
2765 *evaluation = lttng_evaluation_session_consumed_size_create(
2766 latest_session_consumed_total);
2767 break;
2768 default:
2769 abort();
2770 }
2771
2772 if (!*evaluation) {
2773 ret = -1;
2774 goto end;
2775 }
2776 end:
2777 return ret;
2778 }
2779
2780 static
2781 int client_enqueue_dropped_notification(struct notification_client *client,
2782 struct notification_thread_state *state)
2783 {
2784 int ret;
2785 struct lttng_notification_channel_message msg = {
2786 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
2787 .size = 0,
2788 };
2789
2790 ret = lttng_dynamic_buffer_append(
2791 &client->communication.outbound.buffer, &msg,
2792 sizeof(msg));
2793 return ret;
2794 }
2795
2796 static
2797 int send_evaluation_to_clients(const struct lttng_trigger *trigger,
2798 const struct lttng_evaluation *evaluation,
2799 struct notification_client_list* client_list,
2800 struct notification_thread_state *state,
2801 uid_t channel_uid, gid_t channel_gid)
2802 {
2803 int ret = 0;
2804 struct lttng_dynamic_buffer msg_buffer;
2805 struct notification_client_list_element *client_list_element, *tmp;
2806 const struct lttng_notification notification = {
2807 .condition = (struct lttng_condition *) lttng_trigger_get_const_condition(trigger),
2808 .evaluation = (struct lttng_evaluation *) evaluation,
2809 };
2810 struct lttng_notification_channel_message msg_header = {
2811 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
2812 };
2813
2814 lttng_dynamic_buffer_init(&msg_buffer);
2815
2816 ret = lttng_dynamic_buffer_append(&msg_buffer, &msg_header,
2817 sizeof(msg_header));
2818 if (ret) {
2819 goto end;
2820 }
2821
2822 ret = lttng_notification_serialize(&notification, &msg_buffer);
2823 if (ret) {
2824 ERR("[notification-thread] Failed to serialize notification");
2825 ret = -1;
2826 goto end;
2827 }
2828
2829 /* Update payload size. */
2830 ((struct lttng_notification_channel_message * ) msg_buffer.data)->size =
2831 (uint32_t) (msg_buffer.size - sizeof(msg_header));
2832
2833 cds_list_for_each_entry_safe(client_list_element, tmp,
2834 &client_list->list, node) {
2835 struct notification_client *client =
2836 client_list_element->client;
2837
2838 if (client->uid != channel_uid && client->gid != channel_gid &&
2839 client->uid != 0) {
2840 /* Client is not allowed to monitor this channel. */
2841 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this channel");
2842 continue;
2843 }
2844
2845 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
2846 client->socket, msg_buffer.size);
2847 if (client->communication.outbound.buffer.size) {
2848 /*
2849 * Outgoing data is already buffered for this client;
2850 * drop the notification and enqueue a "dropped
2851 * notification" message if this is the first dropped
2852 * notification since the socket spilled-over to the
2853 * queue.
2854 */
2855 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
2856 client->socket);
2857 if (!client->communication.outbound.dropped_notification) {
2858 client->communication.outbound.dropped_notification = true;
2859 ret = client_enqueue_dropped_notification(
2860 client, state);
2861 if (ret) {
2862 goto end;
2863 }
2864 }
2865 continue;
2866 }
2867
2868 ret = lttng_dynamic_buffer_append_buffer(
2869 &client->communication.outbound.buffer,
2870 &msg_buffer);
2871 if (ret) {
2872 goto end;
2873 }
2874
2875 ret = client_flush_outgoing_queue(client, state);
2876 if (ret) {
2877 goto end;
2878 }
2879 }
2880 ret = 0;
2881 end:
2882 lttng_dynamic_buffer_reset(&msg_buffer);
2883 return ret;
2884 }
2885
2886 int handle_notification_thread_channel_sample(
2887 struct notification_thread_state *state, int pipe,
2888 enum lttng_domain_type domain)
2889 {
2890 int ret = 0;
2891 struct lttcomm_consumer_channel_monitor_msg sample_msg;
2892 struct channel_info *channel_info;
2893 struct cds_lfht_node *node;
2894 struct cds_lfht_iter iter;
2895 struct lttng_channel_trigger_list *trigger_list;
2896 struct lttng_trigger_list_element *trigger_list_element;
2897 bool previous_sample_available = false;
2898 struct channel_state_sample previous_sample, latest_sample;
2899 uint64_t previous_session_consumed_total, latest_session_consumed_total;
2900
2901 /*
2902 * The monitoring pipe only holds messages smaller than PIPE_BUF,
2903 * ensuring that read/write of sampling messages are atomic.
2904 */
2905 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
2906 if (ret != sizeof(sample_msg)) {
2907 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
2908 pipe);
2909 ret = -1;
2910 goto end;
2911 }
2912
2913 ret = 0;
2914 latest_sample.key.key = sample_msg.key;
2915 latest_sample.key.domain = domain;
2916 latest_sample.highest_usage = sample_msg.highest;
2917 latest_sample.lowest_usage = sample_msg.lowest;
2918 latest_sample.channel_total_consumed = sample_msg.total_consumed;
2919
2920 rcu_read_lock();
2921
2922 /* Retrieve the channel's informations */
2923 cds_lfht_lookup(state->channels_ht,
2924 hash_channel_key(&latest_sample.key),
2925 match_channel_info,
2926 &latest_sample.key,
2927 &iter);
2928 node = cds_lfht_iter_get_node(&iter);
2929 if (caa_unlikely(!node)) {
2930 /*
2931 * Not an error since the consumer can push a sample to the pipe
2932 * and the rest of the session daemon could notify us of the
2933 * channel's destruction before we get a chance to process that
2934 * sample.
2935 */
2936 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
2937 latest_sample.key.key,
2938 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
2939 "user space");
2940 goto end_unlock;
2941 }
2942 channel_info = caa_container_of(node, struct channel_info,
2943 channels_ht_node);
2944 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
2945 channel_info->name,
2946 latest_sample.key.key,
2947 channel_info->session_info->name,
2948 latest_sample.highest_usage,
2949 latest_sample.lowest_usage,
2950 latest_sample.channel_total_consumed);
2951
2952 previous_session_consumed_total =
2953 channel_info->session_info->consumed_data_size;
2954
2955 /* Retrieve the channel's last sample, if it exists, and update it. */
2956 cds_lfht_lookup(state->channel_state_ht,
2957 hash_channel_key(&latest_sample.key),
2958 match_channel_state_sample,
2959 &latest_sample.key,
2960 &iter);
2961 node = cds_lfht_iter_get_node(&iter);
2962 if (caa_likely(node)) {
2963 struct channel_state_sample *stored_sample;
2964
2965 /* Update the sample stored. */
2966 stored_sample = caa_container_of(node,
2967 struct channel_state_sample,
2968 channel_state_ht_node);
2969
2970 memcpy(&previous_sample, stored_sample,
2971 sizeof(previous_sample));
2972 stored_sample->highest_usage = latest_sample.highest_usage;
2973 stored_sample->lowest_usage = latest_sample.lowest_usage;
2974 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
2975 previous_sample_available = true;
2976
2977 latest_session_consumed_total =
2978 previous_session_consumed_total +
2979 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
2980 } else {
2981 /*
2982 * This is the channel's first sample, allocate space for and
2983 * store the new sample.
2984 */
2985 struct channel_state_sample *stored_sample;
2986
2987 stored_sample = zmalloc(sizeof(*stored_sample));
2988 if (!stored_sample) {
2989 ret = -1;
2990 goto end_unlock;
2991 }
2992
2993 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
2994 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
2995 cds_lfht_add(state->channel_state_ht,
2996 hash_channel_key(&stored_sample->key),
2997 &stored_sample->channel_state_ht_node);
2998
2999 latest_session_consumed_total =
3000 previous_session_consumed_total +
3001 latest_sample.channel_total_consumed;
3002 }
3003
3004 channel_info->session_info->consumed_data_size =
3005 latest_session_consumed_total;
3006
3007 /* Find triggers associated with this channel. */
3008 cds_lfht_lookup(state->channel_triggers_ht,
3009 hash_channel_key(&latest_sample.key),
3010 match_channel_trigger_list,
3011 &latest_sample.key,
3012 &iter);
3013 node = cds_lfht_iter_get_node(&iter);
3014 if (caa_likely(!node)) {
3015 goto end_unlock;
3016 }
3017
3018 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
3019 channel_triggers_ht_node);
3020 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
3021 node) {
3022 const struct lttng_condition *condition;
3023 const struct lttng_action *action;
3024 const struct lttng_trigger *trigger;
3025 struct notification_client_list *client_list;
3026 struct lttng_evaluation *evaluation = NULL;
3027
3028 trigger = trigger_list_element->trigger;
3029 condition = lttng_trigger_get_const_condition(trigger);
3030 assert(condition);
3031 action = lttng_trigger_get_const_action(trigger);
3032
3033 /* Notify actions are the only type currently supported. */
3034 assert(lttng_action_get_type_const(action) ==
3035 LTTNG_ACTION_TYPE_NOTIFY);
3036
3037 /*
3038 * Check if any client is subscribed to the result of this
3039 * evaluation.
3040 */
3041 cds_lfht_lookup(state->notification_trigger_clients_ht,
3042 lttng_condition_hash(condition),
3043 match_client_list,
3044 trigger,
3045 &iter);
3046 node = cds_lfht_iter_get_node(&iter);
3047 assert(node);
3048
3049 client_list = caa_container_of(node,
3050 struct notification_client_list,
3051 notification_trigger_ht_node);
3052 if (cds_list_empty(&client_list->list)) {
3053 /*
3054 * No clients interested in the evaluation's result,
3055 * skip it.
3056 */
3057 continue;
3058 }
3059
3060 ret = evaluate_condition(condition, &evaluation, state,
3061 previous_sample_available ? &previous_sample : NULL,
3062 &latest_sample,
3063 previous_session_consumed_total,
3064 latest_session_consumed_total,
3065 channel_info);
3066 if (caa_unlikely(ret)) {
3067 goto end_unlock;
3068 }
3069
3070 if (caa_likely(!evaluation)) {
3071 continue;
3072 }
3073
3074 /* Dispatch evaluation result to all clients. */
3075 ret = send_evaluation_to_clients(trigger_list_element->trigger,
3076 evaluation, client_list, state,
3077 channel_info->session_info->uid,
3078 channel_info->session_info->gid);
3079 lttng_evaluation_destroy(evaluation);
3080 if (caa_unlikely(ret)) {
3081 goto end_unlock;
3082 }
3083 }
3084 end_unlock:
3085 rcu_read_unlock();
3086 end:
3087 return ret;
3088 }
This page took 0.178555 seconds and 4 git commands to generate.