Fix: sessiond: erroneous user check logic in session_access_ok
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
CommitLineData
ab0ee2ca 1/*
ab5be9fa 2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
ab0ee2ca 3 *
ab5be9fa 4 * SPDX-License-Identifier: GPL-2.0-only
ab0ee2ca 5 *
ab0ee2ca
JG
6 */
7
8#define _LGPL_SOURCE
9#include <urcu.h>
10#include <urcu/rculfhash.h>
11
ab0ee2ca
JG
12#include <common/defaults.h>
13#include <common/error.h>
14#include <common/futex.h>
15#include <common/unix.h>
16#include <common/dynamic-buffer.h>
17#include <common/hashtable/utils.h>
18#include <common/sessiond-comm/sessiond-comm.h>
19#include <common/macros.h>
20#include <lttng/condition/condition.h>
9b63a4aa 21#include <lttng/action/action-internal.h>
ab0ee2ca
JG
22#include <lttng/notification/notification-internal.h>
23#include <lttng/condition/condition-internal.h>
24#include <lttng/condition/buffer-usage-internal.h>
e8360425 25#include <lttng/condition/session-consumed-size-internal.h>
ea9a44f0 26#include <lttng/condition/session-rotation-internal.h>
ab0ee2ca 27#include <lttng/notification/channel-internal.h>
1da26331 28
ab0ee2ca
JG
29#include <time.h>
30#include <unistd.h>
31#include <assert.h>
32#include <inttypes.h>
d53ea4e4 33#include <fcntl.h>
ab0ee2ca 34
1da26331
JG
35#include "notification-thread.h"
36#include "notification-thread-events.h"
37#include "notification-thread-commands.h"
38#include "lttng-sessiond.h"
39#include "kernel.h"
40
ab0ee2ca
JG
41#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
42#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
43
51eab943
JG
44enum lttng_object_type {
45 LTTNG_OBJECT_TYPE_UNKNOWN,
46 LTTNG_OBJECT_TYPE_NONE,
47 LTTNG_OBJECT_TYPE_CHANNEL,
48 LTTNG_OBJECT_TYPE_SESSION,
49};
50
ab0ee2ca 51struct lttng_trigger_list_element {
9e0bb80e 52 /* No ownership of the trigger object is assumed. */
d1ba29d2 53 struct lttng_trigger *trigger;
ab0ee2ca
JG
54 struct cds_list_head node;
55};
56
57struct lttng_channel_trigger_list {
58 struct channel_key channel_key;
ea9a44f0 59 /* List of struct lttng_trigger_list_element. */
ab0ee2ca 60 struct cds_list_head list;
ea9a44f0 61 /* Node in the channel_triggers_ht */
ab0ee2ca 62 struct cds_lfht_node channel_triggers_ht_node;
83b934ad
MD
63 /* call_rcu delayed reclaim. */
64 struct rcu_head rcu_node;
ab0ee2ca
JG
65};
66
ea9a44f0
JG
67/*
68 * List of triggers applying to a given session.
69 *
70 * See:
71 * - lttng_session_trigger_list_create()
72 * - lttng_session_trigger_list_build()
73 * - lttng_session_trigger_list_destroy()
74 * - lttng_session_trigger_list_add()
75 */
76struct lttng_session_trigger_list {
77 /*
78 * Not owned by this; points to the session_info structure's
79 * session name.
80 */
81 const char *session_name;
82 /* List of struct lttng_trigger_list_element. */
83 struct cds_list_head list;
84 /* Node in the session_triggers_ht */
85 struct cds_lfht_node session_triggers_ht_node;
86 /*
87 * Weak reference to the notification system's session triggers
88 * hashtable.
89 *
90 * The session trigger list structure structure is owned by
91 * the session's session_info.
92 *
93 * The session_info is kept alive the the channel_infos holding a
94 * reference to it (reference counting). When those channels are
95 * destroyed (at runtime or on teardown), the reference they hold
96 * to the session_info are released. On destruction of session_info,
97 * session_info_destroy() will remove the list of triggers applying
98 * to this session from the notification system's state.
99 *
100 * This implies that the session_triggers_ht must be destroyed
101 * after the channels.
102 */
103 struct cds_lfht *session_triggers_ht;
104 /* Used for delayed RCU reclaim. */
105 struct rcu_head rcu_node;
106};
107
ab0ee2ca
JG
108struct lttng_trigger_ht_element {
109 struct lttng_trigger *trigger;
110 struct cds_lfht_node node;
83b934ad
MD
111 /* call_rcu delayed reclaim. */
112 struct rcu_head rcu_node;
ab0ee2ca
JG
113};
114
115struct lttng_condition_list_element {
116 struct lttng_condition *condition;
117 struct cds_list_head node;
118};
119
ab0ee2ca
JG
120struct channel_state_sample {
121 struct channel_key key;
122 struct cds_lfht_node channel_state_ht_node;
123 uint64_t highest_usage;
124 uint64_t lowest_usage;
e8360425 125 uint64_t channel_total_consumed;
83b934ad
MD
126 /* call_rcu delayed reclaim. */
127 struct rcu_head rcu_node;
ab0ee2ca
JG
128};
129
e4db5ace 130static unsigned long hash_channel_key(struct channel_key *key);
ed327204 131static int evaluate_buffer_condition(const struct lttng_condition *condition,
e4db5ace 132 struct lttng_evaluation **evaluation,
e8360425
JD
133 const struct notification_thread_state *state,
134 const struct channel_state_sample *previous_sample,
135 const struct channel_state_sample *latest_sample,
136 uint64_t previous_session_consumed_total,
137 uint64_t latest_session_consumed_total,
138 struct channel_info *channel_info);
e4db5ace 139static
9b63a4aa
JG
140int send_evaluation_to_clients(const struct lttng_trigger *trigger,
141 const struct lttng_evaluation *evaluation,
e4db5ace
JR
142 struct notification_client_list *client_list,
143 struct notification_thread_state *state,
144 uid_t channel_uid, gid_t channel_gid);
145
8abe313a 146
ea9a44f0 147/* session_info API */
8abe313a
JG
148static
149void session_info_destroy(void *_data);
150static
151void session_info_get(struct session_info *session_info);
152static
153void session_info_put(struct session_info *session_info);
154static
155struct session_info *session_info_create(const char *name,
ea9a44f0 156 uid_t uid, gid_t gid,
1eee26c5
JG
157 struct lttng_session_trigger_list *trigger_list,
158 struct cds_lfht *sessions_ht);
8abe313a
JG
159static
160void session_info_add_channel(struct session_info *session_info,
161 struct channel_info *channel_info);
162static
163void session_info_remove_channel(struct session_info *session_info,
164 struct channel_info *channel_info);
165
ea9a44f0
JG
166/* lttng_session_trigger_list API */
167static
168struct lttng_session_trigger_list *lttng_session_trigger_list_create(
169 const char *session_name,
170 struct cds_lfht *session_triggers_ht);
171static
172struct lttng_session_trigger_list *lttng_session_trigger_list_build(
173 const struct notification_thread_state *state,
174 const char *session_name);
175static
176void lttng_session_trigger_list_destroy(
177 struct lttng_session_trigger_list *list);
178static
179int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
d1ba29d2 180 struct lttng_trigger *trigger);
ea9a44f0 181
d1ba29d2
JG
182static
183int client_handle_transmission_status(
184 struct notification_client *client,
185 enum client_transmission_status transmission_status,
186 struct notification_thread_state *state);
ea9a44f0 187
ab0ee2ca 188static
ac1889bf 189int match_client_socket(struct cds_lfht_node *node, const void *key)
ab0ee2ca
JG
190{
191 /* This double-cast is intended to supress pointer-to-cast warning. */
ac1889bf
JG
192 const int socket = (int) (intptr_t) key;
193 const struct notification_client *client = caa_container_of(node,
194 struct notification_client, client_socket_ht_node);
ab0ee2ca 195
ac1889bf
JG
196 return client->socket == socket;
197}
198
199static
200int match_client_id(struct cds_lfht_node *node, const void *key)
201{
202 /* This double-cast is intended to supress pointer-to-cast warning. */
203 const notification_client_id id = *((notification_client_id *) key);
204 const struct notification_client *client = caa_container_of(
205 node, struct notification_client, client_id_ht_node);
ab0ee2ca 206
ac1889bf 207 return client->id == id;
ab0ee2ca
JG
208}
209
210static
211int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
212{
213 struct channel_key *channel_key = (struct channel_key *) key;
214 struct lttng_channel_trigger_list *trigger_list;
215
216 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
217 channel_triggers_ht_node);
218
219 return !!((channel_key->key == trigger_list->channel_key.key) &&
220 (channel_key->domain == trigger_list->channel_key.domain));
221}
222
51eab943
JG
223static
224int match_session_trigger_list(struct cds_lfht_node *node, const void *key)
225{
226 const char *session_name = (const char *) key;
227 struct lttng_session_trigger_list *trigger_list;
228
229 trigger_list = caa_container_of(node, struct lttng_session_trigger_list,
230 session_triggers_ht_node);
231
232 return !!(strcmp(trigger_list->session_name, session_name) == 0);
233}
234
ab0ee2ca
JG
235static
236int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
237{
238 struct channel_key *channel_key = (struct channel_key *) key;
239 struct channel_state_sample *sample;
240
241 sample = caa_container_of(node, struct channel_state_sample,
242 channel_state_ht_node);
243
244 return !!((channel_key->key == sample->key.key) &&
245 (channel_key->domain == sample->key.domain));
246}
247
248static
249int match_channel_info(struct cds_lfht_node *node, const void *key)
250{
251 struct channel_key *channel_key = (struct channel_key *) key;
252 struct channel_info *channel_info;
253
254 channel_info = caa_container_of(node, struct channel_info,
255 channels_ht_node);
256
257 return !!((channel_key->key == channel_info->key.key) &&
258 (channel_key->domain == channel_info->key.domain));
259}
260
261static
262int match_condition(struct cds_lfht_node *node, const void *key)
263{
264 struct lttng_condition *condition_key = (struct lttng_condition *) key;
265 struct lttng_trigger_ht_element *trigger;
266 struct lttng_condition *condition;
267
268 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
269 node);
270 condition = lttng_trigger_get_condition(trigger->trigger);
271 assert(condition);
272
273 return !!lttng_condition_is_equal(condition_key, condition);
274}
275
ab0ee2ca
JG
276static
277int match_client_list_condition(struct cds_lfht_node *node, const void *key)
278{
279 struct lttng_condition *condition_key = (struct lttng_condition *) key;
280 struct notification_client_list *client_list;
f82f93a1 281 const struct lttng_condition *condition;
ab0ee2ca
JG
282
283 assert(condition_key);
284
285 client_list = caa_container_of(node, struct notification_client_list,
505b2d90 286 notification_trigger_clients_ht_node);
f82f93a1 287 condition = lttng_trigger_get_const_condition(client_list->trigger);
ab0ee2ca
JG
288
289 return !!lttng_condition_is_equal(condition_key, condition);
290}
291
f82f93a1
JG
292static
293int match_session(struct cds_lfht_node *node, const void *key)
294{
295 const char *name = key;
296 struct session_info *session_info = caa_container_of(
297 node, struct session_info, sessions_ht_node);
298
299 return !strcmp(session_info->name, name);
300}
301
ab0ee2ca
JG
302static
303unsigned long lttng_condition_buffer_usage_hash(
9b63a4aa 304 const struct lttng_condition *_condition)
ab0ee2ca 305{
9a2746aa
JG
306 unsigned long hash;
307 unsigned long condition_type;
ab0ee2ca
JG
308 struct lttng_condition_buffer_usage *condition;
309
310 condition = container_of(_condition,
311 struct lttng_condition_buffer_usage, parent);
312
9a2746aa
JG
313 condition_type = (unsigned long) condition->parent.type;
314 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
ab0ee2ca
JG
315 if (condition->session_name) {
316 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
317 }
318 if (condition->channel_name) {
8f56701f 319 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
ab0ee2ca
JG
320 }
321 if (condition->domain.set) {
322 hash ^= hash_key_ulong(
323 (void *) condition->domain.type,
324 lttng_ht_seed);
325 }
326 if (condition->threshold_ratio.set) {
327 uint64_t val;
328
329 val = condition->threshold_ratio.value * (double) UINT32_MAX;
330 hash ^= hash_key_u64(&val, lttng_ht_seed);
6633b0dd 331 } else if (condition->threshold_bytes.set) {
ab0ee2ca
JG
332 uint64_t val;
333
334 val = condition->threshold_bytes.value;
335 hash ^= hash_key_u64(&val, lttng_ht_seed);
336 }
337 return hash;
338}
339
e8360425
JD
340static
341unsigned long lttng_condition_session_consumed_size_hash(
9b63a4aa 342 const struct lttng_condition *_condition)
e8360425 343{
9a2746aa
JG
344 unsigned long hash;
345 unsigned long condition_type =
346 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE;
e8360425
JD
347 struct lttng_condition_session_consumed_size *condition;
348 uint64_t val;
349
350 condition = container_of(_condition,
351 struct lttng_condition_session_consumed_size, parent);
352
9a2746aa 353 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
e8360425
JD
354 if (condition->session_name) {
355 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
356 }
357 val = condition->consumed_threshold_bytes.value;
358 hash ^= hash_key_u64(&val, lttng_ht_seed);
359 return hash;
360}
361
a8393880
JG
362static
363unsigned long lttng_condition_session_rotation_hash(
364 const struct lttng_condition *_condition)
365{
366 unsigned long hash, condition_type;
367 struct lttng_condition_session_rotation *condition;
368
369 condition = container_of(_condition,
370 struct lttng_condition_session_rotation, parent);
371 condition_type = (unsigned long) condition->parent.type;
372 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
373 assert(condition->session_name);
374 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
375 return hash;
376}
9a2746aa 377
ab0ee2ca
JG
378/*
379 * The lttng_condition hashing code is kept in this file (rather than
380 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
381 * don't want to link in liblttng-ctl.
382 */
383static
9b63a4aa 384unsigned long lttng_condition_hash(const struct lttng_condition *condition)
ab0ee2ca
JG
385{
386 switch (condition->type) {
387 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
388 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
389 return lttng_condition_buffer_usage_hash(condition);
e8360425
JD
390 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
391 return lttng_condition_session_consumed_size_hash(condition);
a8393880
JG
392 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
393 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
394 return lttng_condition_session_rotation_hash(condition);
ab0ee2ca
JG
395 default:
396 ERR("[notification-thread] Unexpected condition type caught");
397 abort();
398 }
399}
400
e4db5ace
JR
401static
402unsigned long hash_channel_key(struct channel_key *key)
403{
404 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
405 unsigned long domain_hash = hash_key_ulong(
406 (void *) (unsigned long) key->domain, lttng_ht_seed);
407
408 return key_hash ^ domain_hash;
409}
410
ac1889bf
JG
411static
412unsigned long hash_client_socket(int socket)
413{
414 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
415}
416
417static
418unsigned long hash_client_id(notification_client_id id)
419{
420 return hash_key_u64(&id, lttng_ht_seed);
421}
422
f82f93a1
JG
423/*
424 * Get the type of object to which a given condition applies. Bindings let
425 * the notification system evaluate a trigger's condition when a given
426 * object's state is updated.
427 *
428 * For instance, a condition bound to a channel will be evaluated everytime
429 * the channel's state is changed by a channel monitoring sample.
430 */
431static
432enum lttng_object_type get_condition_binding_object(
433 const struct lttng_condition *condition)
434{
435 switch (lttng_condition_get_type(condition)) {
436 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
437 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
438 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
e742b055 439 return LTTNG_OBJECT_TYPE_CHANNEL;
f82f93a1
JG
440 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
441 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
442 return LTTNG_OBJECT_TYPE_SESSION;
443 default:
444 return LTTNG_OBJECT_TYPE_UNKNOWN;
445 }
446}
447
83b934ad
MD
448static
449void free_channel_info_rcu(struct rcu_head *node)
450{
451 free(caa_container_of(node, struct channel_info, rcu_node));
452}
453
ab0ee2ca
JG
454static
455void channel_info_destroy(struct channel_info *channel_info)
456{
457 if (!channel_info) {
458 return;
459 }
460
8abe313a
JG
461 if (channel_info->session_info) {
462 session_info_remove_channel(channel_info->session_info,
463 channel_info);
464 session_info_put(channel_info->session_info);
ab0ee2ca 465 }
8abe313a
JG
466 if (channel_info->name) {
467 free(channel_info->name);
ab0ee2ca 468 }
83b934ad
MD
469 call_rcu(&channel_info->rcu_node, free_channel_info_rcu);
470}
471
472static
473void free_session_info_rcu(struct rcu_head *node)
474{
475 free(caa_container_of(node, struct session_info, rcu_node));
ab0ee2ca
JG
476}
477
8abe313a 478/* Don't call directly, use the ref-counting mechanism. */
ab0ee2ca 479static
8abe313a 480void session_info_destroy(void *_data)
ab0ee2ca 481{
8abe313a 482 struct session_info *session_info = _data;
3b68d0a3 483 int ret;
ab0ee2ca 484
8abe313a
JG
485 assert(session_info);
486 if (session_info->channel_infos_ht) {
3b68d0a3
JR
487 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
488 if (ret) {
4c3f302b 489 ERR("[notification-thread] Failed to destroy channel information hash table");
3b68d0a3 490 }
8abe313a 491 }
ea9a44f0 492 lttng_session_trigger_list_destroy(session_info->trigger_list);
1eee26c5
JG
493
494 rcu_read_lock();
495 cds_lfht_del(session_info->sessions_ht,
496 &session_info->sessions_ht_node);
497 rcu_read_unlock();
8abe313a 498 free(session_info->name);
83b934ad 499 call_rcu(&session_info->rcu_node, free_session_info_rcu);
8abe313a 500}
ab0ee2ca 501
8abe313a
JG
502static
503void session_info_get(struct session_info *session_info)
504{
505 if (!session_info) {
506 return;
507 }
508 lttng_ref_get(&session_info->ref);
509}
510
511static
512void session_info_put(struct session_info *session_info)
513{
514 if (!session_info) {
515 return;
ab0ee2ca 516 }
8abe313a
JG
517 lttng_ref_put(&session_info->ref);
518}
519
520static
ea9a44f0 521struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
1eee26c5
JG
522 struct lttng_session_trigger_list *trigger_list,
523 struct cds_lfht *sessions_ht)
8abe313a
JG
524{
525 struct session_info *session_info;
ab0ee2ca 526
8abe313a 527 assert(name);
ab0ee2ca 528
8abe313a
JG
529 session_info = zmalloc(sizeof(*session_info));
530 if (!session_info) {
531 goto end;
532 }
533 lttng_ref_init(&session_info->ref, session_info_destroy);
534
535 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
536 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
537 if (!session_info->channel_infos_ht) {
ab0ee2ca
JG
538 goto error;
539 }
8abe313a
JG
540
541 cds_lfht_node_init(&session_info->sessions_ht_node);
542 session_info->name = strdup(name);
543 if (!session_info->name) {
ab0ee2ca
JG
544 goto error;
545 }
8abe313a
JG
546 session_info->uid = uid;
547 session_info->gid = gid;
ea9a44f0 548 session_info->trigger_list = trigger_list;
1eee26c5 549 session_info->sessions_ht = sessions_ht;
8abe313a
JG
550end:
551 return session_info;
552error:
553 session_info_put(session_info);
554 return NULL;
555}
556
557static
558void session_info_add_channel(struct session_info *session_info,
559 struct channel_info *channel_info)
560{
561 rcu_read_lock();
562 cds_lfht_add(session_info->channel_infos_ht,
563 hash_channel_key(&channel_info->key),
564 &channel_info->session_info_channels_ht_node);
565 rcu_read_unlock();
566}
567
568static
569void session_info_remove_channel(struct session_info *session_info,
570 struct channel_info *channel_info)
571{
572 rcu_read_lock();
573 cds_lfht_del(session_info->channel_infos_ht,
574 &channel_info->session_info_channels_ht_node);
575 rcu_read_unlock();
576}
577
578static
579struct channel_info *channel_info_create(const char *channel_name,
580 struct channel_key *channel_key, uint64_t channel_capacity,
581 struct session_info *session_info)
582{
583 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
584
585 if (!channel_info) {
586 goto end;
587 }
588
ab0ee2ca 589 cds_lfht_node_init(&channel_info->channels_ht_node);
8abe313a
JG
590 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
591 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
592 channel_info->capacity = channel_capacity;
593
594 channel_info->name = strdup(channel_name);
595 if (!channel_info->name) {
596 goto error;
597 }
598
599 /*
600 * Set the references between session and channel infos:
601 * - channel_info holds a strong reference to session_info
602 * - session_info holds a weak reference to channel_info
603 */
604 session_info_get(session_info);
605 session_info_add_channel(session_info, channel_info);
606 channel_info->session_info = session_info;
ab0ee2ca 607end:
8abe313a 608 return channel_info;
ab0ee2ca 609error:
8abe313a 610 channel_info_destroy(channel_info);
ab0ee2ca
JG
611 return NULL;
612}
613
d1ba29d2 614LTTNG_HIDDEN
505b2d90
JG
615bool notification_client_list_get(struct notification_client_list *list)
616{
617 return urcu_ref_get_unless_zero(&list->ref);
618}
619
620static
621void free_notification_client_list_rcu(struct rcu_head *node)
622{
623 free(caa_container_of(node, struct notification_client_list,
624 rcu_node));
625}
626
627static
628void notification_client_list_release(struct urcu_ref *list_ref)
629{
630 struct notification_client_list *list =
631 container_of(list_ref, typeof(*list), ref);
632 struct notification_client_list_element *client_list_element, *tmp;
633
634 if (list->notification_trigger_clients_ht) {
635 rcu_read_lock();
636 cds_lfht_del(list->notification_trigger_clients_ht,
637 &list->notification_trigger_clients_ht_node);
638 rcu_read_unlock();
639 list->notification_trigger_clients_ht = NULL;
640 }
641 cds_list_for_each_entry_safe(client_list_element, tmp,
642 &list->list, node) {
643 free(client_list_element);
644 }
645 pthread_mutex_destroy(&list->lock);
646 call_rcu(&list->rcu_node, free_notification_client_list_rcu);
647}
648
649static
650struct notification_client_list *notification_client_list_create(
651 const struct lttng_trigger *trigger)
652{
653 struct notification_client_list *client_list =
654 zmalloc(sizeof(*client_list));
655
656 if (!client_list) {
657 goto error;
658 }
659 pthread_mutex_init(&client_list->lock, NULL);
660 urcu_ref_init(&client_list->ref);
661 cds_lfht_node_init(&client_list->notification_trigger_clients_ht_node);
662 CDS_INIT_LIST_HEAD(&client_list->list);
663 client_list->trigger = trigger;
664error:
665 return client_list;
666}
667
668static
669void publish_notification_client_list(
670 struct notification_thread_state *state,
671 struct notification_client_list *list)
672{
673 const struct lttng_condition *condition =
674 lttng_trigger_get_const_condition(list->trigger);
675
676 assert(!list->notification_trigger_clients_ht);
677
678 list->notification_trigger_clients_ht =
679 state->notification_trigger_clients_ht;
680
681 rcu_read_lock();
682 cds_lfht_add(state->notification_trigger_clients_ht,
683 lttng_condition_hash(condition),
684 &list->notification_trigger_clients_ht_node);
685 rcu_read_unlock();
686}
687
d1ba29d2 688LTTNG_HIDDEN
505b2d90
JG
689void notification_client_list_put(struct notification_client_list *list)
690{
691 if (!list) {
692 return;
693 }
694 return urcu_ref_put(&list->ref, notification_client_list_release);
695}
696
697/* Provides a reference to the returned list. */
ed327204
JG
698static
699struct notification_client_list *get_client_list_from_condition(
700 struct notification_thread_state *state,
701 const struct lttng_condition *condition)
702{
703 struct cds_lfht_node *node;
704 struct cds_lfht_iter iter;
505b2d90 705 struct notification_client_list *list = NULL;
ed327204 706
505b2d90 707 rcu_read_lock();
ed327204
JG
708 cds_lfht_lookup(state->notification_trigger_clients_ht,
709 lttng_condition_hash(condition),
710 match_client_list_condition,
711 condition,
712 &iter);
713 node = cds_lfht_iter_get_node(&iter);
505b2d90
JG
714 if (node) {
715 list = container_of(node, struct notification_client_list,
716 notification_trigger_clients_ht_node);
717 list = notification_client_list_get(list) ? list : NULL;
718 }
ed327204 719
505b2d90
JG
720 rcu_read_unlock();
721 return list;
ed327204
JG
722}
723
e4db5ace 724static
f82f93a1
JG
725int evaluate_channel_condition_for_client(
726 const struct lttng_condition *condition,
727 struct notification_thread_state *state,
728 struct lttng_evaluation **evaluation,
729 uid_t *session_uid, gid_t *session_gid)
e4db5ace
JR
730{
731 int ret;
732 struct cds_lfht_iter iter;
733 struct cds_lfht_node *node;
734 struct channel_info *channel_info = NULL;
735 struct channel_key *channel_key = NULL;
736 struct channel_state_sample *last_sample = NULL;
737 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
e4db5ace 738
505b2d90
JG
739 rcu_read_lock();
740
f82f93a1 741 /* Find the channel associated with the condition. */
e4db5ace 742 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
f82f93a1 743 channel_trigger_list, channel_triggers_ht_node) {
e4db5ace
JR
744 struct lttng_trigger_list_element *element;
745
746 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
9b63a4aa 747 const struct lttng_condition *current_condition =
f82f93a1 748 lttng_trigger_get_const_condition(
e4db5ace
JR
749 element->trigger);
750
751 assert(current_condition);
752 if (!lttng_condition_is_equal(condition,
2ae99f0b 753 current_condition)) {
e4db5ace
JR
754 continue;
755 }
756
757 /* Found the trigger, save the channel key. */
758 channel_key = &channel_trigger_list->channel_key;
759 break;
760 }
761 if (channel_key) {
762 /* The channel key was found stop iteration. */
763 break;
764 }
765 }
766
767 if (!channel_key){
768 /* No channel found; normal exit. */
f82f93a1 769 DBG("[notification-thread] No known channel associated with newly subscribed-to condition");
e4db5ace
JR
770 ret = 0;
771 goto end;
772 }
773
774 /* Fetch channel info for the matching channel. */
775 cds_lfht_lookup(state->channels_ht,
776 hash_channel_key(channel_key),
777 match_channel_info,
778 channel_key,
779 &iter);
780 node = cds_lfht_iter_get_node(&iter);
781 assert(node);
782 channel_info = caa_container_of(node, struct channel_info,
783 channels_ht_node);
784
785 /* Retrieve the channel's last sample, if it exists. */
786 cds_lfht_lookup(state->channel_state_ht,
787 hash_channel_key(channel_key),
788 match_channel_state_sample,
789 channel_key,
790 &iter);
791 node = cds_lfht_iter_get_node(&iter);
792 if (node) {
793 last_sample = caa_container_of(node,
794 struct channel_state_sample,
795 channel_state_ht_node);
796 } else {
797 /* Nothing to evaluate, no sample was ever taken. Normal exit */
798 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
799 ret = 0;
800 goto end;
801 }
802
f82f93a1 803 ret = evaluate_buffer_condition(condition, evaluation, state,
e8360425
JD
804 NULL, last_sample,
805 0, channel_info->session_info->consumed_data_size,
806 channel_info);
e4db5ace 807 if (ret) {
066a3c82 808 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
e4db5ace
JR
809 goto end;
810 }
811
f82f93a1
JG
812 *session_uid = channel_info->session_info->uid;
813 *session_gid = channel_info->session_info->gid;
814end:
505b2d90 815 rcu_read_unlock();
f82f93a1
JG
816 return ret;
817}
818
819static
820const char *get_condition_session_name(const struct lttng_condition *condition)
821{
822 const char *session_name = NULL;
823 enum lttng_condition_status status;
824
825 switch (lttng_condition_get_type(condition)) {
826 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
827 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
828 status = lttng_condition_buffer_usage_get_session_name(
829 condition, &session_name);
830 break;
831 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
832 status = lttng_condition_session_consumed_size_get_session_name(
833 condition, &session_name);
834 break;
835 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
836 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
837 status = lttng_condition_session_rotation_get_session_name(
838 condition, &session_name);
839 break;
840 default:
841 abort();
842 }
843 if (status != LTTNG_CONDITION_STATUS_OK) {
844 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
845 goto end;
846 }
847end:
848 return session_name;
849}
850
f82f93a1
JG
851static
852int evaluate_session_condition_for_client(
853 const struct lttng_condition *condition,
854 struct notification_thread_state *state,
855 struct lttng_evaluation **evaluation,
856 uid_t *session_uid, gid_t *session_gid)
857{
858 int ret;
859 struct cds_lfht_iter iter;
860 struct cds_lfht_node *node;
861 const char *session_name;
862 struct session_info *session_info = NULL;
863
505b2d90 864 rcu_read_lock();
f82f93a1
JG
865 session_name = get_condition_session_name(condition);
866
867 /* Find the session associated with the trigger. */
868 cds_lfht_lookup(state->sessions_ht,
869 hash_key_str(session_name, lttng_ht_seed),
870 match_session,
871 session_name,
872 &iter);
873 node = cds_lfht_iter_get_node(&iter);
874 if (!node) {
875 DBG("[notification-thread] No known session matching name \"%s\"",
876 session_name);
877 ret = 0;
878 goto end;
879 }
880
881 session_info = caa_container_of(node, struct session_info,
882 sessions_ht_node);
883 session_info_get(session_info);
884
885 /*
886 * Evaluation is performed in-line here since only one type of
887 * session-bound condition is handled for the moment.
888 */
889 switch (lttng_condition_get_type(condition)) {
890 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
891 if (!session_info->rotation.ongoing) {
be558f88 892 ret = 0;
f82f93a1
JG
893 goto end_session_put;
894 }
895
896 *evaluation = lttng_evaluation_session_rotation_ongoing_create(
897 session_info->rotation.id);
898 if (!*evaluation) {
899 /* Fatal error. */
900 ERR("[notification-thread] Failed to create session rotation ongoing evaluation for session \"%s\"",
901 session_info->name);
902 ret = -1;
903 goto end_session_put;
904 }
905 ret = 0;
906 break;
907 default:
908 ret = 0;
909 goto end_session_put;
910 }
911
912 *session_uid = session_info->uid;
913 *session_gid = session_info->gid;
914
915end_session_put:
916 session_info_put(session_info);
917end:
505b2d90 918 rcu_read_unlock();
f82f93a1
JG
919 return ret;
920}
921
f82f93a1
JG
922static
923int evaluate_condition_for_client(const struct lttng_trigger *trigger,
924 const struct lttng_condition *condition,
925 struct notification_client *client,
926 struct notification_thread_state *state)
927{
928 int ret;
929 struct lttng_evaluation *evaluation = NULL;
505b2d90
JG
930 struct notification_client_list client_list = {
931 .lock = PTHREAD_MUTEX_INITIALIZER,
932 };
f82f93a1
JG
933 struct notification_client_list_element client_list_element = { 0 };
934 uid_t object_uid = 0;
935 gid_t object_gid = 0;
936
937 assert(trigger);
938 assert(condition);
939 assert(client);
940 assert(state);
941
942 switch (get_condition_binding_object(condition)) {
943 case LTTNG_OBJECT_TYPE_SESSION:
944 ret = evaluate_session_condition_for_client(condition, state,
945 &evaluation, &object_uid, &object_gid);
946 break;
947 case LTTNG_OBJECT_TYPE_CHANNEL:
948 ret = evaluate_channel_condition_for_client(condition, state,
949 &evaluation, &object_uid, &object_gid);
950 break;
951 case LTTNG_OBJECT_TYPE_NONE:
952 ret = 0;
953 goto end;
954 case LTTNG_OBJECT_TYPE_UNKNOWN:
955 default:
956 ret = -1;
957 goto end;
958 }
af0c318d
JG
959 if (ret) {
960 /* Fatal error. */
961 goto end;
962 }
e4db5ace
JR
963 if (!evaluation) {
964 /* Evaluation yielded nothing. Normal exit. */
965 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
966 ret = 0;
967 goto end;
968 }
969
970 /*
971 * Create a temporary client list with the client currently
972 * subscribing.
973 */
505b2d90 974 cds_lfht_node_init(&client_list.notification_trigger_clients_ht_node);
e4db5ace
JR
975 CDS_INIT_LIST_HEAD(&client_list.list);
976 client_list.trigger = trigger;
977
978 CDS_INIT_LIST_HEAD(&client_list_element.node);
979 client_list_element.client = client;
980 cds_list_add(&client_list_element.node, &client_list.list);
981
982 /* Send evaluation result to the newly-subscribed client. */
983 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
984 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
f82f93a1 985 state, object_uid, object_gid);
e4db5ace
JR
986
987end:
988 return ret;
989}
990
ab0ee2ca
JG
991static
992int notification_thread_client_subscribe(struct notification_client *client,
993 struct lttng_condition *condition,
994 struct notification_thread_state *state,
995 enum lttng_notification_channel_status *_status)
996{
997 int ret = 0;
505b2d90 998 struct notification_client_list *client_list = NULL;
ab0ee2ca
JG
999 struct lttng_condition_list_element *condition_list_element = NULL;
1000 struct notification_client_list_element *client_list_element = NULL;
1001 enum lttng_notification_channel_status status =
1002 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1003
1004 /*
1005 * Ensure that the client has not already subscribed to this condition
1006 * before.
1007 */
1008 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
1009 if (lttng_condition_is_equal(condition_list_element->condition,
1010 condition)) {
1011 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
1012 goto end;
1013 }
1014 }
1015
1016 condition_list_element = zmalloc(sizeof(*condition_list_element));
1017 if (!condition_list_element) {
1018 ret = -1;
1019 goto error;
1020 }
1021 client_list_element = zmalloc(sizeof(*client_list_element));
1022 if (!client_list_element) {
1023 ret = -1;
1024 goto error;
1025 }
1026
ab0ee2ca
JG
1027 /*
1028 * Add the newly-subscribed condition to the client's subscription list.
1029 */
1030 CDS_INIT_LIST_HEAD(&condition_list_element->node);
1031 condition_list_element->condition = condition;
1032 cds_list_add(&condition_list_element->node, &client->condition_list);
1033
ed327204
JG
1034 client_list = get_client_list_from_condition(state, condition);
1035 if (!client_list) {
2ae99f0b
JG
1036 /*
1037 * No notification-emiting trigger registered with this
1038 * condition. We don't evaluate the condition right away
1039 * since this trigger is not registered yet.
1040 */
4fb43b68 1041 free(client_list_element);
505b2d90 1042 goto end;
ab0ee2ca
JG
1043 }
1044
2ae99f0b
JG
1045 /*
1046 * The condition to which the client just subscribed is evaluated
1047 * at this point so that conditions that are already TRUE result
1048 * in a notification being sent out.
505b2d90
JG
1049 *
1050 * The client_list's trigger is used without locking the list itself.
1051 * This is correct since the list doesn't own the trigger and the
1052 * object is immutable.
2ae99f0b 1053 */
e4db5ace
JR
1054 if (evaluate_condition_for_client(client_list->trigger, condition,
1055 client, state)) {
1056 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
1057 ret = -1;
4bd5b4af 1058 free(client_list_element);
505b2d90 1059 goto end;
e4db5ace
JR
1060 }
1061
1062 /*
1063 * Add the client to the list of clients interested in a given trigger
1064 * if a "notification" trigger with a corresponding condition was
1065 * added prior.
1066 */
ab0ee2ca
JG
1067 client_list_element->client = client;
1068 CDS_INIT_LIST_HEAD(&client_list_element->node);
505b2d90
JG
1069
1070 pthread_mutex_lock(&client_list->lock);
ab0ee2ca 1071 cds_list_add(&client_list_element->node, &client_list->list);
505b2d90 1072 pthread_mutex_unlock(&client_list->lock);
ab0ee2ca
JG
1073end:
1074 if (_status) {
1075 *_status = status;
1076 }
505b2d90
JG
1077 if (client_list) {
1078 notification_client_list_put(client_list);
1079 }
ab0ee2ca
JG
1080 return ret;
1081error:
1082 free(condition_list_element);
1083 free(client_list_element);
1084 return ret;
1085}
1086
1087static
1088int notification_thread_client_unsubscribe(
1089 struct notification_client *client,
1090 struct lttng_condition *condition,
1091 struct notification_thread_state *state,
1092 enum lttng_notification_channel_status *_status)
1093{
ab0ee2ca
JG
1094 struct notification_client_list *client_list;
1095 struct lttng_condition_list_element *condition_list_element,
1096 *condition_tmp;
1097 struct notification_client_list_element *client_list_element,
1098 *client_tmp;
1099 bool condition_found = false;
1100 enum lttng_notification_channel_status status =
1101 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1102
1103 /* Remove the condition from the client's condition list. */
1104 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
1105 &client->condition_list, node) {
1106 if (!lttng_condition_is_equal(condition_list_element->condition,
1107 condition)) {
1108 continue;
1109 }
1110
1111 cds_list_del(&condition_list_element->node);
1112 /*
1113 * The caller may be iterating on the client's conditions to
1114 * tear down a client's connection. In this case, the condition
1115 * will be destroyed at the end.
1116 */
1117 if (condition != condition_list_element->condition) {
1118 lttng_condition_destroy(
1119 condition_list_element->condition);
1120 }
1121 free(condition_list_element);
1122 condition_found = true;
1123 break;
1124 }
1125
1126 if (!condition_found) {
1127 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
1128 goto end;
1129 }
1130
1131 /*
1132 * Remove the client from the list of clients interested the trigger
1133 * matching the condition.
1134 */
ed327204
JG
1135 client_list = get_client_list_from_condition(state, condition);
1136 if (!client_list) {
505b2d90 1137 goto end;
ab0ee2ca
JG
1138 }
1139
505b2d90 1140 pthread_mutex_lock(&client_list->lock);
ab0ee2ca
JG
1141 cds_list_for_each_entry_safe(client_list_element, client_tmp,
1142 &client_list->list, node) {
505b2d90 1143 if (client_list_element->client->id != client->id) {
ab0ee2ca
JG
1144 continue;
1145 }
1146 cds_list_del(&client_list_element->node);
1147 free(client_list_element);
1148 break;
1149 }
505b2d90
JG
1150 pthread_mutex_unlock(&client_list->lock);
1151 notification_client_list_put(client_list);
1152 client_list = NULL;
ab0ee2ca
JG
1153end:
1154 lttng_condition_destroy(condition);
1155 if (_status) {
1156 *_status = status;
1157 }
1158 return 0;
1159}
1160
83b934ad
MD
1161static
1162void free_notification_client_rcu(struct rcu_head *node)
1163{
1164 free(caa_container_of(node, struct notification_client, rcu_node));
1165}
1166
ab0ee2ca
JG
1167static
1168void notification_client_destroy(struct notification_client *client,
1169 struct notification_thread_state *state)
1170{
ab0ee2ca
JG
1171 if (!client) {
1172 return;
1173 }
1174
505b2d90
JG
1175 /*
1176 * The client object is not reachable by other threads, no need to lock
1177 * the client here.
1178 */
ab0ee2ca
JG
1179 if (client->socket >= 0) {
1180 (void) lttcomm_close_unix_sock(client->socket);
ac1889bf 1181 client->socket = -1;
ab0ee2ca 1182 }
505b2d90 1183 client->communication.active = false;
ab0ee2ca
JG
1184 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
1185 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
505b2d90 1186 pthread_mutex_destroy(&client->lock);
83b934ad 1187 call_rcu(&client->rcu_node, free_notification_client_rcu);
ab0ee2ca
JG
1188}
1189
1190/*
1191 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1192 * client pointer).
1193 */
1194static
1195struct notification_client *get_client_from_socket(int socket,
1196 struct notification_thread_state *state)
1197{
1198 struct cds_lfht_iter iter;
1199 struct cds_lfht_node *node;
1200 struct notification_client *client = NULL;
1201
1202 cds_lfht_lookup(state->client_socket_ht,
ac1889bf
JG
1203 hash_client_socket(socket),
1204 match_client_socket,
ab0ee2ca
JG
1205 (void *) (unsigned long) socket,
1206 &iter);
1207 node = cds_lfht_iter_get_node(&iter);
1208 if (!node) {
1209 goto end;
1210 }
1211
1212 client = caa_container_of(node, struct notification_client,
1213 client_socket_ht_node);
1214end:
1215 return client;
1216}
1217
d1ba29d2
JG
1218/*
1219 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1220 * client pointer).
1221 */
1222static
1223struct notification_client *get_client_from_id(notification_client_id id,
1224 struct notification_thread_state *state)
1225{
1226 struct cds_lfht_iter iter;
1227 struct cds_lfht_node *node;
1228 struct notification_client *client = NULL;
1229
1230 cds_lfht_lookup(state->client_id_ht,
1231 hash_client_id(id),
1232 match_client_id,
1233 &id,
1234 &iter);
1235 node = cds_lfht_iter_get_node(&iter);
1236 if (!node) {
1237 goto end;
1238 }
1239
1240 client = caa_container_of(node, struct notification_client,
1241 client_id_ht_node);
1242end:
1243 return client;
1244}
1245
ab0ee2ca 1246static
e8360425 1247bool buffer_usage_condition_applies_to_channel(
51eab943
JG
1248 const struct lttng_condition *condition,
1249 const struct channel_info *channel_info)
ab0ee2ca
JG
1250{
1251 enum lttng_condition_status status;
e8360425
JD
1252 enum lttng_domain_type condition_domain;
1253 const char *condition_session_name = NULL;
1254 const char *condition_channel_name = NULL;
ab0ee2ca 1255
e8360425
JD
1256 status = lttng_condition_buffer_usage_get_domain_type(condition,
1257 &condition_domain);
1258 assert(status == LTTNG_CONDITION_STATUS_OK);
1259 if (channel_info->key.domain != condition_domain) {
ab0ee2ca
JG
1260 goto fail;
1261 }
1262
e8360425
JD
1263 status = lttng_condition_buffer_usage_get_session_name(
1264 condition, &condition_session_name);
1265 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1266
1267 status = lttng_condition_buffer_usage_get_channel_name(
1268 condition, &condition_channel_name);
1269 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
1270
1271 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1272 goto fail;
1273 }
1274 if (strcmp(channel_info->name, condition_channel_name)) {
ab0ee2ca
JG
1275 goto fail;
1276 }
1277
e8360425
JD
1278 return true;
1279fail:
1280 return false;
1281}
1282
1283static
1284bool session_consumed_size_condition_applies_to_channel(
51eab943
JG
1285 const struct lttng_condition *condition,
1286 const struct channel_info *channel_info)
e8360425
JD
1287{
1288 enum lttng_condition_status status;
1289 const char *condition_session_name = NULL;
1290
1291 status = lttng_condition_session_consumed_size_get_session_name(
1292 condition, &condition_session_name);
1293 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1294
1295 if (strcmp(channel_info->session_info->name, condition_session_name)) {
ab0ee2ca
JG
1296 goto fail;
1297 }
1298
e8360425
JD
1299 return true;
1300fail:
1301 return false;
1302}
ab0ee2ca 1303
51eab943
JG
1304static
1305bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
1306 const struct channel_info *channel_info)
1307{
1308 const struct lttng_condition *condition;
e8360425 1309 bool trigger_applies;
ab0ee2ca 1310
51eab943 1311 condition = lttng_trigger_get_const_condition(trigger);
e8360425 1312 if (!condition) {
ab0ee2ca
JG
1313 goto fail;
1314 }
e8360425
JD
1315
1316 switch (lttng_condition_get_type(condition)) {
1317 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1318 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1319 trigger_applies = buffer_usage_condition_applies_to_channel(
1320 condition, channel_info);
1321 break;
1322 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1323 trigger_applies = session_consumed_size_condition_applies_to_channel(
1324 condition, channel_info);
1325 break;
1326 default:
ab0ee2ca
JG
1327 goto fail;
1328 }
1329
e8360425 1330 return trigger_applies;
ab0ee2ca
JG
1331fail:
1332 return false;
1333}
1334
1335static
1336bool trigger_applies_to_client(struct lttng_trigger *trigger,
1337 struct notification_client *client)
1338{
1339 bool applies = false;
1340 struct lttng_condition_list_element *condition_list_element;
1341
1342 cds_list_for_each_entry(condition_list_element, &client->condition_list,
1343 node) {
1344 applies = lttng_condition_is_equal(
1345 condition_list_element->condition,
1346 lttng_trigger_get_condition(trigger));
1347 if (applies) {
1348 break;
1349 }
1350 }
1351 return applies;
1352}
1353
ed327204
JG
1354/* Must be called with RCU read lock held. */
1355static
1356struct lttng_session_trigger_list *get_session_trigger_list(
1357 struct notification_thread_state *state,
1358 const char *session_name)
1359{
1360 struct lttng_session_trigger_list *list = NULL;
1361 struct cds_lfht_node *node;
1362 struct cds_lfht_iter iter;
1363
1364 cds_lfht_lookup(state->session_triggers_ht,
1365 hash_key_str(session_name, lttng_ht_seed),
1366 match_session_trigger_list,
1367 session_name,
1368 &iter);
1369 node = cds_lfht_iter_get_node(&iter);
1370 if (!node) {
1371 /*
1372 * Not an error, the list of triggers applying to that session
1373 * will be initialized when the session is created.
1374 */
1375 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1376 session_name);
1377 goto end;
1378 }
1379
e742b055 1380 list = caa_container_of(node,
ed327204
JG
1381 struct lttng_session_trigger_list,
1382 session_triggers_ht_node);
1383end:
1384 return list;
1385}
1386
ea9a44f0
JG
1387/*
1388 * Allocate an empty lttng_session_trigger_list for the session named
1389 * 'session_name'.
1390 *
1391 * No ownership of 'session_name' is assumed by the session trigger list.
1392 * It is the caller's responsability to ensure the session name is alive
1393 * for as long as this list is.
1394 */
1395static
1396struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1397 const char *session_name,
1398 struct cds_lfht *session_triggers_ht)
1399{
1400 struct lttng_session_trigger_list *list;
1401
1402 list = zmalloc(sizeof(*list));
1403 if (!list) {
1404 goto end;
1405 }
1406 list->session_name = session_name;
1407 CDS_INIT_LIST_HEAD(&list->list);
1408 cds_lfht_node_init(&list->session_triggers_ht_node);
1409 list->session_triggers_ht = session_triggers_ht;
1410
1411 rcu_read_lock();
1412 /* Publish the list through the session_triggers_ht. */
1413 cds_lfht_add(session_triggers_ht,
1414 hash_key_str(session_name, lttng_ht_seed),
1415 &list->session_triggers_ht_node);
1416 rcu_read_unlock();
1417end:
1418 return list;
1419}
1420
1421static
1422void free_session_trigger_list_rcu(struct rcu_head *node)
1423{
1424 free(caa_container_of(node, struct lttng_session_trigger_list,
1425 rcu_node));
1426}
1427
1428static
1429void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1430{
1431 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1432
1433 /* Empty the list element by element, and then free the list itself. */
1434 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1435 &list->list, node) {
1436 cds_list_del(&trigger_list_element->node);
1437 free(trigger_list_element);
1438 }
1439 rcu_read_lock();
1440 /* Unpublish the list from the session_triggers_ht. */
1441 cds_lfht_del(list->session_triggers_ht,
1442 &list->session_triggers_ht_node);
1443 rcu_read_unlock();
1444 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1445}
1446
1447static
1448int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
d1ba29d2 1449 struct lttng_trigger *trigger)
ea9a44f0
JG
1450{
1451 int ret = 0;
1452 struct lttng_trigger_list_element *new_element =
1453 zmalloc(sizeof(*new_element));
1454
1455 if (!new_element) {
1456 ret = -1;
1457 goto end;
1458 }
1459 CDS_INIT_LIST_HEAD(&new_element->node);
1460 new_element->trigger = trigger;
1461 cds_list_add(&new_element->node, &list->list);
1462end:
1463 return ret;
1464}
1465
1466static
1467bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1468 const char *session_name)
1469{
1470 bool applies = false;
1471 const struct lttng_condition *condition;
1472
1473 condition = lttng_trigger_get_const_condition(trigger);
1474 switch (lttng_condition_get_type(condition)) {
1475 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1476 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1477 {
1478 enum lttng_condition_status condition_status;
1479 const char *condition_session_name;
1480
1481 condition_status = lttng_condition_session_rotation_get_session_name(
1482 condition, &condition_session_name);
1483 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1484 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1485 goto end;
1486 }
1487
1488 assert(condition_session_name);
1489 applies = !strcmp(condition_session_name, session_name);
1490 break;
1491 }
1492 default:
1493 goto end;
1494 }
1495end:
1496 return applies;
1497}
1498
1499/*
1500 * Allocate and initialize an lttng_session_trigger_list which contains
1501 * all triggers that apply to the session named 'session_name'.
1502 *
1503 * No ownership of 'session_name' is assumed by the session trigger list.
1504 * It is the caller's responsability to ensure the session name is alive
1505 * for as long as this list is.
1506 */
1507static
1508struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1509 const struct notification_thread_state *state,
1510 const char *session_name)
1511{
1512 int trigger_count = 0;
1513 struct lttng_session_trigger_list *session_trigger_list = NULL;
1514 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1515 struct cds_lfht_iter iter;
1516
1517 session_trigger_list = lttng_session_trigger_list_create(session_name,
1518 state->session_triggers_ht);
1519
1520 /* Add all triggers applying to the session named 'session_name'. */
1521 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1522 node) {
1523 int ret;
1524
1525 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1526 session_name)) {
1527 continue;
1528 }
1529
1530 ret = lttng_session_trigger_list_add(session_trigger_list,
1531 trigger_ht_element->trigger);
1532 if (ret) {
1533 goto error;
1534 }
1535
1536 trigger_count++;
1537 }
1538
1539 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1540 trigger_count);
1541 return session_trigger_list;
1542error:
1543 lttng_session_trigger_list_destroy(session_trigger_list);
1544 return NULL;
1545}
1546
8abe313a
JG
1547static
1548struct session_info *find_or_create_session_info(
1549 struct notification_thread_state *state,
1550 const char *name, uid_t uid, gid_t gid)
1551{
1552 struct session_info *session = NULL;
1553 struct cds_lfht_node *node;
1554 struct cds_lfht_iter iter;
ea9a44f0 1555 struct lttng_session_trigger_list *trigger_list;
8abe313a
JG
1556
1557 rcu_read_lock();
1558 cds_lfht_lookup(state->sessions_ht,
1559 hash_key_str(name, lttng_ht_seed),
1560 match_session,
1561 name,
1562 &iter);
1563 node = cds_lfht_iter_get_node(&iter);
1564 if (node) {
1565 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1566 name, uid, gid);
1567 session = caa_container_of(node, struct session_info,
1568 sessions_ht_node);
1569 assert(session->uid == uid);
1570 assert(session->gid == gid);
1eee26c5
JG
1571 session_info_get(session);
1572 goto end;
ea9a44f0
JG
1573 }
1574
1575 trigger_list = lttng_session_trigger_list_build(state, name);
1576 if (!trigger_list) {
1577 goto error;
8abe313a
JG
1578 }
1579
1eee26c5
JG
1580 session = session_info_create(name, uid, gid, trigger_list,
1581 state->sessions_ht);
8abe313a
JG
1582 if (!session) {
1583 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1584 name, uid, gid);
b248644d 1585 lttng_session_trigger_list_destroy(trigger_list);
ea9a44f0 1586 goto error;
8abe313a 1587 }
ea9a44f0 1588 trigger_list = NULL;
577983cb
JG
1589
1590 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
9b63a4aa 1591 &session->sessions_ht_node);
1eee26c5 1592end:
8abe313a
JG
1593 rcu_read_unlock();
1594 return session;
ea9a44f0
JG
1595error:
1596 rcu_read_unlock();
1597 session_info_put(session);
1598 return NULL;
8abe313a
JG
1599}
1600
ab0ee2ca
JG
1601static
1602int handle_notification_thread_command_add_channel(
8abe313a
JG
1603 struct notification_thread_state *state,
1604 const char *session_name, uid_t session_uid, gid_t session_gid,
1605 const char *channel_name, enum lttng_domain_type channel_domain,
1606 uint64_t channel_key_int, uint64_t channel_capacity,
1607 enum lttng_error_code *cmd_result)
ab0ee2ca
JG
1608{
1609 struct cds_list_head trigger_list;
8abe313a
JG
1610 struct channel_info *new_channel_info = NULL;
1611 struct channel_key channel_key = {
1612 .key = channel_key_int,
1613 .domain = channel_domain,
1614 };
ab0ee2ca
JG
1615 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1616 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1617 int trigger_count = 0;
1618 struct cds_lfht_iter iter;
8abe313a 1619 struct session_info *session_info = NULL;
ab0ee2ca
JG
1620
1621 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
8abe313a
JG
1622 channel_name, session_name, channel_key_int,
1623 channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
ab0ee2ca
JG
1624
1625 CDS_INIT_LIST_HEAD(&trigger_list);
1626
8abe313a
JG
1627 session_info = find_or_create_session_info(state, session_name,
1628 session_uid, session_gid);
1629 if (!session_info) {
a9577b76 1630 /* Allocation error or an internal error occurred. */
ab0ee2ca
JG
1631 goto error;
1632 }
1633
8abe313a
JG
1634 new_channel_info = channel_info_create(channel_name, &channel_key,
1635 channel_capacity, session_info);
1636 if (!new_channel_info) {
1637 goto error;
1638 }
ab0ee2ca 1639
83b934ad 1640 rcu_read_lock();
ab0ee2ca
JG
1641 /* Build a list of all triggers applying to the new channel. */
1642 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1643 node) {
1644 struct lttng_trigger_list_element *new_element;
1645
1646 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
8abe313a 1647 new_channel_info)) {
ab0ee2ca
JG
1648 continue;
1649 }
1650
1651 new_element = zmalloc(sizeof(*new_element));
1652 if (!new_element) {
83b934ad 1653 rcu_read_unlock();
ab0ee2ca
JG
1654 goto error;
1655 }
1656 CDS_INIT_LIST_HEAD(&new_element->node);
1657 new_element->trigger = trigger_ht_element->trigger;
1658 cds_list_add(&new_element->node, &trigger_list);
1659 trigger_count++;
1660 }
83b934ad 1661 rcu_read_unlock();
ab0ee2ca
JG
1662
1663 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1664 trigger_count);
1665 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1666 if (!channel_trigger_list) {
1667 goto error;
1668 }
8abe313a 1669 channel_trigger_list->channel_key = new_channel_info->key;
ab0ee2ca
JG
1670 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1671 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1672 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1673
1674 rcu_read_lock();
1675 /* Add channel to the channel_ht which owns the channel_infos. */
1676 cds_lfht_add(state->channels_ht,
8abe313a 1677 hash_channel_key(&new_channel_info->key),
ab0ee2ca
JG
1678 &new_channel_info->channels_ht_node);
1679 /*
1680 * Add the list of triggers associated with this channel to the
1681 * channel_triggers_ht.
1682 */
1683 cds_lfht_add(state->channel_triggers_ht,
8abe313a 1684 hash_channel_key(&new_channel_info->key),
ab0ee2ca
JG
1685 &channel_trigger_list->channel_triggers_ht_node);
1686 rcu_read_unlock();
1eee26c5 1687 session_info_put(session_info);
ab0ee2ca
JG
1688 *cmd_result = LTTNG_OK;
1689 return 0;
1690error:
ab0ee2ca 1691 channel_info_destroy(new_channel_info);
8abe313a 1692 session_info_put(session_info);
ab0ee2ca
JG
1693 return 1;
1694}
1695
83b934ad
MD
1696static
1697void free_channel_trigger_list_rcu(struct rcu_head *node)
1698{
1699 free(caa_container_of(node, struct lttng_channel_trigger_list,
1700 rcu_node));
1701}
1702
1703static
1704void free_channel_state_sample_rcu(struct rcu_head *node)
1705{
1706 free(caa_container_of(node, struct channel_state_sample,
1707 rcu_node));
1708}
1709
ab0ee2ca
JG
1710static
1711int handle_notification_thread_command_remove_channel(
1712 struct notification_thread_state *state,
1713 uint64_t channel_key, enum lttng_domain_type domain,
1714 enum lttng_error_code *cmd_result)
1715{
1716 struct cds_lfht_node *node;
1717 struct cds_lfht_iter iter;
1718 struct lttng_channel_trigger_list *trigger_list;
1719 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1720 struct channel_key key = { .key = channel_key, .domain = domain };
1721 struct channel_info *channel_info;
1722
1723 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
1724 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1725
1726 rcu_read_lock();
1727
1728 cds_lfht_lookup(state->channel_triggers_ht,
1729 hash_channel_key(&key),
1730 match_channel_trigger_list,
1731 &key,
1732 &iter);
1733 node = cds_lfht_iter_get_node(&iter);
1734 /*
1735 * There is a severe internal error if we are being asked to remove a
1736 * channel that doesn't exist.
1737 */
1738 if (!node) {
1739 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1740 goto end;
1741 }
1742
1743 /* Free the list of triggers associated with this channel. */
1744 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1745 channel_triggers_ht_node);
1746 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1747 &trigger_list->list, node) {
1748 cds_list_del(&trigger_list_element->node);
1749 free(trigger_list_element);
1750 }
1751 cds_lfht_del(state->channel_triggers_ht, node);
83b934ad 1752 call_rcu(&trigger_list->rcu_node, free_channel_trigger_list_rcu);
ab0ee2ca
JG
1753
1754 /* Free sampled channel state. */
1755 cds_lfht_lookup(state->channel_state_ht,
1756 hash_channel_key(&key),
1757 match_channel_state_sample,
1758 &key,
1759 &iter);
1760 node = cds_lfht_iter_get_node(&iter);
1761 /*
1762 * This is expected to be NULL if the channel is destroyed before we
1763 * received a sample.
1764 */
1765 if (node) {
1766 struct channel_state_sample *sample = caa_container_of(node,
1767 struct channel_state_sample,
1768 channel_state_ht_node);
1769
1770 cds_lfht_del(state->channel_state_ht, node);
83b934ad 1771 call_rcu(&sample->rcu_node, free_channel_state_sample_rcu);
ab0ee2ca
JG
1772 }
1773
1774 /* Remove the channel from the channels_ht and free it. */
1775 cds_lfht_lookup(state->channels_ht,
1776 hash_channel_key(&key),
1777 match_channel_info,
1778 &key,
1779 &iter);
1780 node = cds_lfht_iter_get_node(&iter);
1781 assert(node);
1782 channel_info = caa_container_of(node, struct channel_info,
1783 channels_ht_node);
1784 cds_lfht_del(state->channels_ht, node);
1785 channel_info_destroy(channel_info);
1786end:
1787 rcu_read_unlock();
1788 *cmd_result = LTTNG_OK;
1789 return 0;
1790}
1791
0ca52944 1792static
ed327204 1793int handle_notification_thread_command_session_rotation(
0ca52944 1794 struct notification_thread_state *state,
ed327204
JG
1795 enum notification_thread_command_type cmd_type,
1796 const char *session_name, uid_t session_uid, gid_t session_gid,
1797 uint64_t trace_archive_chunk_id,
1798 struct lttng_trace_archive_location *location,
1799 enum lttng_error_code *_cmd_result)
0ca52944 1800{
ed327204
JG
1801 int ret = 0;
1802 enum lttng_error_code cmd_result = LTTNG_OK;
1803 struct lttng_session_trigger_list *trigger_list;
1804 struct lttng_trigger_list_element *trigger_list_element;
1805 struct session_info *session_info;
d1ba29d2
JG
1806 const struct lttng_credentials session_creds = {
1807 .uid = session_uid,
1808 .gid = session_gid,
1809 };
0ca52944 1810
ed327204
JG
1811 rcu_read_lock();
1812
1813 session_info = find_or_create_session_info(state, session_name,
1814 session_uid, session_gid);
1815 if (!session_info) {
a9577b76 1816 /* Allocation error or an internal error occurred. */
ed327204
JG
1817 ret = -1;
1818 cmd_result = LTTNG_ERR_NOMEM;
1819 goto end;
1820 }
1821
1822 session_info->rotation.ongoing =
1823 cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
1824 session_info->rotation.id = trace_archive_chunk_id;
1825 trigger_list = get_session_trigger_list(state, session_name);
1826 if (!trigger_list) {
1827 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1828 session_name);
1829 goto end;
1830 }
1831
1832 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
1833 node) {
1834 const struct lttng_condition *condition;
1835 const struct lttng_action *action;
d1ba29d2 1836 struct lttng_trigger *trigger;
ed327204
JG
1837 struct notification_client_list *client_list;
1838 struct lttng_evaluation *evaluation = NULL;
1839 enum lttng_condition_type condition_type;
505b2d90 1840 bool client_list_is_empty;
d1ba29d2 1841 enum action_executor_status executor_status;
ed327204
JG
1842
1843 trigger = trigger_list_element->trigger;
1844 condition = lttng_trigger_get_const_condition(trigger);
1845 assert(condition);
1846 condition_type = lttng_condition_get_type(condition);
1847
1848 if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING &&
1849 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1850 continue;
1851 } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED &&
1852 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) {
1853 continue;
1854 }
1855
1856 action = lttng_trigger_get_const_action(trigger);
1857
1858 /* Notify actions are the only type currently supported. */
1859 assert(lttng_action_get_type_const(action) ==
1860 LTTNG_ACTION_TYPE_NOTIFY);
1861
1862 client_list = get_client_list_from_condition(state, condition);
1863 assert(client_list);
1864
505b2d90
JG
1865 pthread_mutex_lock(&client_list->lock);
1866 client_list_is_empty = cds_list_empty(&client_list->list);
1867 pthread_mutex_unlock(&client_list->lock);
1868 if (client_list_is_empty) {
ed327204
JG
1869 /*
1870 * No clients interested in the evaluation's result,
1871 * skip it.
1872 */
1873 continue;
1874 }
1875
1876 if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1877 evaluation = lttng_evaluation_session_rotation_ongoing_create(
1878 trace_archive_chunk_id);
1879 } else {
1880 evaluation = lttng_evaluation_session_rotation_completed_create(
1881 trace_archive_chunk_id, location);
1882 }
1883
1884 if (!evaluation) {
1885 /* Internal error */
1886 ret = -1;
1887 cmd_result = LTTNG_ERR_UNK;
505b2d90 1888 goto put_list;
ed327204
JG
1889 }
1890
d1ba29d2
JG
1891 /*
1892 * Ownership of `evaluation` transferred to the action executor
1893 * no matter the result.
1894 */
1895 executor_status = action_executor_enqueue(state->executor,
1896 trigger, evaluation, &session_creds,
1897 client_list);
1898 evaluation = NULL;
1899 switch (executor_status) {
1900 case ACTION_EXECUTOR_STATUS_OK:
1901 break;
1902 case ACTION_EXECUTOR_STATUS_ERROR:
1903 case ACTION_EXECUTOR_STATUS_INVALID:
1904 /*
1905 * TODO Add trigger identification (name/id) when
1906 * it is added to the API.
1907 */
1908 ERR("Fatal error occurred while enqueuing action associated with session rotation trigger");
1909 ret = -1;
1910 goto put_list;
1911 case ACTION_EXECUTOR_STATUS_OVERFLOW:
1912 /*
1913 * TODO Add trigger identification (name/id) when
1914 * it is added to the API.
1915 *
1916 * Not a fatal error.
1917 */
1918 WARN("No space left when enqueuing action associated with session rotation trigger");
1919 ret = 0;
1920 goto put_list;
1921 default:
1922 abort();
1923 }
1924
505b2d90
JG
1925put_list:
1926 notification_client_list_put(client_list);
ed327204 1927 if (caa_unlikely(ret)) {
505b2d90 1928 break;
ed327204
JG
1929 }
1930 }
1931end:
1932 session_info_put(session_info);
1933 *_cmd_result = cmd_result;
1934 rcu_read_unlock();
1935 return ret;
0ca52944
JG
1936}
1937
1da26331
JG
1938static
1939int condition_is_supported(struct lttng_condition *condition)
1940{
1941 int ret;
1942
1943 switch (lttng_condition_get_type(condition)) {
1944 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1945 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1946 {
1947 enum lttng_domain_type domain;
1948
1949 ret = lttng_condition_buffer_usage_get_domain_type(condition,
1950 &domain);
1951 if (ret) {
1952 ret = -1;
1953 goto end;
1954 }
1955
1956 if (domain != LTTNG_DOMAIN_KERNEL) {
1957 ret = 1;
1958 goto end;
1959 }
1960
1961 /*
1962 * Older kernel tracers don't expose the API to monitor their
1963 * buffers. Therefore, we reject triggers that require that
1964 * mechanism to be available to be evaluated.
1965 */
7d268848 1966 ret = kernel_supports_ring_buffer_snapshot_sample_positions();
1da26331
JG
1967 break;
1968 }
1969 default:
1970 ret = 1;
1971 }
1972end:
1973 return ret;
1974}
1975
51eab943
JG
1976/* Must be called with RCU read lock held. */
1977static
d1ba29d2 1978int bind_trigger_to_matching_session(struct lttng_trigger *trigger,
51eab943
JG
1979 struct notification_thread_state *state)
1980{
1981 int ret = 0;
51eab943
JG
1982 const struct lttng_condition *condition;
1983 const char *session_name;
1984 struct lttng_session_trigger_list *trigger_list;
1985
1986 condition = lttng_trigger_get_const_condition(trigger);
1987 switch (lttng_condition_get_type(condition)) {
1988 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1989 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1990 {
1991 enum lttng_condition_status status;
1992
1993 status = lttng_condition_session_rotation_get_session_name(
1994 condition, &session_name);
1995 if (status != LTTNG_CONDITION_STATUS_OK) {
1996 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
1997 ret = -1;
1998 goto end;
1999 }
2000 break;
2001 }
2002 default:
2003 ret = -1;
2004 goto end;
2005 }
2006
ed327204
JG
2007 trigger_list = get_session_trigger_list(state, session_name);
2008 if (!trigger_list) {
51eab943
JG
2009 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
2010 session_name);
2011 goto end;
51eab943 2012
ed327204 2013 }
51eab943
JG
2014
2015 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
2016 session_name);
2017 ret = lttng_session_trigger_list_add(trigger_list, trigger);
2018end:
2019 return ret;
2020}
2021
2022/* Must be called with RCU read lock held. */
2023static
d1ba29d2 2024int bind_trigger_to_matching_channels(struct lttng_trigger *trigger,
51eab943
JG
2025 struct notification_thread_state *state)
2026{
2027 int ret = 0;
2028 struct cds_lfht_node *node;
2029 struct cds_lfht_iter iter;
2030 struct channel_info *channel;
2031
2032 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
2033 channels_ht_node) {
2034 struct lttng_trigger_list_element *trigger_list_element;
2035 struct lttng_channel_trigger_list *trigger_list;
a5d64ae7 2036 struct cds_lfht_iter lookup_iter;
51eab943
JG
2037
2038 if (!trigger_applies_to_channel(trigger, channel)) {
2039 continue;
2040 }
2041
2042 cds_lfht_lookup(state->channel_triggers_ht,
2043 hash_channel_key(&channel->key),
2044 match_channel_trigger_list,
2045 &channel->key,
a5d64ae7
MD
2046 &lookup_iter);
2047 node = cds_lfht_iter_get_node(&lookup_iter);
51eab943
JG
2048 assert(node);
2049 trigger_list = caa_container_of(node,
2050 struct lttng_channel_trigger_list,
2051 channel_triggers_ht_node);
2052
2053 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
2054 if (!trigger_list_element) {
2055 ret = -1;
2056 goto end;
2057 }
2058 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
2059 trigger_list_element->trigger = trigger;
2060 cds_list_add(&trigger_list_element->node, &trigger_list->list);
2061 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
2062 channel->name);
2063 }
2064end:
2065 return ret;
2066}
2067
ab0ee2ca
JG
2068/*
2069 * FIXME A client's credentials are not checked when registering a trigger, nor
2070 * are they stored alongside with the trigger.
2071 *
1da26331 2072 * The effects of this are benign since:
ab0ee2ca 2073 * - The client will succeed in registering the trigger, as it is valid,
51eab943 2074 * - The trigger will, internally, be bound to the channel/session,
ab0ee2ca
JG
2075 * - The notifications will not be sent since the client's credentials
2076 * are checked against the channel at that moment.
1da26331
JG
2077 *
2078 * If this function returns a non-zero value, it means something is
50ca7858 2079 * fundamentally broken and the whole subsystem/thread will be torn down.
1da26331
JG
2080 *
2081 * If a non-fatal error occurs, just set the cmd_result to the appropriate
2082 * error code.
ab0ee2ca
JG
2083 */
2084static
2085int handle_notification_thread_command_register_trigger(
8abe313a
JG
2086 struct notification_thread_state *state,
2087 struct lttng_trigger *trigger,
2088 enum lttng_error_code *cmd_result)
ab0ee2ca
JG
2089{
2090 int ret = 0;
2091 struct lttng_condition *condition;
2092 struct notification_client *client;
2093 struct notification_client_list *client_list = NULL;
2094 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2095 struct notification_client_list_element *client_list_element, *tmp;
2096 struct cds_lfht_node *node;
2097 struct cds_lfht_iter iter;
ab0ee2ca
JG
2098 bool free_trigger = true;
2099
2100 rcu_read_lock();
2101
2102 condition = lttng_trigger_get_condition(trigger);
1da26331
JG
2103 assert(condition);
2104
2105 ret = condition_is_supported(condition);
2106 if (ret < 0) {
2107 goto error;
2108 } else if (ret == 0) {
2109 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
2110 goto error;
2111 } else {
2112 /* Feature is supported, continue. */
2113 ret = 0;
2114 }
2115
ab0ee2ca
JG
2116 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
2117 if (!trigger_ht_element) {
2118 ret = -1;
2119 goto error;
2120 }
2121
2122 /* Add trigger to the trigger_ht. */
2123 cds_lfht_node_init(&trigger_ht_element->node);
2124 trigger_ht_element->trigger = trigger;
2125
2126 node = cds_lfht_add_unique(state->triggers_ht,
2127 lttng_condition_hash(condition),
2128 match_condition,
2129 condition,
2130 &trigger_ht_element->node);
2131 if (node != &trigger_ht_element->node) {
2132 /* Not a fatal error, simply report it to the client. */
2133 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2134 goto error_free_ht_element;
2135 }
2136
2137 /*
2138 * Ownership of the trigger and of its wrapper was transfered to
2139 * the triggers_ht.
2140 */
2141 trigger_ht_element = NULL;
2142 free_trigger = false;
2143
2144 /*
2145 * The rest only applies to triggers that have a "notify" action.
2146 * It is not skipped as this is the only action type currently
2147 * supported.
2148 */
505b2d90 2149 client_list = notification_client_list_create(trigger);
ab0ee2ca
JG
2150 if (!client_list) {
2151 ret = -1;
2152 goto error_free_ht_element;
2153 }
ab0ee2ca
JG
2154
2155 /* Build a list of clients to which this new trigger applies. */
2156 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2157 client_socket_ht_node) {
2158 if (!trigger_applies_to_client(trigger, client)) {
2159 continue;
2160 }
2161
2162 client_list_element = zmalloc(sizeof(*client_list_element));
2163 if (!client_list_element) {
2164 ret = -1;
505b2d90 2165 goto error_put_client_list;
ab0ee2ca
JG
2166 }
2167 CDS_INIT_LIST_HEAD(&client_list_element->node);
2168 client_list_element->client = client;
2169 cds_list_add(&client_list_element->node, &client_list->list);
2170 }
2171
f82f93a1 2172 switch (get_condition_binding_object(condition)) {
51eab943
JG
2173 case LTTNG_OBJECT_TYPE_SESSION:
2174 /* Add the trigger to the list if it matches a known session. */
2175 ret = bind_trigger_to_matching_session(trigger, state);
2176 if (ret) {
505b2d90 2177 goto error_put_client_list;
ab0ee2ca 2178 }
f82f93a1 2179 break;
51eab943
JG
2180 case LTTNG_OBJECT_TYPE_CHANNEL:
2181 /*
2182 * Add the trigger to list of triggers bound to the channels
2183 * currently known.
2184 */
2185 ret = bind_trigger_to_matching_channels(trigger, state);
2186 if (ret) {
505b2d90 2187 goto error_put_client_list;
ab0ee2ca 2188 }
51eab943
JG
2189 break;
2190 case LTTNG_OBJECT_TYPE_NONE:
2191 break;
2192 default:
2193 ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered");
2194 ret = -1;
505b2d90 2195 goto error_put_client_list;
ab0ee2ca
JG
2196 }
2197
2ae99f0b
JG
2198 /*
2199 * Since there is nothing preventing clients from subscribing to a
2200 * condition before the corresponding trigger is registered, we have
2201 * to evaluate this new condition right away.
2202 *
2203 * At some point, we were waiting for the next "evaluation" (e.g. on
2204 * reception of a channel sample) to evaluate this new condition, but
2205 * that was broken.
2206 *
2207 * The reason it was broken is that waiting for the next sample
2208 * does not allow us to properly handle transitions for edge-triggered
2209 * conditions.
2210 *
2211 * Consider this example: when we handle a new channel sample, we
2212 * evaluate each conditions twice: once with the previous state, and
2213 * again with the newest state. We then use those two results to
2214 * determine whether a state change happened: a condition was false and
2215 * became true. If a state change happened, we have to notify clients.
2216 *
2217 * Now, if a client subscribes to a given notification and registers
2218 * a trigger *after* that subscription, we have to make sure the
2219 * condition is evaluated at this point while considering only the
2220 * current state. Otherwise, the next evaluation cycle may only see
2221 * that the evaluations remain the same (true for samples n-1 and n) and
2222 * the client will never know that the condition has been met.
505b2d90
JG
2223 *
2224 * No need to lock the list here as it has not been published yet.
2ae99f0b
JG
2225 */
2226 cds_list_for_each_entry_safe(client_list_element, tmp,
2227 &client_list->list, node) {
2228 ret = evaluate_condition_for_client(trigger, condition,
2229 client_list_element->client, state);
2230 if (ret) {
505b2d90 2231 goto error_put_client_list;
2ae99f0b
JG
2232 }
2233 }
2234
2235 /*
2236 * Client list ownership transferred to the
2237 * notification_trigger_clients_ht.
2238 */
505b2d90 2239 publish_notification_client_list(state, client_list);
2ae99f0b
JG
2240 client_list = NULL;
2241
ab0ee2ca 2242 *cmd_result = LTTNG_OK;
505b2d90
JG
2243
2244error_put_client_list:
2245 notification_client_list_put(client_list);
2246
ab0ee2ca
JG
2247error_free_ht_element:
2248 free(trigger_ht_element);
2249error:
2250 if (free_trigger) {
ab0ee2ca
JG
2251 lttng_trigger_destroy(trigger);
2252 }
2253 rcu_read_unlock();
2254 return ret;
2255}
2256
83b934ad
MD
2257static
2258void free_lttng_trigger_ht_element_rcu(struct rcu_head *node)
2259{
2260 free(caa_container_of(node, struct lttng_trigger_ht_element,
2261 rcu_node));
2262}
2263
cc2295b5 2264static
ab0ee2ca
JG
2265int handle_notification_thread_command_unregister_trigger(
2266 struct notification_thread_state *state,
2267 struct lttng_trigger *trigger,
2268 enum lttng_error_code *_cmd_reply)
2269{
2270 struct cds_lfht_iter iter;
ed327204 2271 struct cds_lfht_node *triggers_ht_node;
ab0ee2ca
JG
2272 struct lttng_channel_trigger_list *trigger_list;
2273 struct notification_client_list *client_list;
ab0ee2ca
JG
2274 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2275 struct lttng_condition *condition = lttng_trigger_get_condition(
2276 trigger);
ab0ee2ca
JG
2277 enum lttng_error_code cmd_reply;
2278
2279 rcu_read_lock();
2280
2281 cds_lfht_lookup(state->triggers_ht,
2282 lttng_condition_hash(condition),
2283 match_condition,
2284 condition,
2285 &iter);
2286 triggers_ht_node = cds_lfht_iter_get_node(&iter);
2287 if (!triggers_ht_node) {
2288 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
2289 goto end;
2290 } else {
2291 cmd_reply = LTTNG_OK;
2292 }
2293
2294 /* Remove trigger from channel_triggers_ht. */
2295 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
2296 channel_triggers_ht_node) {
2297 struct lttng_trigger_list_element *trigger_element, *tmp;
2298
2299 cds_list_for_each_entry_safe(trigger_element, tmp,
2300 &trigger_list->list, node) {
9b63a4aa
JG
2301 const struct lttng_condition *current_condition =
2302 lttng_trigger_get_const_condition(
ab0ee2ca
JG
2303 trigger_element->trigger);
2304
2305 assert(current_condition);
2306 if (!lttng_condition_is_equal(condition,
2307 current_condition)) {
2308 continue;
2309 }
2310
2311 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2312 cds_list_del(&trigger_element->node);
e4db5ace
JR
2313 /* A trigger can only appear once per channel */
2314 break;
ab0ee2ca
JG
2315 }
2316 }
2317
2318 /*
2319 * Remove and release the client list from
2320 * notification_trigger_clients_ht.
2321 */
ed327204
JG
2322 client_list = get_client_list_from_condition(state, condition);
2323 assert(client_list);
2324
505b2d90
JG
2325 /* Put new reference and the hashtable's reference. */
2326 notification_client_list_put(client_list);
2327 notification_client_list_put(client_list);
2328 client_list = NULL;
ab0ee2ca
JG
2329
2330 /* Remove trigger from triggers_ht. */
2331 trigger_ht_element = caa_container_of(triggers_ht_node,
2332 struct lttng_trigger_ht_element, node);
2333 cds_lfht_del(state->triggers_ht, triggers_ht_node);
2334
7ca172c1 2335 /* Release the ownership of the trigger. */
ab0ee2ca 2336 lttng_trigger_destroy(trigger_ht_element->trigger);
83b934ad 2337 call_rcu(&trigger_ht_element->rcu_node, free_lttng_trigger_ht_element_rcu);
ab0ee2ca
JG
2338end:
2339 rcu_read_unlock();
2340 if (_cmd_reply) {
2341 *_cmd_reply = cmd_reply;
2342 }
2343 return 0;
2344}
2345
2346/* Returns 0 on success, 1 on exit requested, negative value on error. */
2347int handle_notification_thread_command(
2348 struct notification_thread_handle *handle,
2349 struct notification_thread_state *state)
2350{
2351 int ret;
2352 uint64_t counter;
2353 struct notification_thread_command *cmd;
2354
8abe313a 2355 /* Read the event pipe to put it back into a quiescent state. */
18aeca05 2356 ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
8abe313a 2357 sizeof(counter));
18aeca05 2358 if (ret != sizeof(counter)) {
ab0ee2ca
JG
2359 goto error;
2360 }
2361
2362 pthread_mutex_lock(&handle->cmd_queue.lock);
2363 cmd = cds_list_first_entry(&handle->cmd_queue.list,
2364 struct notification_thread_command, cmd_list_node);
97a71ea6
JR
2365 cds_list_del(&cmd->cmd_list_node);
2366 pthread_mutex_unlock(&handle->cmd_queue.lock);
ab0ee2ca
JG
2367 switch (cmd->type) {
2368 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
2369 DBG("[notification-thread] Received register trigger command");
2370 ret = handle_notification_thread_command_register_trigger(
2371 state, cmd->parameters.trigger,
2372 &cmd->reply_code);
2373 break;
2374 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
2375 DBG("[notification-thread] Received unregister trigger command");
2376 ret = handle_notification_thread_command_unregister_trigger(
2377 state, cmd->parameters.trigger,
2378 &cmd->reply_code);
2379 break;
2380 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
2381 DBG("[notification-thread] Received add channel command");
2382 ret = handle_notification_thread_command_add_channel(
8abe313a
JG
2383 state,
2384 cmd->parameters.add_channel.session.name,
2385 cmd->parameters.add_channel.session.uid,
2386 cmd->parameters.add_channel.session.gid,
2387 cmd->parameters.add_channel.channel.name,
2388 cmd->parameters.add_channel.channel.domain,
2389 cmd->parameters.add_channel.channel.key,
2390 cmd->parameters.add_channel.channel.capacity,
ab0ee2ca
JG
2391 &cmd->reply_code);
2392 break;
2393 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
2394 DBG("[notification-thread] Received remove channel command");
2395 ret = handle_notification_thread_command_remove_channel(
2396 state, cmd->parameters.remove_channel.key,
2397 cmd->parameters.remove_channel.domain,
2398 &cmd->reply_code);
2399 break;
0ca52944 2400 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
0ca52944 2401 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
ed327204
JG
2402 DBG("[notification-thread] Received session rotation %s command",
2403 cmd->type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING ?
2404 "ongoing" : "completed");
2405 ret = handle_notification_thread_command_session_rotation(
0ca52944 2406 state,
ed327204
JG
2407 cmd->type,
2408 cmd->parameters.session_rotation.session_name,
2409 cmd->parameters.session_rotation.uid,
2410 cmd->parameters.session_rotation.gid,
2411 cmd->parameters.session_rotation.trace_archive_chunk_id,
2412 cmd->parameters.session_rotation.location,
0ca52944
JG
2413 &cmd->reply_code);
2414 break;
ab0ee2ca
JG
2415 case NOTIFICATION_COMMAND_TYPE_QUIT:
2416 DBG("[notification-thread] Received quit command");
2417 cmd->reply_code = LTTNG_OK;
2418 ret = 1;
2419 goto end;
d1ba29d2
JG
2420 case NOTIFICATION_COMMAND_TYPE_CLIENT_COMMUNICATION_UPDATE:
2421 {
2422 const enum client_transmission_status client_status =
2423 cmd->parameters.client_communication_update
2424 .status;
2425 const notification_client_id client_id =
2426 cmd->parameters.client_communication_update.id;
2427 struct notification_client *client;
2428
2429 rcu_read_lock();
2430 client = get_client_from_id(client_id, state);
2431
2432 if (!client) {
2433 /*
2434 * Client error was probably already picked-up by the
2435 * notification thread or it has disconnected
2436 * gracefully while this command was queued.
2437 */
2438 DBG("Failed to find notification client to update communication status, client id = %" PRIu64,
2439 client_id);
2440 ret = 0;
2441 } else {
2442 pthread_mutex_lock(&client->lock);
2443 ret = client_handle_transmission_status(
2444 client, client_status, state);
2445 pthread_mutex_unlock(&client->lock);
2446 }
2447 rcu_read_unlock();
2448 break;
2449 }
ab0ee2ca
JG
2450 default:
2451 ERR("[notification-thread] Unknown internal command received");
2452 goto error_unlock;
2453 }
2454
2455 if (ret) {
2456 goto error_unlock;
2457 }
2458end:
0ab399e0
JG
2459 if (cmd->is_async) {
2460 free(cmd);
2461 cmd = NULL;
2462 } else {
2463 lttng_waiter_wake_up(&cmd->reply_waiter);
2464 }
ab0ee2ca
JG
2465 return ret;
2466error_unlock:
2467 /* Wake-up and return a fatal error to the calling thread. */
8ada111f 2468 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
2469 cmd->reply_code = LTTNG_ERR_FATAL;
2470error:
2471 /* Indicate a fatal error to the caller. */
2472 return -1;
2473}
2474
ab0ee2ca
JG
2475static
2476int socket_set_non_blocking(int socket)
2477{
2478 int ret, flags;
2479
2480 /* Set the pipe as non-blocking. */
2481 ret = fcntl(socket, F_GETFL, 0);
2482 if (ret == -1) {
2483 PERROR("fcntl get socket flags");
2484 goto end;
2485 }
2486 flags = ret;
2487
2488 ret = fcntl(socket, F_SETFL, flags | O_NONBLOCK);
2489 if (ret == -1) {
2490 PERROR("fcntl set O_NONBLOCK socket flag");
2491 goto end;
2492 }
2493 DBG("Client socket (fd = %i) set as non-blocking", socket);
2494end:
2495 return ret;
2496}
2497
505b2d90 2498/* Client lock must be acquired by caller. */
ab0ee2ca 2499static
14fa22f8 2500int client_reset_inbound_state(struct notification_client *client)
ab0ee2ca
JG
2501{
2502 int ret;
2503
505b2d90
JG
2504 ASSERT_LOCKED(client->lock);
2505
ab0ee2ca
JG
2506 ret = lttng_dynamic_buffer_set_size(
2507 &client->communication.inbound.buffer, 0);
2508 assert(!ret);
2509
2510 client->communication.inbound.bytes_to_receive =
2511 sizeof(struct lttng_notification_channel_message);
2512 client->communication.inbound.msg_type =
2513 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
ab0ee2ca
JG
2514 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
2515 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
14fa22f8
JG
2516 ret = lttng_dynamic_buffer_set_size(
2517 &client->communication.inbound.buffer,
2518 client->communication.inbound.bytes_to_receive);
2519 return ret;
ab0ee2ca
JG
2520}
2521
2522int handle_notification_thread_client_connect(
2523 struct notification_thread_state *state)
2524{
2525 int ret;
2526 struct notification_client *client;
2527
2528 DBG("[notification-thread] Handling new notification channel client connection");
2529
2530 client = zmalloc(sizeof(*client));
2531 if (!client) {
2532 /* Fatal error. */
2533 ret = -1;
2534 goto error;
2535 }
505b2d90 2536 pthread_mutex_init(&client->lock, NULL);
ac1889bf 2537 client->id = state->next_notification_client_id++;
ab0ee2ca
JG
2538 CDS_INIT_LIST_HEAD(&client->condition_list);
2539 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
2540 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
01ea340e 2541 client->communication.inbound.expect_creds = true;
505b2d90
JG
2542
2543 pthread_mutex_lock(&client->lock);
14fa22f8 2544 ret = client_reset_inbound_state(client);
505b2d90 2545 pthread_mutex_unlock(&client->lock);
14fa22f8
JG
2546 if (ret) {
2547 ERR("[notification-thread] Failed to reset client communication's inbound state");
e742b055 2548 ret = 0;
14fa22f8
JG
2549 goto error;
2550 }
ab0ee2ca
JG
2551
2552 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
2553 if (ret < 0) {
2554 ERR("[notification-thread] Failed to accept new notification channel client connection");
2555 ret = 0;
2556 goto error;
2557 }
2558
2559 client->socket = ret;
2560
2561 ret = socket_set_non_blocking(client->socket);
2562 if (ret) {
2563 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
2564 goto error;
2565 }
2566
2567 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
2568 if (ret < 0) {
2569 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
2570 ret = 0;
2571 goto error;
2572 }
2573
2574 ret = lttng_poll_add(&state->events, client->socket,
2575 LPOLLIN | LPOLLERR |
2576 LPOLLHUP | LPOLLRDHUP);
2577 if (ret < 0) {
2578 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
2579 ret = 0;
2580 goto error;
2581 }
2582 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
2583 client->socket);
2584
ab0ee2ca
JG
2585 rcu_read_lock();
2586 cds_lfht_add(state->client_socket_ht,
2587 hash_client_socket(client->socket),
2588 &client->client_socket_ht_node);
ac1889bf
JG
2589 cds_lfht_add(state->client_id_ht,
2590 hash_client_id(client->id),
2591 &client->client_id_ht_node);
ab0ee2ca
JG
2592 rcu_read_unlock();
2593
2594 return ret;
2595error:
2596 notification_client_destroy(client, state);
2597 return ret;
2598}
2599
505b2d90
JG
2600/* RCU read-lock must be held by the caller. */
2601/* Client lock must be held by the caller */
2602static
2603int notification_thread_client_disconnect(
2604 struct notification_client *client,
ab0ee2ca 2605 struct notification_thread_state *state)
505b2d90
JG
2606{
2607 int ret;
2608 struct lttng_condition_list_element *condition_list_element, *tmp;
2609
2610 /* Acquire the client lock to disable its communication atomically. */
2611 client->communication.active = false;
2612 ret = lttng_poll_del(&state->events, client->socket);
2613 if (ret) {
2614 ERR("[notification-thread] Failed to remove client socket %d from poll set",
2615 client->socket);
2616 }
2617
2618 cds_lfht_del(state->client_socket_ht, &client->client_socket_ht_node);
2619 cds_lfht_del(state->client_id_ht, &client->client_id_ht_node);
2620
2621 /* Release all conditions to which the client was subscribed. */
2622 cds_list_for_each_entry_safe(condition_list_element, tmp,
2623 &client->condition_list, node) {
2624 (void) notification_thread_client_unsubscribe(client,
2625 condition_list_element->condition, state, NULL);
2626 }
2627
2628 /*
2629 * Client no longer accessible to other threads (through the
2630 * client lists).
2631 */
2632 notification_client_destroy(client, state);
2633 return ret;
2634}
2635
2636int handle_notification_thread_client_disconnect(
2637 int client_socket, struct notification_thread_state *state)
ab0ee2ca
JG
2638{
2639 int ret = 0;
2640 struct notification_client *client;
2641
2642 rcu_read_lock();
2643 DBG("[notification-thread] Closing client connection (socket fd = %i)",
2644 client_socket);
2645 client = get_client_from_socket(client_socket, state);
2646 if (!client) {
2647 /* Internal state corruption, fatal error. */
2648 ERR("[notification-thread] Unable to find client (socket fd = %i)",
2649 client_socket);
2650 ret = -1;
2651 goto end;
2652 }
2653
505b2d90
JG
2654 pthread_mutex_lock(&client->lock);
2655 ret = notification_thread_client_disconnect(client, state);
2656 pthread_mutex_unlock(&client->lock);
ab0ee2ca
JG
2657end:
2658 rcu_read_unlock();
2659 return ret;
2660}
2661
2662int handle_notification_thread_client_disconnect_all(
2663 struct notification_thread_state *state)
2664{
2665 struct cds_lfht_iter iter;
2666 struct notification_client *client;
2667 bool error_encoutered = false;
2668
2669 rcu_read_lock();
2670 DBG("[notification-thread] Closing all client connections");
2671 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
505b2d90 2672 client_socket_ht_node) {
ab0ee2ca
JG
2673 int ret;
2674
505b2d90
JG
2675 pthread_mutex_lock(&client->lock);
2676 ret = notification_thread_client_disconnect(
2677 client, state);
2678 pthread_mutex_unlock(&client->lock);
ab0ee2ca
JG
2679 if (ret) {
2680 error_encoutered = true;
2681 }
2682 }
2683 rcu_read_unlock();
2684 return error_encoutered ? 1 : 0;
2685}
2686
2687int handle_notification_thread_trigger_unregister_all(
2688 struct notification_thread_state *state)
2689{
4149ace8 2690 bool error_occurred = false;
ab0ee2ca
JG
2691 struct cds_lfht_iter iter;
2692 struct lttng_trigger_ht_element *trigger_ht_element;
2693
763de384 2694 rcu_read_lock();
ab0ee2ca
JG
2695 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
2696 node) {
2697 int ret = handle_notification_thread_command_unregister_trigger(
2698 state, trigger_ht_element->trigger, NULL);
2699 if (ret) {
4149ace8 2700 error_occurred = true;
ab0ee2ca
JG
2701 }
2702 }
763de384 2703 rcu_read_unlock();
4149ace8 2704 return error_occurred ? -1 : 0;
ab0ee2ca
JG
2705}
2706
dd877ea6
JG
2707static
2708int client_handle_transmission_status(
2709 struct notification_client *client,
2710 enum client_transmission_status transmission_status,
2711 struct notification_thread_state *state)
2712{
2713 int ret = 0;
2714
2715 switch (transmission_status) {
2716 case CLIENT_TRANSMISSION_STATUS_COMPLETE:
2717 ret = lttng_poll_mod(&state->events, client->socket,
2718 CLIENT_POLL_MASK_IN);
2719 if (ret) {
2720 goto end;
2721 }
2722
2723 client->communication.outbound.queued_command_reply = false;
2724 client->communication.outbound.dropped_notification = false;
2725 break;
2726 case CLIENT_TRANSMISSION_STATUS_QUEUED:
2727 /*
2728 * We want to be notified whenever there is buffer space
2729 * available to send the rest of the payload.
2730 */
2731 ret = lttng_poll_mod(&state->events, client->socket,
2732 CLIENT_POLL_MASK_IN_OUT);
2733 if (ret) {
2734 goto end;
2735 }
2736 break;
2737 case CLIENT_TRANSMISSION_STATUS_FAIL:
2738 ret = notification_thread_client_disconnect(client, state);
2739 if (ret) {
2740 goto end;
2741 }
2742 break;
2743 case CLIENT_TRANSMISSION_STATUS_ERROR:
2744 ret = -1;
2745 goto end;
2746 default:
2747 abort();
2748 }
2749end:
2750 return ret;
2751}
2752
505b2d90 2753/* Client lock must be acquired by caller. */
ab0ee2ca 2754static
dd877ea6 2755enum client_transmission_status client_flush_outgoing_queue(
d1ba29d2 2756 struct notification_client *client)
ab0ee2ca
JG
2757{
2758 ssize_t ret;
2759 size_t to_send_count;
dd877ea6 2760 enum client_transmission_status status;
ab0ee2ca 2761
505b2d90
JG
2762 ASSERT_LOCKED(client->lock);
2763
d1ba29d2
JG
2764 if (!client->communication.active) {
2765 status = CLIENT_TRANSMISSION_STATUS_FAIL;
2766 goto end;
2767 }
2768
ab0ee2ca
JG
2769 assert(client->communication.outbound.buffer.size != 0);
2770 to_send_count = client->communication.outbound.buffer.size;
2771 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
2772 client->socket);
2773
2774 ret = lttcomm_send_unix_sock_non_block(client->socket,
2775 client->communication.outbound.buffer.data,
2776 to_send_count);
44c180ca 2777 if ((ret >= 0 && ret < to_send_count)) {
ab0ee2ca
JG
2778 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
2779 client->socket);
2780 to_send_count -= max(ret, 0);
2781
2782 memcpy(client->communication.outbound.buffer.data,
2783 client->communication.outbound.buffer.data +
2784 client->communication.outbound.buffer.size - to_send_count,
2785 to_send_count);
2786 ret = lttng_dynamic_buffer_set_size(
2787 &client->communication.outbound.buffer,
2788 to_send_count);
2789 if (ret) {
2790 goto error;
2791 }
dd877ea6 2792 status = CLIENT_TRANSMISSION_STATUS_QUEUED;
ab0ee2ca
JG
2793 } else if (ret < 0) {
2794 /* Generic error, disconnect the client. */
dd877ea6 2795 ERR("[notification-thread] Failed to flush outgoing queue, disconnecting client (socket fd = %i)",
ab0ee2ca 2796 client->socket);
dd877ea6 2797 status = CLIENT_TRANSMISSION_STATUS_FAIL;
ab0ee2ca
JG
2798 } else {
2799 /* No error and flushed the queue completely. */
2800 ret = lttng_dynamic_buffer_set_size(
2801 &client->communication.outbound.buffer, 0);
2802 if (ret) {
2803 goto error;
2804 }
dd877ea6
JG
2805 status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
2806 }
d1ba29d2
JG
2807end:
2808 return status;
ab0ee2ca 2809error:
d1ba29d2 2810 return CLIENT_TRANSMISSION_STATUS_ERROR;
ab0ee2ca
JG
2811}
2812
505b2d90 2813/* Client lock must be acquired by caller. */
ab0ee2ca
JG
2814static
2815int client_send_command_reply(struct notification_client *client,
2816 struct notification_thread_state *state,
2817 enum lttng_notification_channel_status status)
2818{
2819 int ret;
2820 struct lttng_notification_channel_command_reply reply = {
2821 .status = (int8_t) status,
2822 };
2823 struct lttng_notification_channel_message msg = {
2824 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
2825 .size = sizeof(reply),
2826 };
2827 char buffer[sizeof(msg) + sizeof(reply)];
d1ba29d2 2828 enum client_transmission_status transmission_status;
ab0ee2ca 2829
505b2d90
JG
2830 ASSERT_LOCKED(client->lock);
2831
ab0ee2ca
JG
2832 if (client->communication.outbound.queued_command_reply) {
2833 /* Protocol error. */
2834 goto error;
2835 }
2836
2837 memcpy(buffer, &msg, sizeof(msg));
2838 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
2839 DBG("[notification-thread] Send command reply (%i)", (int) status);
2840
2841 /* Enqueue buffer to outgoing queue and flush it. */
2842 ret = lttng_dynamic_buffer_append(
2843 &client->communication.outbound.buffer,
2844 buffer, sizeof(buffer));
2845 if (ret) {
2846 goto error;
2847 }
2848
d1ba29d2
JG
2849 transmission_status = client_flush_outgoing_queue(client);
2850 ret = client_handle_transmission_status(
2851 client, transmission_status, state);
ab0ee2ca
JG
2852 if (ret) {
2853 goto error;
2854 }
2855
2856 if (client->communication.outbound.buffer.size != 0) {
2857 /* Queue could not be emptied. */
2858 client->communication.outbound.queued_command_reply = true;
2859 }
2860
2861 return 0;
2862error:
2863 return -1;
2864}
2865
505b2d90
JG
2866static
2867int client_handle_message_unknown(struct notification_client *client,
2868 struct notification_thread_state *state)
2869{
2870 int ret;
2871
2872 pthread_mutex_lock(&client->lock);
2873
2874 /*
2875 * Receiving message header. The function will be called again
2876 * once the rest of the message as been received and can be
2877 * interpreted.
2878 */
2879 const struct lttng_notification_channel_message *msg;
2880
2881 assert(sizeof(*msg) == client->communication.inbound.buffer.size);
2882 msg = (const struct lttng_notification_channel_message *)
2883 client->communication.inbound.buffer.data;
2884
2885 if (msg->size == 0 ||
2886 msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
2887 ERR("[notification-thread] Invalid notification channel message: length = %u",
2888 msg->size);
2889 ret = -1;
2890 goto end;
2891 }
2892
2893 switch (msg->type) {
2894 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2895 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2896 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2897 break;
2898 default:
2899 ret = -1;
2900 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
2901 goto end;
2902 }
2903
2904 client->communication.inbound.bytes_to_receive = msg->size;
2905 client->communication.inbound.msg_type =
2906 (enum lttng_notification_channel_message_type) msg->type;
2907 ret = lttng_dynamic_buffer_set_size(
2908 &client->communication.inbound.buffer, msg->size);
2909end:
2910 pthread_mutex_unlock(&client->lock);
2911 return ret;
2912}
2913
2914static
2915int client_handle_message_handshake(struct notification_client *client,
2916 struct notification_thread_state *state)
2917{
2918 int ret;
2919 struct lttng_notification_channel_command_handshake *handshake_client;
2920 const struct lttng_notification_channel_command_handshake handshake_reply = {
2921 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
2922 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
2923 };
2924 const struct lttng_notification_channel_message msg_header = {
2925 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
2926 .size = sizeof(handshake_reply),
2927 };
2928 enum lttng_notification_channel_status status =
2929 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2930 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
d1ba29d2 2931 enum client_transmission_status transmission_status;
505b2d90
JG
2932
2933 pthread_mutex_lock(&client->lock);
2934
2935 memcpy(send_buffer, &msg_header, sizeof(msg_header));
2936 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
2937 sizeof(handshake_reply));
2938
2939 handshake_client =
2940 (struct lttng_notification_channel_command_handshake *)
2941 client->communication.inbound.buffer
2942 .data;
2943 client->major = handshake_client->major;
2944 client->minor = handshake_client->minor;
2945 if (!client->communication.inbound.creds_received) {
2946 ERR("[notification-thread] No credentials received from client");
2947 ret = -1;
2948 goto end;
2949 }
2950
2951 client->uid = LTTNG_SOCK_GET_UID_CRED(
2952 &client->communication.inbound.creds);
2953 client->gid = LTTNG_SOCK_GET_GID_CRED(
2954 &client->communication.inbound.creds);
2955 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
2956 client->uid, client->gid, (int) client->major,
2957 (int) client->minor);
2958
2959 if (handshake_client->major !=
2960 LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
2961 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
2962 }
2963
2964 ret = lttng_dynamic_buffer_append(
2965 &client->communication.outbound.buffer, send_buffer,
2966 sizeof(send_buffer));
2967 if (ret) {
2968 ERR("[notification-thread] Failed to send protocol version to notification channel client");
2969 goto end;
2970 }
2971
2972 client->validated = true;
2973 client->communication.active = true;
2974
d1ba29d2
JG
2975 transmission_status = client_flush_outgoing_queue(client);
2976 ret = client_handle_transmission_status(
2977 client, transmission_status, state);
505b2d90
JG
2978 if (ret) {
2979 goto end;
2980 }
2981
2982 ret = client_send_command_reply(client, state, status);
2983 if (ret) {
2984 ERR("[notification-thread] Failed to send reply to notification channel client");
2985 goto end;
2986 }
2987
2988 /* Set reception state to receive the next message header. */
2989 ret = client_reset_inbound_state(client);
2990 if (ret) {
2991 ERR("[notification-thread] Failed to reset client communication's inbound state");
2992 goto end;
2993 }
2994
2995end:
2996 pthread_mutex_unlock(&client->lock);
2997 return ret;
2998}
2999
3000static
3001int client_handle_message_subscription(
3002 struct notification_client *client,
3003 enum lttng_notification_channel_message_type msg_type,
3004 struct notification_thread_state *state)
3005{
3006 int ret;
3007 struct lttng_condition *condition;
3008 enum lttng_notification_channel_status status =
3009 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
3010 struct lttng_payload_view condition_view =
3011 lttng_payload_view_from_dynamic_buffer(
3012 &client->communication.inbound.buffer,
3013 0, -1);
3014 size_t expected_condition_size;
3015
3016 pthread_mutex_lock(&client->lock);
3017 expected_condition_size = client->communication.inbound.buffer.size;
3018 pthread_mutex_unlock(&client->lock);
3019
3020 ret = lttng_condition_create_from_payload(&condition_view, &condition);
3021 if (ret != expected_condition_size) {
3022 ERR("[notification-thread] Malformed condition received from client");
3023 goto end;
3024 }
3025
3026 if (msg_type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
3027 ret = notification_thread_client_subscribe(
3028 client, condition, state, &status);
3029 } else {
3030 ret = notification_thread_client_unsubscribe(
3031 client, condition, state, &status);
3032 }
3033 if (ret) {
3034 goto end;
3035 }
3036
3037 pthread_mutex_lock(&client->lock);
3038 ret = client_send_command_reply(client, state, status);
3039 if (ret) {
3040 ERR("[notification-thread] Failed to send reply to notification channel client");
3041 goto end_unlock;
3042 }
3043
3044 /* Set reception state to receive the next message header. */
3045 ret = client_reset_inbound_state(client);
3046 if (ret) {
3047 ERR("[notification-thread] Failed to reset client communication's inbound state");
3048 goto end_unlock;
3049 }
3050
3051end_unlock:
3052 pthread_mutex_unlock(&client->lock);
3053end:
3054 return ret;
3055}
3056
ab0ee2ca
JG
3057static
3058int client_dispatch_message(struct notification_client *client,
3059 struct notification_thread_state *state)
3060{
3061 int ret = 0;
3062
3063 if (client->communication.inbound.msg_type !=
3064 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
3065 client->communication.inbound.msg_type !=
3066 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
3067 !client->validated) {
3068 WARN("[notification-thread] client attempted a command before handshake");
3069 ret = -1;
3070 goto end;
3071 }
3072
3073 switch (client->communication.inbound.msg_type) {
3074 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
3075 {
505b2d90 3076 ret = client_handle_message_unknown(client, state);
ab0ee2ca
JG
3077 break;
3078 }
3079 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
3080 {
505b2d90 3081 ret = client_handle_message_handshake(client, state);
ab0ee2ca
JG
3082 break;
3083 }
3084 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
3085 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
3086 {
505b2d90
JG
3087 ret = client_handle_message_subscription(client,
3088 client->communication.inbound.msg_type, state);
ab0ee2ca
JG
3089 break;
3090 }
3091 default:
3092 abort();
3093 }
3094end:
3095 return ret;
3096}
3097
3098/* Incoming data from client. */
3099int handle_notification_thread_client_in(
3100 struct notification_thread_state *state, int socket)
3101{
14fa22f8 3102 int ret = 0;
ab0ee2ca
JG
3103 struct notification_client *client;
3104 ssize_t recv_ret;
3105 size_t offset;
505b2d90 3106 bool message_is_complete = false;
ab0ee2ca
JG
3107
3108 client = get_client_from_socket(socket, state);
3109 if (!client) {
3110 /* Internal error, abort. */
3111 ret = -1;
3112 goto end;
3113 }
3114
505b2d90 3115 pthread_mutex_lock(&client->lock);
14fa22f8
JG
3116 offset = client->communication.inbound.buffer.size -
3117 client->communication.inbound.bytes_to_receive;
01ea340e 3118 if (client->communication.inbound.expect_creds) {
ab0ee2ca
JG
3119 recv_ret = lttcomm_recv_creds_unix_sock(socket,
3120 client->communication.inbound.buffer.data + offset,
3121 client->communication.inbound.bytes_to_receive,
3122 &client->communication.inbound.creds);
3123 if (recv_ret > 0) {
01ea340e 3124 client->communication.inbound.expect_creds = false;
ab0ee2ca
JG
3125 client->communication.inbound.creds_received = true;
3126 }
3127 } else {
3128 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
3129 client->communication.inbound.buffer.data + offset,
3130 client->communication.inbound.bytes_to_receive);
3131 }
505b2d90
JG
3132 if (recv_ret >= 0) {
3133 client->communication.inbound.bytes_to_receive -= recv_ret;
3134 message_is_complete = client->communication.inbound
3135 .bytes_to_receive == 0;
3136 }
3137 pthread_mutex_unlock(&client->lock);
ab0ee2ca
JG
3138 if (recv_ret < 0) {
3139 goto error_disconnect_client;
3140 }
3141
505b2d90 3142 if (message_is_complete) {
ab0ee2ca
JG
3143 ret = client_dispatch_message(client, state);
3144 if (ret) {
3145 /*
3146 * Only returns an error if this client must be
3147 * disconnected.
3148 */
3149 goto error_disconnect_client;
3150 }
ab0ee2ca
JG
3151 }
3152end:
3153 return ret;
3154error_disconnect_client:
505b2d90
JG
3155 pthread_mutex_lock(&client->lock);
3156 ret = notification_thread_client_disconnect(client, state);
3157 pthread_mutex_unlock(&client->lock);
ab0ee2ca
JG
3158 return ret;
3159}
3160
3161/* Client ready to receive outgoing data. */
3162int handle_notification_thread_client_out(
3163 struct notification_thread_state *state, int socket)
3164{
3165 int ret;
3166 struct notification_client *client;
d1ba29d2 3167 enum client_transmission_status transmission_status;
ab0ee2ca
JG
3168
3169 client = get_client_from_socket(socket, state);
3170 if (!client) {
3171 /* Internal error, abort. */
3172 ret = -1;
3173 goto end;
3174 }
3175
505b2d90 3176 pthread_mutex_lock(&client->lock);
d1ba29d2
JG
3177 transmission_status = client_flush_outgoing_queue(client);
3178 ret = client_handle_transmission_status(
3179 client, transmission_status, state);
505b2d90 3180 pthread_mutex_unlock(&client->lock);
ab0ee2ca
JG
3181 if (ret) {
3182 goto end;
3183 }
3184end:
3185 return ret;
3186}
3187
3188static
e8360425
JD
3189bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
3190 const struct channel_state_sample *sample,
3191 uint64_t buffer_capacity)
ab0ee2ca
JG
3192{
3193 bool result = false;
3194 uint64_t threshold;
3195 enum lttng_condition_type condition_type;
e8360425 3196 const struct lttng_condition_buffer_usage *use_condition = container_of(
ab0ee2ca
JG
3197 condition, struct lttng_condition_buffer_usage,
3198 parent);
3199
ab0ee2ca
JG
3200 if (use_condition->threshold_bytes.set) {
3201 threshold = use_condition->threshold_bytes.value;
3202 } else {
3203 /*
3204 * Threshold was expressed as a ratio.
3205 *
3206 * TODO the threshold (in bytes) of conditions expressed
3207 * as a ratio of total buffer size could be cached to
3208 * forego this double-multiplication or it could be performed
3209 * as fixed-point math.
3210 *
d49487dc 3211 * Note that caching should accommodates the case where the
ab0ee2ca
JG
3212 * condition applies to multiple channels (i.e. don't assume
3213 * that all channels matching my_chann* have the same size...)
3214 */
3215 threshold = (uint64_t) (use_condition->threshold_ratio.value *
3216 (double) buffer_capacity);
3217 }
3218
3219 condition_type = lttng_condition_get_type(condition);
3220 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
3221 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3222 threshold, sample->highest_usage);
3223
3224 /*
3225 * The low condition should only be triggered once _all_ of the
3226 * streams in a channel have gone below the "low" threshold.
3227 */
3228 if (sample->highest_usage <= threshold) {
3229 result = true;
3230 }
3231 } else {
3232 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3233 threshold, sample->highest_usage);
3234
3235 /*
3236 * For high buffer usage scenarios, we want to trigger whenever
3237 * _any_ of the streams has reached the "high" threshold.
3238 */
3239 if (sample->highest_usage >= threshold) {
3240 result = true;
3241 }
3242 }
e8360425 3243
ab0ee2ca
JG
3244 return result;
3245}
3246
3247static
e8360425
JD
3248bool evaluate_session_consumed_size_condition(
3249 const struct lttng_condition *condition,
3250 uint64_t session_consumed_size)
3251{
3252 uint64_t threshold;
3253 const struct lttng_condition_session_consumed_size *size_condition =
3254 container_of(condition,
3255 struct lttng_condition_session_consumed_size,
3256 parent);
3257
3258 threshold = size_condition->consumed_threshold_bytes.value;
3259 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
3260 threshold, session_consumed_size);
3261 return session_consumed_size >= threshold;