notification: mark tracer source element as out of poll set
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
CommitLineData
ab0ee2ca 1/*
ab5be9fa 2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
ab0ee2ca 3 *
ab5be9fa 4 * SPDX-License-Identifier: GPL-2.0-only
ab0ee2ca 5 *
ab0ee2ca
JG
6 */
7
f2b3ef9f
JG
8#include "lttng/action/action.h"
9#include "lttng/trigger/trigger-internal.h"
ab0ee2ca
JG
10#define _LGPL_SOURCE
11#include <urcu.h>
12#include <urcu/rculfhash.h>
13
ab0ee2ca
JG
14#include <common/defaults.h>
15#include <common/error.h>
16#include <common/futex.h>
17#include <common/unix.h>
18#include <common/dynamic-buffer.h>
19#include <common/hashtable/utils.h>
20#include <common/sessiond-comm/sessiond-comm.h>
21#include <common/macros.h>
22#include <lttng/condition/condition.h>
9b63a4aa 23#include <lttng/action/action-internal.h>
ab0ee2ca
JG
24#include <lttng/notification/notification-internal.h>
25#include <lttng/condition/condition-internal.h>
26#include <lttng/condition/buffer-usage-internal.h>
e8360425 27#include <lttng/condition/session-consumed-size-internal.h>
ea9a44f0 28#include <lttng/condition/session-rotation-internal.h>
959e3c66 29#include <lttng/condition/event-rule-internal.h>
d02d7404 30#include <lttng/domain-internal.h>
ab0ee2ca 31#include <lttng/notification/channel-internal.h>
85c06c44 32#include <lttng/trigger/trigger-internal.h>
959e3c66 33#include <lttng/event-rule/event-rule-internal.h>
1da26331 34
ab0ee2ca
JG
35#include <time.h>
36#include <unistd.h>
37#include <assert.h>
38#include <inttypes.h>
d53ea4e4 39#include <fcntl.h>
ab0ee2ca 40
e98a976a 41#include "condition-internal.h"
1da26331
JG
42#include "notification-thread.h"
43#include "notification-thread-events.h"
44#include "notification-thread-commands.h"
45#include "lttng-sessiond.h"
46#include "kernel.h"
47
ab0ee2ca
JG
48#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
49#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
50
51eab943
JG
51enum lttng_object_type {
52 LTTNG_OBJECT_TYPE_UNKNOWN,
53 LTTNG_OBJECT_TYPE_NONE,
54 LTTNG_OBJECT_TYPE_CHANNEL,
55 LTTNG_OBJECT_TYPE_SESSION,
56};
57
ab0ee2ca 58struct lttng_trigger_list_element {
9e0bb80e 59 /* No ownership of the trigger object is assumed. */
f2b3ef9f 60 struct lttng_trigger *trigger;
ab0ee2ca
JG
61 struct cds_list_head node;
62};
63
64struct lttng_channel_trigger_list {
65 struct channel_key channel_key;
ea9a44f0 66 /* List of struct lttng_trigger_list_element. */
ab0ee2ca 67 struct cds_list_head list;
ea9a44f0 68 /* Node in the channel_triggers_ht */
ab0ee2ca 69 struct cds_lfht_node channel_triggers_ht_node;
83b934ad
MD
70 /* call_rcu delayed reclaim. */
71 struct rcu_head rcu_node;
ab0ee2ca
JG
72};
73
ea9a44f0
JG
74/*
75 * List of triggers applying to a given session.
76 *
77 * See:
78 * - lttng_session_trigger_list_create()
79 * - lttng_session_trigger_list_build()
80 * - lttng_session_trigger_list_destroy()
81 * - lttng_session_trigger_list_add()
82 */
83struct lttng_session_trigger_list {
84 /*
85 * Not owned by this; points to the session_info structure's
86 * session name.
87 */
88 const char *session_name;
89 /* List of struct lttng_trigger_list_element. */
90 struct cds_list_head list;
91 /* Node in the session_triggers_ht */
92 struct cds_lfht_node session_triggers_ht_node;
93 /*
94 * Weak reference to the notification system's session triggers
95 * hashtable.
96 *
97 * The session trigger list structure structure is owned by
98 * the session's session_info.
99 *
100 * The session_info is kept alive the the channel_infos holding a
101 * reference to it (reference counting). When those channels are
102 * destroyed (at runtime or on teardown), the reference they hold
103 * to the session_info are released. On destruction of session_info,
104 * session_info_destroy() will remove the list of triggers applying
105 * to this session from the notification system's state.
106 *
107 * This implies that the session_triggers_ht must be destroyed
108 * after the channels.
109 */
110 struct cds_lfht *session_triggers_ht;
111 /* Used for delayed RCU reclaim. */
112 struct rcu_head rcu_node;
113};
114
ab0ee2ca
JG
115struct lttng_trigger_ht_element {
116 struct lttng_trigger *trigger;
117 struct cds_lfht_node node;
242388e4 118 struct cds_lfht_node node_by_name_uid;
83b934ad
MD
119 /* call_rcu delayed reclaim. */
120 struct rcu_head rcu_node;
ab0ee2ca
JG
121};
122
123struct lttng_condition_list_element {
124 struct lttng_condition *condition;
125 struct cds_list_head node;
126};
127
ab0ee2ca
JG
128struct channel_state_sample {
129 struct channel_key key;
130 struct cds_lfht_node channel_state_ht_node;
131 uint64_t highest_usage;
132 uint64_t lowest_usage;
e8360425 133 uint64_t channel_total_consumed;
83b934ad
MD
134 /* call_rcu delayed reclaim. */
135 struct rcu_head rcu_node;
ab0ee2ca
JG
136};
137
e4db5ace 138static unsigned long hash_channel_key(struct channel_key *key);
ed327204 139static int evaluate_buffer_condition(const struct lttng_condition *condition,
e4db5ace 140 struct lttng_evaluation **evaluation,
e8360425
JD
141 const struct notification_thread_state *state,
142 const struct channel_state_sample *previous_sample,
143 const struct channel_state_sample *latest_sample,
144 uint64_t previous_session_consumed_total,
145 uint64_t latest_session_consumed_total,
146 struct channel_info *channel_info);
e4db5ace 147static
9b63a4aa
JG
148int send_evaluation_to_clients(const struct lttng_trigger *trigger,
149 const struct lttng_evaluation *evaluation,
e4db5ace
JR
150 struct notification_client_list *client_list,
151 struct notification_thread_state *state,
152 uid_t channel_uid, gid_t channel_gid);
153
8abe313a 154
ea9a44f0 155/* session_info API */
8abe313a
JG
156static
157void session_info_destroy(void *_data);
158static
159void session_info_get(struct session_info *session_info);
160static
161void session_info_put(struct session_info *session_info);
162static
163struct session_info *session_info_create(const char *name,
ea9a44f0 164 uid_t uid, gid_t gid,
1eee26c5
JG
165 struct lttng_session_trigger_list *trigger_list,
166 struct cds_lfht *sessions_ht);
8abe313a
JG
167static
168void session_info_add_channel(struct session_info *session_info,
169 struct channel_info *channel_info);
170static
171void session_info_remove_channel(struct session_info *session_info,
172 struct channel_info *channel_info);
173
ea9a44f0
JG
174/* lttng_session_trigger_list API */
175static
176struct lttng_session_trigger_list *lttng_session_trigger_list_create(
177 const char *session_name,
178 struct cds_lfht *session_triggers_ht);
179static
180struct lttng_session_trigger_list *lttng_session_trigger_list_build(
181 const struct notification_thread_state *state,
182 const char *session_name);
183static
184void lttng_session_trigger_list_destroy(
185 struct lttng_session_trigger_list *list);
186static
187int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
f2b3ef9f 188 struct lttng_trigger *trigger);
ea9a44f0 189
f2b3ef9f
JG
190static
191int client_handle_transmission_status(
192 struct notification_client *client,
193 enum client_transmission_status transmission_status,
194 struct notification_thread_state *state);
ea9a44f0 195
242388e4
JR
196static
197void free_lttng_trigger_ht_element_rcu(struct rcu_head *node);
198
ab0ee2ca 199static
ac1889bf 200int match_client_socket(struct cds_lfht_node *node, const void *key)
ab0ee2ca
JG
201{
202 /* This double-cast is intended to supress pointer-to-cast warning. */
ac1889bf
JG
203 const int socket = (int) (intptr_t) key;
204 const struct notification_client *client = caa_container_of(node,
205 struct notification_client, client_socket_ht_node);
ab0ee2ca 206
ac1889bf
JG
207 return client->socket == socket;
208}
209
210static
211int match_client_id(struct cds_lfht_node *node, const void *key)
212{
213 /* This double-cast is intended to supress pointer-to-cast warning. */
214 const notification_client_id id = *((notification_client_id *) key);
215 const struct notification_client *client = caa_container_of(
216 node, struct notification_client, client_id_ht_node);
ab0ee2ca 217
ac1889bf 218 return client->id == id;
ab0ee2ca
JG
219}
220
221static
222int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
223{
224 struct channel_key *channel_key = (struct channel_key *) key;
225 struct lttng_channel_trigger_list *trigger_list;
226
227 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
228 channel_triggers_ht_node);
229
230 return !!((channel_key->key == trigger_list->channel_key.key) &&
231 (channel_key->domain == trigger_list->channel_key.domain));
232}
233
51eab943
JG
234static
235int match_session_trigger_list(struct cds_lfht_node *node, const void *key)
236{
237 const char *session_name = (const char *) key;
238 struct lttng_session_trigger_list *trigger_list;
239
240 trigger_list = caa_container_of(node, struct lttng_session_trigger_list,
241 session_triggers_ht_node);
242
243 return !!(strcmp(trigger_list->session_name, session_name) == 0);
244}
245
ab0ee2ca
JG
246static
247int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
248{
249 struct channel_key *channel_key = (struct channel_key *) key;
250 struct channel_state_sample *sample;
251
252 sample = caa_container_of(node, struct channel_state_sample,
253 channel_state_ht_node);
254
255 return !!((channel_key->key == sample->key.key) &&
256 (channel_key->domain == sample->key.domain));
257}
258
259static
260int match_channel_info(struct cds_lfht_node *node, const void *key)
261{
262 struct channel_key *channel_key = (struct channel_key *) key;
263 struct channel_info *channel_info;
264
265 channel_info = caa_container_of(node, struct channel_info,
266 channels_ht_node);
267
268 return !!((channel_key->key == channel_info->key.key) &&
269 (channel_key->domain == channel_info->key.domain));
270}
271
272static
242388e4 273int match_trigger(struct cds_lfht_node *node, const void *key)
ab0ee2ca 274{
242388e4
JR
275 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
276 struct lttng_trigger_ht_element *trigger_ht_element;
ab0ee2ca 277
242388e4 278 trigger_ht_element = caa_container_of(node, struct lttng_trigger_ht_element,
ab0ee2ca 279 node);
ab0ee2ca 280
242388e4 281 return !!lttng_trigger_is_equal(trigger_key, trigger_ht_element->trigger);
ab0ee2ca
JG
282}
283
e7c93cf9
JR
284static
285int match_trigger_token(struct cds_lfht_node *node, const void *key)
286{
287 const uint64_t *_key = key;
288 struct notification_trigger_tokens_ht_element *element;
289
290 element = caa_container_of(node,
291 struct notification_trigger_tokens_ht_element, node);
292 return *_key == element->token;
293}
294
ab0ee2ca
JG
295static
296int match_client_list_condition(struct cds_lfht_node *node, const void *key)
297{
298 struct lttng_condition *condition_key = (struct lttng_condition *) key;
299 struct notification_client_list *client_list;
f82f93a1 300 const struct lttng_condition *condition;
ab0ee2ca
JG
301
302 assert(condition_key);
303
304 client_list = caa_container_of(node, struct notification_client_list,
505b2d90 305 notification_trigger_clients_ht_node);
f82f93a1 306 condition = lttng_trigger_get_const_condition(client_list->trigger);
ab0ee2ca
JG
307
308 return !!lttng_condition_is_equal(condition_key, condition);
309}
310
f82f93a1
JG
311static
312int match_session(struct cds_lfht_node *node, const void *key)
313{
314 const char *name = key;
315 struct session_info *session_info = caa_container_of(
316 node, struct session_info, sessions_ht_node);
317
318 return !strcmp(session_info->name, name);
319}
320
242388e4
JR
321/*
322 * Match trigger based on name and credentials only.
323 * Name duplication is NOT allowed for the same uid.
324 */
325static
326int match_trigger_by_name_uid(struct cds_lfht_node *node,
327 const void *key)
328{
329 bool match = false;
330 const char *name;
331 const char *key_name;
332 enum lttng_trigger_status status;
333 const struct lttng_credentials *key_creds;
334 const struct lttng_credentials *node_creds;
335 const struct lttng_trigger *trigger_key =
336 (const struct lttng_trigger *) key;
337 const struct lttng_trigger_ht_element *trigger_ht_element =
338 caa_container_of(node,
339 struct lttng_trigger_ht_element,
340 node_by_name_uid);
341
342 status = lttng_trigger_get_name(trigger_ht_element->trigger, &name);
343 assert(status == LTTNG_TRIGGER_STATUS_OK);
344
345 status = lttng_trigger_get_name(trigger_key, &key_name);
346 assert(status == LTTNG_TRIGGER_STATUS_OK);
347
348 /* Compare the names. */
349 if (strcmp(name, key_name) != 0) {
350 goto end;
351 }
352
353 /* Compare the owners' UIDs. */
354 key_creds = lttng_trigger_get_credentials(trigger_key);
355 node_creds = lttng_trigger_get_credentials(trigger_ht_element->trigger);
356
357 match = lttng_credentials_is_equal_uid(key_creds, node_creds);
358
359end:
360 return match;
361}
362
363/*
364 * Hash trigger based on name and credentials only.
365 */
366static
367unsigned long hash_trigger_by_name_uid(const struct lttng_trigger *trigger)
368{
369 unsigned long hash = 0;
370 const struct lttng_credentials *trigger_creds;
371 const char *trigger_name;
372 enum lttng_trigger_status status;
373
374 status = lttng_trigger_get_name(trigger, &trigger_name);
375 if (status == LTTNG_TRIGGER_STATUS_OK) {
376 hash = hash_key_str(trigger_name, lttng_ht_seed);
377 }
378
379 trigger_creds = lttng_trigger_get_credentials(trigger);
380 hash ^= hash_key_ulong((void *) (unsigned long) LTTNG_OPTIONAL_GET(trigger_creds->uid),
381 lttng_ht_seed);
382
383 return hash;
384}
385
e4db5ace
JR
386static
387unsigned long hash_channel_key(struct channel_key *key)
388{
389 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
390 unsigned long domain_hash = hash_key_ulong(
391 (void *) (unsigned long) key->domain, lttng_ht_seed);
392
393 return key_hash ^ domain_hash;
394}
395
ac1889bf
JG
396static
397unsigned long hash_client_socket(int socket)
398{
399 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
400}
401
402static
403unsigned long hash_client_id(notification_client_id id)
404{
405 return hash_key_u64(&id, lttng_ht_seed);
406}
407
f82f93a1
JG
408/*
409 * Get the type of object to which a given condition applies. Bindings let
410 * the notification system evaluate a trigger's condition when a given
411 * object's state is updated.
412 *
413 * For instance, a condition bound to a channel will be evaluated everytime
414 * the channel's state is changed by a channel monitoring sample.
415 */
416static
417enum lttng_object_type get_condition_binding_object(
418 const struct lttng_condition *condition)
419{
420 switch (lttng_condition_get_type(condition)) {
421 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
422 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
423 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
e742b055 424 return LTTNG_OBJECT_TYPE_CHANNEL;
f82f93a1
JG
425 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
426 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
427 return LTTNG_OBJECT_TYPE_SESSION;
959e3c66
JR
428 case LTTNG_CONDITION_TYPE_EVENT_RULE_HIT:
429 return LTTNG_OBJECT_TYPE_NONE;
f82f93a1
JG
430 default:
431 return LTTNG_OBJECT_TYPE_UNKNOWN;
432 }
433}
434
83b934ad
MD
435static
436void free_channel_info_rcu(struct rcu_head *node)
437{
438 free(caa_container_of(node, struct channel_info, rcu_node));
439}
440
ab0ee2ca
JG
441static
442void channel_info_destroy(struct channel_info *channel_info)
443{
444 if (!channel_info) {
445 return;
446 }
447
8abe313a
JG
448 if (channel_info->session_info) {
449 session_info_remove_channel(channel_info->session_info,
450 channel_info);
451 session_info_put(channel_info->session_info);
ab0ee2ca 452 }
8abe313a
JG
453 if (channel_info->name) {
454 free(channel_info->name);
ab0ee2ca 455 }
83b934ad
MD
456 call_rcu(&channel_info->rcu_node, free_channel_info_rcu);
457}
458
459static
460void free_session_info_rcu(struct rcu_head *node)
461{
462 free(caa_container_of(node, struct session_info, rcu_node));
ab0ee2ca
JG
463}
464
8abe313a 465/* Don't call directly, use the ref-counting mechanism. */
ab0ee2ca 466static
8abe313a 467void session_info_destroy(void *_data)
ab0ee2ca 468{
8abe313a 469 struct session_info *session_info = _data;
3b68d0a3 470 int ret;
ab0ee2ca 471
8abe313a
JG
472 assert(session_info);
473 if (session_info->channel_infos_ht) {
3b68d0a3
JR
474 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
475 if (ret) {
4c3f302b 476 ERR("[notification-thread] Failed to destroy channel information hash table");
3b68d0a3 477 }
8abe313a 478 }
ea9a44f0 479 lttng_session_trigger_list_destroy(session_info->trigger_list);
1eee26c5
JG
480
481 rcu_read_lock();
482 cds_lfht_del(session_info->sessions_ht,
483 &session_info->sessions_ht_node);
484 rcu_read_unlock();
8abe313a 485 free(session_info->name);
83b934ad 486 call_rcu(&session_info->rcu_node, free_session_info_rcu);
8abe313a 487}
ab0ee2ca 488
8abe313a
JG
489static
490void session_info_get(struct session_info *session_info)
491{
492 if (!session_info) {
493 return;
494 }
495 lttng_ref_get(&session_info->ref);
496}
497
498static
499void session_info_put(struct session_info *session_info)
500{
501 if (!session_info) {
502 return;
ab0ee2ca 503 }
8abe313a
JG
504 lttng_ref_put(&session_info->ref);
505}
506
507static
ea9a44f0 508struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
1eee26c5
JG
509 struct lttng_session_trigger_list *trigger_list,
510 struct cds_lfht *sessions_ht)
8abe313a
JG
511{
512 struct session_info *session_info;
ab0ee2ca 513
8abe313a 514 assert(name);
ab0ee2ca 515
8abe313a
JG
516 session_info = zmalloc(sizeof(*session_info));
517 if (!session_info) {
518 goto end;
519 }
520 lttng_ref_init(&session_info->ref, session_info_destroy);
521
522 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
523 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
524 if (!session_info->channel_infos_ht) {
ab0ee2ca
JG
525 goto error;
526 }
8abe313a
JG
527
528 cds_lfht_node_init(&session_info->sessions_ht_node);
529 session_info->name = strdup(name);
530 if (!session_info->name) {
ab0ee2ca
JG
531 goto error;
532 }
8abe313a
JG
533 session_info->uid = uid;
534 session_info->gid = gid;
ea9a44f0 535 session_info->trigger_list = trigger_list;
1eee26c5 536 session_info->sessions_ht = sessions_ht;
8abe313a
JG
537end:
538 return session_info;
539error:
540 session_info_put(session_info);
541 return NULL;
542}
543
544static
545void session_info_add_channel(struct session_info *session_info,
546 struct channel_info *channel_info)
547{
548 rcu_read_lock();
549 cds_lfht_add(session_info->channel_infos_ht,
550 hash_channel_key(&channel_info->key),
551 &channel_info->session_info_channels_ht_node);
552 rcu_read_unlock();
553}
554
555static
556void session_info_remove_channel(struct session_info *session_info,
557 struct channel_info *channel_info)
558{
559 rcu_read_lock();
560 cds_lfht_del(session_info->channel_infos_ht,
561 &channel_info->session_info_channels_ht_node);
562 rcu_read_unlock();
563}
564
565static
566struct channel_info *channel_info_create(const char *channel_name,
567 struct channel_key *channel_key, uint64_t channel_capacity,
568 struct session_info *session_info)
569{
570 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
571
572 if (!channel_info) {
573 goto end;
574 }
575
ab0ee2ca 576 cds_lfht_node_init(&channel_info->channels_ht_node);
8abe313a
JG
577 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
578 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
579 channel_info->capacity = channel_capacity;
580
581 channel_info->name = strdup(channel_name);
582 if (!channel_info->name) {
583 goto error;
584 }
585
586 /*
587 * Set the references between session and channel infos:
588 * - channel_info holds a strong reference to session_info
589 * - session_info holds a weak reference to channel_info
590 */
591 session_info_get(session_info);
592 session_info_add_channel(session_info, channel_info);
593 channel_info->session_info = session_info;
ab0ee2ca 594end:
8abe313a 595 return channel_info;
ab0ee2ca 596error:
8abe313a 597 channel_info_destroy(channel_info);
ab0ee2ca
JG
598 return NULL;
599}
600
f2b3ef9f 601LTTNG_HIDDEN
505b2d90
JG
602bool notification_client_list_get(struct notification_client_list *list)
603{
604 return urcu_ref_get_unless_zero(&list->ref);
605}
606
607static
608void free_notification_client_list_rcu(struct rcu_head *node)
609{
610 free(caa_container_of(node, struct notification_client_list,
611 rcu_node));
612}
613
614static
615void notification_client_list_release(struct urcu_ref *list_ref)
616{
617 struct notification_client_list *list =
618 container_of(list_ref, typeof(*list), ref);
619 struct notification_client_list_element *client_list_element, *tmp;
620
621 if (list->notification_trigger_clients_ht) {
622 rcu_read_lock();
623 cds_lfht_del(list->notification_trigger_clients_ht,
624 &list->notification_trigger_clients_ht_node);
625 rcu_read_unlock();
626 list->notification_trigger_clients_ht = NULL;
627 }
628 cds_list_for_each_entry_safe(client_list_element, tmp,
629 &list->list, node) {
630 free(client_list_element);
631 }
632 pthread_mutex_destroy(&list->lock);
633 call_rcu(&list->rcu_node, free_notification_client_list_rcu);
634}
635
636static
637struct notification_client_list *notification_client_list_create(
638 const struct lttng_trigger *trigger)
639{
640 struct notification_client_list *client_list =
641 zmalloc(sizeof(*client_list));
642
643 if (!client_list) {
644 goto error;
645 }
646 pthread_mutex_init(&client_list->lock, NULL);
647 urcu_ref_init(&client_list->ref);
648 cds_lfht_node_init(&client_list->notification_trigger_clients_ht_node);
649 CDS_INIT_LIST_HEAD(&client_list->list);
650 client_list->trigger = trigger;
651error:
652 return client_list;
653}
654
655static
656void publish_notification_client_list(
657 struct notification_thread_state *state,
658 struct notification_client_list *list)
659{
660 const struct lttng_condition *condition =
661 lttng_trigger_get_const_condition(list->trigger);
662
663 assert(!list->notification_trigger_clients_ht);
f2b3ef9f 664 notification_client_list_get(list);
505b2d90
JG
665
666 list->notification_trigger_clients_ht =
667 state->notification_trigger_clients_ht;
668
669 rcu_read_lock();
670 cds_lfht_add(state->notification_trigger_clients_ht,
671 lttng_condition_hash(condition),
672 &list->notification_trigger_clients_ht_node);
673 rcu_read_unlock();
674}
675
f2b3ef9f 676LTTNG_HIDDEN
505b2d90
JG
677void notification_client_list_put(struct notification_client_list *list)
678{
679 if (!list) {
680 return;
681 }
682 return urcu_ref_put(&list->ref, notification_client_list_release);
683}
684
685/* Provides a reference to the returned list. */
ed327204
JG
686static
687struct notification_client_list *get_client_list_from_condition(
688 struct notification_thread_state *state,
689 const struct lttng_condition *condition)
690{
691 struct cds_lfht_node *node;
692 struct cds_lfht_iter iter;
505b2d90 693 struct notification_client_list *list = NULL;
ed327204 694
505b2d90 695 rcu_read_lock();
ed327204
JG
696 cds_lfht_lookup(state->notification_trigger_clients_ht,
697 lttng_condition_hash(condition),
698 match_client_list_condition,
699 condition,
700 &iter);
701 node = cds_lfht_iter_get_node(&iter);
505b2d90
JG
702 if (node) {
703 list = container_of(node, struct notification_client_list,
704 notification_trigger_clients_ht_node);
705 list = notification_client_list_get(list) ? list : NULL;
706 }
ed327204 707
505b2d90
JG
708 rcu_read_unlock();
709 return list;
ed327204
JG
710}
711
e4db5ace 712static
f82f93a1
JG
713int evaluate_channel_condition_for_client(
714 const struct lttng_condition *condition,
715 struct notification_thread_state *state,
716 struct lttng_evaluation **evaluation,
717 uid_t *session_uid, gid_t *session_gid)
e4db5ace
JR
718{
719 int ret;
720 struct cds_lfht_iter iter;
721 struct cds_lfht_node *node;
722 struct channel_info *channel_info = NULL;
723 struct channel_key *channel_key = NULL;
724 struct channel_state_sample *last_sample = NULL;
725 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
e4db5ace 726
505b2d90
JG
727 rcu_read_lock();
728
f82f93a1 729 /* Find the channel associated with the condition. */
e4db5ace 730 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
f82f93a1 731 channel_trigger_list, channel_triggers_ht_node) {
e4db5ace
JR
732 struct lttng_trigger_list_element *element;
733
734 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
9b63a4aa 735 const struct lttng_condition *current_condition =
f82f93a1 736 lttng_trigger_get_const_condition(
e4db5ace
JR
737 element->trigger);
738
739 assert(current_condition);
740 if (!lttng_condition_is_equal(condition,
2ae99f0b 741 current_condition)) {
e4db5ace
JR
742 continue;
743 }
744
745 /* Found the trigger, save the channel key. */
746 channel_key = &channel_trigger_list->channel_key;
747 break;
748 }
749 if (channel_key) {
750 /* The channel key was found stop iteration. */
751 break;
752 }
753 }
754
755 if (!channel_key){
756 /* No channel found; normal exit. */
f82f93a1 757 DBG("[notification-thread] No known channel associated with newly subscribed-to condition");
e4db5ace
JR
758 ret = 0;
759 goto end;
760 }
761
762 /* Fetch channel info for the matching channel. */
763 cds_lfht_lookup(state->channels_ht,
764 hash_channel_key(channel_key),
765 match_channel_info,
766 channel_key,
767 &iter);
768 node = cds_lfht_iter_get_node(&iter);
769 assert(node);
770 channel_info = caa_container_of(node, struct channel_info,
771 channels_ht_node);
772
773 /* Retrieve the channel's last sample, if it exists. */
774 cds_lfht_lookup(state->channel_state_ht,
775 hash_channel_key(channel_key),
776 match_channel_state_sample,
777 channel_key,
778 &iter);
779 node = cds_lfht_iter_get_node(&iter);
780 if (node) {
781 last_sample = caa_container_of(node,
782 struct channel_state_sample,
783 channel_state_ht_node);
784 } else {
785 /* Nothing to evaluate, no sample was ever taken. Normal exit */
786 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
787 ret = 0;
788 goto end;
789 }
790
f82f93a1 791 ret = evaluate_buffer_condition(condition, evaluation, state,
e8360425
JD
792 NULL, last_sample,
793 0, channel_info->session_info->consumed_data_size,
794 channel_info);
e4db5ace 795 if (ret) {
066a3c82 796 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
e4db5ace
JR
797 goto end;
798 }
799
f82f93a1
JG
800 *session_uid = channel_info->session_info->uid;
801 *session_gid = channel_info->session_info->gid;
802end:
505b2d90 803 rcu_read_unlock();
f82f93a1
JG
804 return ret;
805}
806
807static
808const char *get_condition_session_name(const struct lttng_condition *condition)
809{
810 const char *session_name = NULL;
811 enum lttng_condition_status status;
812
813 switch (lttng_condition_get_type(condition)) {
814 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
815 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
816 status = lttng_condition_buffer_usage_get_session_name(
817 condition, &session_name);
818 break;
819 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
820 status = lttng_condition_session_consumed_size_get_session_name(
821 condition, &session_name);
822 break;
823 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
824 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
825 status = lttng_condition_session_rotation_get_session_name(
826 condition, &session_name);
827 break;
828 default:
829 abort();
830 }
831 if (status != LTTNG_CONDITION_STATUS_OK) {
832 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
833 goto end;
834 }
835end:
836 return session_name;
837}
838
f82f93a1
JG
839static
840int evaluate_session_condition_for_client(
841 const struct lttng_condition *condition,
842 struct notification_thread_state *state,
843 struct lttng_evaluation **evaluation,
844 uid_t *session_uid, gid_t *session_gid)
845{
846 int ret;
847 struct cds_lfht_iter iter;
848 struct cds_lfht_node *node;
849 const char *session_name;
850 struct session_info *session_info = NULL;
851
505b2d90 852 rcu_read_lock();
f82f93a1
JG
853 session_name = get_condition_session_name(condition);
854
855 /* Find the session associated with the trigger. */
856 cds_lfht_lookup(state->sessions_ht,
857 hash_key_str(session_name, lttng_ht_seed),
858 match_session,
859 session_name,
860 &iter);
861 node = cds_lfht_iter_get_node(&iter);
862 if (!node) {
863 DBG("[notification-thread] No known session matching name \"%s\"",
864 session_name);
865 ret = 0;
866 goto end;
867 }
868
869 session_info = caa_container_of(node, struct session_info,
870 sessions_ht_node);
871 session_info_get(session_info);
872
873 /*
874 * Evaluation is performed in-line here since only one type of
875 * session-bound condition is handled for the moment.
876 */
877 switch (lttng_condition_get_type(condition)) {
878 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
879 if (!session_info->rotation.ongoing) {
be558f88 880 ret = 0;
f82f93a1
JG
881 goto end_session_put;
882 }
883
884 *evaluation = lttng_evaluation_session_rotation_ongoing_create(
885 session_info->rotation.id);
886 if (!*evaluation) {
887 /* Fatal error. */
888 ERR("[notification-thread] Failed to create session rotation ongoing evaluation for session \"%s\"",
889 session_info->name);
890 ret = -1;
891 goto end_session_put;
892 }
893 ret = 0;
894 break;
895 default:
896 ret = 0;
897 goto end_session_put;
898 }
899
900 *session_uid = session_info->uid;
901 *session_gid = session_info->gid;
902
903end_session_put:
904 session_info_put(session_info);
905end:
505b2d90 906 rcu_read_unlock();
f82f93a1
JG
907 return ret;
908}
909
f82f93a1
JG
910static
911int evaluate_condition_for_client(const struct lttng_trigger *trigger,
912 const struct lttng_condition *condition,
913 struct notification_client *client,
914 struct notification_thread_state *state)
915{
916 int ret;
917 struct lttng_evaluation *evaluation = NULL;
505b2d90
JG
918 struct notification_client_list client_list = {
919 .lock = PTHREAD_MUTEX_INITIALIZER,
920 };
f82f93a1
JG
921 struct notification_client_list_element client_list_element = { 0 };
922 uid_t object_uid = 0;
923 gid_t object_gid = 0;
924
925 assert(trigger);
926 assert(condition);
927 assert(client);
928 assert(state);
929
930 switch (get_condition_binding_object(condition)) {
931 case LTTNG_OBJECT_TYPE_SESSION:
932 ret = evaluate_session_condition_for_client(condition, state,
933 &evaluation, &object_uid, &object_gid);
934 break;
935 case LTTNG_OBJECT_TYPE_CHANNEL:
936 ret = evaluate_channel_condition_for_client(condition, state,
937 &evaluation, &object_uid, &object_gid);
938 break;
939 case LTTNG_OBJECT_TYPE_NONE:
7e802d0a 940 DBG("[notification-thread] Newly subscribed-to condition not bound to object, nothing to evaluate");
f82f93a1
JG
941 ret = 0;
942 goto end;
943 case LTTNG_OBJECT_TYPE_UNKNOWN:
944 default:
945 ret = -1;
946 goto end;
947 }
af0c318d
JG
948 if (ret) {
949 /* Fatal error. */
950 goto end;
951 }
e4db5ace
JR
952 if (!evaluation) {
953 /* Evaluation yielded nothing. Normal exit. */
954 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
955 ret = 0;
956 goto end;
957 }
958
959 /*
960 * Create a temporary client list with the client currently
961 * subscribing.
962 */
505b2d90 963 cds_lfht_node_init(&client_list.notification_trigger_clients_ht_node);
e4db5ace
JR
964 CDS_INIT_LIST_HEAD(&client_list.list);
965 client_list.trigger = trigger;
966
967 CDS_INIT_LIST_HEAD(&client_list_element.node);
968 client_list_element.client = client;
969 cds_list_add(&client_list_element.node, &client_list.list);
970
971 /* Send evaluation result to the newly-subscribed client. */
972 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
973 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
f82f93a1 974 state, object_uid, object_gid);
e4db5ace
JR
975
976end:
977 return ret;
978}
979
ab0ee2ca
JG
980static
981int notification_thread_client_subscribe(struct notification_client *client,
982 struct lttng_condition *condition,
983 struct notification_thread_state *state,
984 enum lttng_notification_channel_status *_status)
985{
986 int ret = 0;
505b2d90 987 struct notification_client_list *client_list = NULL;
ab0ee2ca
JG
988 struct lttng_condition_list_element *condition_list_element = NULL;
989 struct notification_client_list_element *client_list_element = NULL;
990 enum lttng_notification_channel_status status =
991 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
992
993 /*
994 * Ensure that the client has not already subscribed to this condition
995 * before.
996 */
997 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
998 if (lttng_condition_is_equal(condition_list_element->condition,
999 condition)) {
1000 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
1001 goto end;
1002 }
1003 }
1004
1005 condition_list_element = zmalloc(sizeof(*condition_list_element));
1006 if (!condition_list_element) {
1007 ret = -1;
1008 goto error;
1009 }
1010 client_list_element = zmalloc(sizeof(*client_list_element));
1011 if (!client_list_element) {
1012 ret = -1;
1013 goto error;
1014 }
1015
ab0ee2ca
JG
1016 /*
1017 * Add the newly-subscribed condition to the client's subscription list.
1018 */
1019 CDS_INIT_LIST_HEAD(&condition_list_element->node);
1020 condition_list_element->condition = condition;
1021 cds_list_add(&condition_list_element->node, &client->condition_list);
1022
ed327204
JG
1023 client_list = get_client_list_from_condition(state, condition);
1024 if (!client_list) {
2ae99f0b
JG
1025 /*
1026 * No notification-emiting trigger registered with this
1027 * condition. We don't evaluate the condition right away
1028 * since this trigger is not registered yet.
1029 */
4fb43b68 1030 free(client_list_element);
505b2d90 1031 goto end;
ab0ee2ca
JG
1032 }
1033
2ae99f0b
JG
1034 /*
1035 * The condition to which the client just subscribed is evaluated
1036 * at this point so that conditions that are already TRUE result
1037 * in a notification being sent out.
505b2d90
JG
1038 *
1039 * The client_list's trigger is used without locking the list itself.
1040 * This is correct since the list doesn't own the trigger and the
1041 * object is immutable.
2ae99f0b 1042 */
e4db5ace
JR
1043 if (evaluate_condition_for_client(client_list->trigger, condition,
1044 client, state)) {
1045 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
1046 ret = -1;
4bd5b4af 1047 free(client_list_element);
505b2d90 1048 goto end;
e4db5ace
JR
1049 }
1050
1051 /*
1052 * Add the client to the list of clients interested in a given trigger
1053 * if a "notification" trigger with a corresponding condition was
1054 * added prior.
1055 */
ab0ee2ca
JG
1056 client_list_element->client = client;
1057 CDS_INIT_LIST_HEAD(&client_list_element->node);
505b2d90
JG
1058
1059 pthread_mutex_lock(&client_list->lock);
ab0ee2ca 1060 cds_list_add(&client_list_element->node, &client_list->list);
505b2d90 1061 pthread_mutex_unlock(&client_list->lock);
ab0ee2ca
JG
1062end:
1063 if (_status) {
1064 *_status = status;
1065 }
505b2d90
JG
1066 if (client_list) {
1067 notification_client_list_put(client_list);
1068 }
ab0ee2ca
JG
1069 return ret;
1070error:
1071 free(condition_list_element);
1072 free(client_list_element);
1073 return ret;
1074}
1075
1076static
1077int notification_thread_client_unsubscribe(
1078 struct notification_client *client,
1079 struct lttng_condition *condition,
1080 struct notification_thread_state *state,
1081 enum lttng_notification_channel_status *_status)
1082{
ab0ee2ca
JG
1083 struct notification_client_list *client_list;
1084 struct lttng_condition_list_element *condition_list_element,
1085 *condition_tmp;
1086 struct notification_client_list_element *client_list_element,
1087 *client_tmp;
1088 bool condition_found = false;
1089 enum lttng_notification_channel_status status =
1090 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1091
1092 /* Remove the condition from the client's condition list. */
1093 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
1094 &client->condition_list, node) {
1095 if (!lttng_condition_is_equal(condition_list_element->condition,
1096 condition)) {
1097 continue;
1098 }
1099
1100 cds_list_del(&condition_list_element->node);
1101 /*
1102 * The caller may be iterating on the client's conditions to
1103 * tear down a client's connection. In this case, the condition
1104 * will be destroyed at the end.
1105 */
1106 if (condition != condition_list_element->condition) {
1107 lttng_condition_destroy(
1108 condition_list_element->condition);
1109 }
1110 free(condition_list_element);
1111 condition_found = true;
1112 break;
1113 }
1114
1115 if (!condition_found) {
1116 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
1117 goto end;
1118 }
1119
1120 /*
1121 * Remove the client from the list of clients interested the trigger
1122 * matching the condition.
1123 */
ed327204
JG
1124 client_list = get_client_list_from_condition(state, condition);
1125 if (!client_list) {
505b2d90 1126 goto end;
ab0ee2ca
JG
1127 }
1128
505b2d90 1129 pthread_mutex_lock(&client_list->lock);
ab0ee2ca
JG
1130 cds_list_for_each_entry_safe(client_list_element, client_tmp,
1131 &client_list->list, node) {
505b2d90 1132 if (client_list_element->client->id != client->id) {
ab0ee2ca
JG
1133 continue;
1134 }
1135 cds_list_del(&client_list_element->node);
1136 free(client_list_element);
1137 break;
1138 }
505b2d90
JG
1139 pthread_mutex_unlock(&client_list->lock);
1140 notification_client_list_put(client_list);
1141 client_list = NULL;
ab0ee2ca
JG
1142end:
1143 lttng_condition_destroy(condition);
1144 if (_status) {
1145 *_status = status;
1146 }
1147 return 0;
1148}
1149
83b934ad
MD
1150static
1151void free_notification_client_rcu(struct rcu_head *node)
1152{
1153 free(caa_container_of(node, struct notification_client, rcu_node));
1154}
1155
ab0ee2ca
JG
1156static
1157void notification_client_destroy(struct notification_client *client,
1158 struct notification_thread_state *state)
1159{
ab0ee2ca
JG
1160 if (!client) {
1161 return;
1162 }
1163
505b2d90
JG
1164 /*
1165 * The client object is not reachable by other threads, no need to lock
1166 * the client here.
1167 */
ab0ee2ca
JG
1168 if (client->socket >= 0) {
1169 (void) lttcomm_close_unix_sock(client->socket);
ac1889bf 1170 client->socket = -1;
ab0ee2ca 1171 }
505b2d90 1172 client->communication.active = false;
882093ee
JR
1173 lttng_payload_reset(&client->communication.inbound.payload);
1174 lttng_payload_reset(&client->communication.outbound.payload);
505b2d90 1175 pthread_mutex_destroy(&client->lock);
83b934ad 1176 call_rcu(&client->rcu_node, free_notification_client_rcu);
ab0ee2ca
JG
1177}
1178
1179/*
1180 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1181 * client pointer).
1182 */
1183static
1184struct notification_client *get_client_from_socket(int socket,
1185 struct notification_thread_state *state)
1186{
1187 struct cds_lfht_iter iter;
1188 struct cds_lfht_node *node;
1189 struct notification_client *client = NULL;
1190
1191 cds_lfht_lookup(state->client_socket_ht,
ac1889bf
JG
1192 hash_client_socket(socket),
1193 match_client_socket,
ab0ee2ca
JG
1194 (void *) (unsigned long) socket,
1195 &iter);
1196 node = cds_lfht_iter_get_node(&iter);
1197 if (!node) {
1198 goto end;
1199 }
1200
1201 client = caa_container_of(node, struct notification_client,
1202 client_socket_ht_node);
1203end:
1204 return client;
1205}
1206
f2b3ef9f
JG
1207/*
1208 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1209 * client pointer).
1210 */
1211static
1212struct notification_client *get_client_from_id(notification_client_id id,
1213 struct notification_thread_state *state)
1214{
1215 struct cds_lfht_iter iter;
1216 struct cds_lfht_node *node;
1217 struct notification_client *client = NULL;
1218
1219 cds_lfht_lookup(state->client_id_ht,
1220 hash_client_id(id),
1221 match_client_id,
1222 &id,
1223 &iter);
1224 node = cds_lfht_iter_get_node(&iter);
1225 if (!node) {
1226 goto end;
1227 }
1228
1229 client = caa_container_of(node, struct notification_client,
1230 client_id_ht_node);
1231end:
1232 return client;
1233}
1234
ab0ee2ca 1235static
e8360425 1236bool buffer_usage_condition_applies_to_channel(
51eab943
JG
1237 const struct lttng_condition *condition,
1238 const struct channel_info *channel_info)
ab0ee2ca
JG
1239{
1240 enum lttng_condition_status status;
e8360425
JD
1241 enum lttng_domain_type condition_domain;
1242 const char *condition_session_name = NULL;
1243 const char *condition_channel_name = NULL;
ab0ee2ca 1244
e8360425
JD
1245 status = lttng_condition_buffer_usage_get_domain_type(condition,
1246 &condition_domain);
1247 assert(status == LTTNG_CONDITION_STATUS_OK);
1248 if (channel_info->key.domain != condition_domain) {
ab0ee2ca
JG
1249 goto fail;
1250 }
1251
e8360425
JD
1252 status = lttng_condition_buffer_usage_get_session_name(
1253 condition, &condition_session_name);
1254 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1255
1256 status = lttng_condition_buffer_usage_get_channel_name(
1257 condition, &condition_channel_name);
1258 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
1259
1260 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1261 goto fail;
1262 }
1263 if (strcmp(channel_info->name, condition_channel_name)) {
ab0ee2ca
JG
1264 goto fail;
1265 }
1266
e8360425
JD
1267 return true;
1268fail:
1269 return false;
1270}
1271
1272static
1273bool session_consumed_size_condition_applies_to_channel(
51eab943
JG
1274 const struct lttng_condition *condition,
1275 const struct channel_info *channel_info)
e8360425
JD
1276{
1277 enum lttng_condition_status status;
1278 const char *condition_session_name = NULL;
1279
1280 status = lttng_condition_session_consumed_size_get_session_name(
1281 condition, &condition_session_name);
1282 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1283
1284 if (strcmp(channel_info->session_info->name, condition_session_name)) {
ab0ee2ca
JG
1285 goto fail;
1286 }
1287
e8360425
JD
1288 return true;
1289fail:
1290 return false;
1291}
ab0ee2ca 1292
51eab943
JG
1293static
1294bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
1295 const struct channel_info *channel_info)
1296{
1297 const struct lttng_condition *condition;
e8360425 1298 bool trigger_applies;
ab0ee2ca 1299
51eab943 1300 condition = lttng_trigger_get_const_condition(trigger);
e8360425 1301 if (!condition) {
ab0ee2ca
JG
1302 goto fail;
1303 }
e8360425
JD
1304
1305 switch (lttng_condition_get_type(condition)) {
1306 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1307 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1308 trigger_applies = buffer_usage_condition_applies_to_channel(
1309 condition, channel_info);
1310 break;
1311 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1312 trigger_applies = session_consumed_size_condition_applies_to_channel(
1313 condition, channel_info);
1314 break;
1315 default:
ab0ee2ca
JG
1316 goto fail;
1317 }
1318
e8360425 1319 return trigger_applies;
ab0ee2ca
JG
1320fail:
1321 return false;
1322}
1323
1324static
1325bool trigger_applies_to_client(struct lttng_trigger *trigger,
1326 struct notification_client *client)
1327{
1328 bool applies = false;
1329 struct lttng_condition_list_element *condition_list_element;
1330
1331 cds_list_for_each_entry(condition_list_element, &client->condition_list,
1332 node) {
1333 applies = lttng_condition_is_equal(
1334 condition_list_element->condition,
1335 lttng_trigger_get_condition(trigger));
1336 if (applies) {
1337 break;
1338 }
1339 }
1340 return applies;
1341}
1342
ed327204
JG
1343/* Must be called with RCU read lock held. */
1344static
1345struct lttng_session_trigger_list *get_session_trigger_list(
1346 struct notification_thread_state *state,
1347 const char *session_name)
1348{
1349 struct lttng_session_trigger_list *list = NULL;
1350 struct cds_lfht_node *node;
1351 struct cds_lfht_iter iter;
1352
1353 cds_lfht_lookup(state->session_triggers_ht,
1354 hash_key_str(session_name, lttng_ht_seed),
1355 match_session_trigger_list,
1356 session_name,
1357 &iter);
1358 node = cds_lfht_iter_get_node(&iter);
1359 if (!node) {
1360 /*
1361 * Not an error, the list of triggers applying to that session
1362 * will be initialized when the session is created.
1363 */
1364 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1365 session_name);
1366 goto end;
1367 }
1368
e742b055 1369 list = caa_container_of(node,
ed327204
JG
1370 struct lttng_session_trigger_list,
1371 session_triggers_ht_node);
1372end:
1373 return list;
1374}
1375
ea9a44f0
JG
1376/*
1377 * Allocate an empty lttng_session_trigger_list for the session named
1378 * 'session_name'.
1379 *
1380 * No ownership of 'session_name' is assumed by the session trigger list.
1381 * It is the caller's responsability to ensure the session name is alive
1382 * for as long as this list is.
1383 */
1384static
1385struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1386 const char *session_name,
1387 struct cds_lfht *session_triggers_ht)
1388{
1389 struct lttng_session_trigger_list *list;
1390
1391 list = zmalloc(sizeof(*list));
1392 if (!list) {
1393 goto end;
1394 }
1395 list->session_name = session_name;
1396 CDS_INIT_LIST_HEAD(&list->list);
1397 cds_lfht_node_init(&list->session_triggers_ht_node);
1398 list->session_triggers_ht = session_triggers_ht;
1399
1400 rcu_read_lock();
1401 /* Publish the list through the session_triggers_ht. */
1402 cds_lfht_add(session_triggers_ht,
1403 hash_key_str(session_name, lttng_ht_seed),
1404 &list->session_triggers_ht_node);
1405 rcu_read_unlock();
1406end:
1407 return list;
1408}
1409
1410static
1411void free_session_trigger_list_rcu(struct rcu_head *node)
1412{
1413 free(caa_container_of(node, struct lttng_session_trigger_list,
1414 rcu_node));
1415}
1416
1417static
1418void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1419{
1420 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1421
1422 /* Empty the list element by element, and then free the list itself. */
1423 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1424 &list->list, node) {
1425 cds_list_del(&trigger_list_element->node);
1426 free(trigger_list_element);
1427 }
1428 rcu_read_lock();
1429 /* Unpublish the list from the session_triggers_ht. */
1430 cds_lfht_del(list->session_triggers_ht,
1431 &list->session_triggers_ht_node);
1432 rcu_read_unlock();
1433 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1434}
1435
1436static
1437int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
f2b3ef9f 1438 struct lttng_trigger *trigger)
ea9a44f0
JG
1439{
1440 int ret = 0;
1441 struct lttng_trigger_list_element *new_element =
1442 zmalloc(sizeof(*new_element));
1443
1444 if (!new_element) {
1445 ret = -1;
1446 goto end;
1447 }
1448 CDS_INIT_LIST_HEAD(&new_element->node);
1449 new_element->trigger = trigger;
1450 cds_list_add(&new_element->node, &list->list);
1451end:
1452 return ret;
1453}
1454
1455static
1456bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1457 const char *session_name)
1458{
1459 bool applies = false;
1460 const struct lttng_condition *condition;
1461
1462 condition = lttng_trigger_get_const_condition(trigger);
1463 switch (lttng_condition_get_type(condition)) {
1464 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1465 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1466 {
1467 enum lttng_condition_status condition_status;
1468 const char *condition_session_name;
1469
1470 condition_status = lttng_condition_session_rotation_get_session_name(
1471 condition, &condition_session_name);
1472 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1473 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1474 goto end;
1475 }
1476
1477 assert(condition_session_name);
1478 applies = !strcmp(condition_session_name, session_name);
1479 break;
1480 }
1481 default:
1482 goto end;
1483 }
1484end:
1485 return applies;
1486}
1487
1488/*
1489 * Allocate and initialize an lttng_session_trigger_list which contains
1490 * all triggers that apply to the session named 'session_name'.
1491 *
1492 * No ownership of 'session_name' is assumed by the session trigger list.
1493 * It is the caller's responsability to ensure the session name is alive
1494 * for as long as this list is.
1495 */
1496static
1497struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1498 const struct notification_thread_state *state,
1499 const char *session_name)
1500{
1501 int trigger_count = 0;
1502 struct lttng_session_trigger_list *session_trigger_list = NULL;
1503 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1504 struct cds_lfht_iter iter;
1505
1506 session_trigger_list = lttng_session_trigger_list_create(session_name,
1507 state->session_triggers_ht);
1508
1509 /* Add all triggers applying to the session named 'session_name'. */
1510 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1511 node) {
1512 int ret;
1513
1514 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1515 session_name)) {
1516 continue;
1517 }
1518
1519 ret = lttng_session_trigger_list_add(session_trigger_list,
1520 trigger_ht_element->trigger);
1521 if (ret) {
1522 goto error;
1523 }
1524
1525 trigger_count++;
1526 }
1527
1528 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1529 trigger_count);
1530 return session_trigger_list;
1531error:
1532 lttng_session_trigger_list_destroy(session_trigger_list);
1533 return NULL;
1534}
1535
8abe313a
JG
1536static
1537struct session_info *find_or_create_session_info(
1538 struct notification_thread_state *state,
1539 const char *name, uid_t uid, gid_t gid)
1540{
1541 struct session_info *session = NULL;
1542 struct cds_lfht_node *node;
1543 struct cds_lfht_iter iter;
ea9a44f0 1544 struct lttng_session_trigger_list *trigger_list;
8abe313a
JG
1545
1546 rcu_read_lock();
1547 cds_lfht_lookup(state->sessions_ht,
1548 hash_key_str(name, lttng_ht_seed),
1549 match_session,
1550 name,
1551 &iter);
1552 node = cds_lfht_iter_get_node(&iter);
1553 if (node) {
1554 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1555 name, uid, gid);
1556 session = caa_container_of(node, struct session_info,
1557 sessions_ht_node);
1558 assert(session->uid == uid);
1559 assert(session->gid == gid);
1eee26c5
JG
1560 session_info_get(session);
1561 goto end;
ea9a44f0
JG
1562 }
1563
1564 trigger_list = lttng_session_trigger_list_build(state, name);
1565 if (!trigger_list) {
1566 goto error;
8abe313a
JG
1567 }
1568
1eee26c5
JG
1569 session = session_info_create(name, uid, gid, trigger_list,
1570 state->sessions_ht);
8abe313a
JG
1571 if (!session) {
1572 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1573 name, uid, gid);
b248644d 1574 lttng_session_trigger_list_destroy(trigger_list);
ea9a44f0 1575 goto error;
8abe313a 1576 }
ea9a44f0 1577 trigger_list = NULL;
577983cb
JG
1578
1579 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
9b63a4aa 1580 &session->sessions_ht_node);
1eee26c5 1581end:
8abe313a
JG
1582 rcu_read_unlock();
1583 return session;
ea9a44f0
JG
1584error:
1585 rcu_read_unlock();
1586 session_info_put(session);
1587 return NULL;
8abe313a
JG
1588}
1589
ab0ee2ca
JG
1590static
1591int handle_notification_thread_command_add_channel(
8abe313a
JG
1592 struct notification_thread_state *state,
1593 const char *session_name, uid_t session_uid, gid_t session_gid,
1594 const char *channel_name, enum lttng_domain_type channel_domain,
1595 uint64_t channel_key_int, uint64_t channel_capacity,
1596 enum lttng_error_code *cmd_result)
ab0ee2ca
JG
1597{
1598 struct cds_list_head trigger_list;
8abe313a
JG
1599 struct channel_info *new_channel_info = NULL;
1600 struct channel_key channel_key = {
1601 .key = channel_key_int,
1602 .domain = channel_domain,
1603 };
ab0ee2ca
JG
1604 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1605 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1606 int trigger_count = 0;
1607 struct cds_lfht_iter iter;
8abe313a 1608 struct session_info *session_info = NULL;
ab0ee2ca
JG
1609
1610 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
8abe313a
JG
1611 channel_name, session_name, channel_key_int,
1612 channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
ab0ee2ca
JG
1613
1614 CDS_INIT_LIST_HEAD(&trigger_list);
1615
8abe313a
JG
1616 session_info = find_or_create_session_info(state, session_name,
1617 session_uid, session_gid);
1618 if (!session_info) {
a9577b76 1619 /* Allocation error or an internal error occurred. */
ab0ee2ca
JG
1620 goto error;
1621 }
1622
8abe313a
JG
1623 new_channel_info = channel_info_create(channel_name, &channel_key,
1624 channel_capacity, session_info);
1625 if (!new_channel_info) {
1626 goto error;
1627 }
ab0ee2ca 1628
83b934ad 1629 rcu_read_lock();
ab0ee2ca
JG
1630 /* Build a list of all triggers applying to the new channel. */
1631 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1632 node) {
1633 struct lttng_trigger_list_element *new_element;
1634
1635 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
8abe313a 1636 new_channel_info)) {
ab0ee2ca
JG
1637 continue;
1638 }
1639
1640 new_element = zmalloc(sizeof(*new_element));
1641 if (!new_element) {
83b934ad 1642 rcu_read_unlock();
ab0ee2ca
JG
1643 goto error;
1644 }
1645 CDS_INIT_LIST_HEAD(&new_element->node);
1646 new_element->trigger = trigger_ht_element->trigger;
1647 cds_list_add(&new_element->node, &trigger_list);
1648 trigger_count++;
1649 }
83b934ad 1650 rcu_read_unlock();
ab0ee2ca
JG
1651
1652 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1653 trigger_count);
1654 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1655 if (!channel_trigger_list) {
1656 goto error;
1657 }
8abe313a 1658 channel_trigger_list->channel_key = new_channel_info->key;
ab0ee2ca
JG
1659 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1660 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1661 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1662
1663 rcu_read_lock();
1664 /* Add channel to the channel_ht which owns the channel_infos. */
1665 cds_lfht_add(state->channels_ht,
8abe313a 1666 hash_channel_key(&new_channel_info->key),
ab0ee2ca
JG
1667 &new_channel_info->channels_ht_node);
1668 /*
1669 * Add the list of triggers associated with this channel to the
1670 * channel_triggers_ht.
1671 */
1672 cds_lfht_add(state->channel_triggers_ht,
8abe313a 1673 hash_channel_key(&new_channel_info->key),
ab0ee2ca
JG
1674 &channel_trigger_list->channel_triggers_ht_node);
1675 rcu_read_unlock();
1eee26c5 1676 session_info_put(session_info);
ab0ee2ca
JG
1677 *cmd_result = LTTNG_OK;
1678 return 0;
1679error:
ab0ee2ca 1680 channel_info_destroy(new_channel_info);
8abe313a 1681 session_info_put(session_info);
ab0ee2ca
JG
1682 return 1;
1683}
1684
83b934ad
MD
1685static
1686void free_channel_trigger_list_rcu(struct rcu_head *node)
1687{
1688 free(caa_container_of(node, struct lttng_channel_trigger_list,
1689 rcu_node));
1690}
1691
1692static
1693void free_channel_state_sample_rcu(struct rcu_head *node)
1694{
1695 free(caa_container_of(node, struct channel_state_sample,
1696 rcu_node));
1697}
1698
ab0ee2ca
JG
1699static
1700int handle_notification_thread_command_remove_channel(
1701 struct notification_thread_state *state,
1702 uint64_t channel_key, enum lttng_domain_type domain,
1703 enum lttng_error_code *cmd_result)
1704{
1705 struct cds_lfht_node *node;
1706 struct cds_lfht_iter iter;
1707 struct lttng_channel_trigger_list *trigger_list;
1708 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1709 struct channel_key key = { .key = channel_key, .domain = domain };
1710 struct channel_info *channel_info;
1711
1712 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
1713 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1714
1715 rcu_read_lock();
1716
1717 cds_lfht_lookup(state->channel_triggers_ht,
1718 hash_channel_key(&key),
1719 match_channel_trigger_list,
1720 &key,
1721 &iter);
1722 node = cds_lfht_iter_get_node(&iter);
1723 /*
1724 * There is a severe internal error if we are being asked to remove a
1725 * channel that doesn't exist.
1726 */
1727 if (!node) {
1728 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1729 goto end;
1730 }
1731
1732 /* Free the list of triggers associated with this channel. */
1733 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1734 channel_triggers_ht_node);
1735 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1736 &trigger_list->list, node) {
1737 cds_list_del(&trigger_list_element->node);
1738 free(trigger_list_element);
1739 }
1740 cds_lfht_del(state->channel_triggers_ht, node);
83b934ad 1741 call_rcu(&trigger_list->rcu_node, free_channel_trigger_list_rcu);
ab0ee2ca
JG
1742
1743 /* Free sampled channel state. */
1744 cds_lfht_lookup(state->channel_state_ht,
1745 hash_channel_key(&key),
1746 match_channel_state_sample,
1747 &key,
1748 &iter);
1749 node = cds_lfht_iter_get_node(&iter);
1750 /*
1751 * This is expected to be NULL if the channel is destroyed before we
1752 * received a sample.
1753 */
1754 if (node) {
1755 struct channel_state_sample *sample = caa_container_of(node,
1756 struct channel_state_sample,
1757 channel_state_ht_node);
1758
1759 cds_lfht_del(state->channel_state_ht, node);
83b934ad 1760 call_rcu(&sample->rcu_node, free_channel_state_sample_rcu);
ab0ee2ca
JG
1761 }
1762
1763 /* Remove the channel from the channels_ht and free it. */
1764 cds_lfht_lookup(state->channels_ht,
1765 hash_channel_key(&key),
1766 match_channel_info,
1767 &key,
1768 &iter);
1769 node = cds_lfht_iter_get_node(&iter);
1770 assert(node);
1771 channel_info = caa_container_of(node, struct channel_info,
1772 channels_ht_node);
1773 cds_lfht_del(state->channels_ht, node);
1774 channel_info_destroy(channel_info);
1775end:
1776 rcu_read_unlock();
1777 *cmd_result = LTTNG_OK;
1778 return 0;
1779}
1780
0ca52944 1781static
ed327204 1782int handle_notification_thread_command_session_rotation(
0ca52944 1783 struct notification_thread_state *state,
ed327204
JG
1784 enum notification_thread_command_type cmd_type,
1785 const char *session_name, uid_t session_uid, gid_t session_gid,
1786 uint64_t trace_archive_chunk_id,
1787 struct lttng_trace_archive_location *location,
1788 enum lttng_error_code *_cmd_result)
0ca52944 1789{
ed327204
JG
1790 int ret = 0;
1791 enum lttng_error_code cmd_result = LTTNG_OK;
1792 struct lttng_session_trigger_list *trigger_list;
1793 struct lttng_trigger_list_element *trigger_list_element;
1794 struct session_info *session_info;
f2b3ef9f 1795 const struct lttng_credentials session_creds = {
ff588497
JR
1796 .uid = LTTNG_OPTIONAL_INIT_VALUE(session_uid),
1797 .gid = LTTNG_OPTIONAL_INIT_VALUE(session_gid),
f2b3ef9f 1798 };
0ca52944 1799
ed327204
JG
1800 rcu_read_lock();
1801
1802 session_info = find_or_create_session_info(state, session_name,
1803 session_uid, session_gid);
1804 if (!session_info) {
a9577b76 1805 /* Allocation error or an internal error occurred. */
ed327204
JG
1806 ret = -1;
1807 cmd_result = LTTNG_ERR_NOMEM;
1808 goto end;
1809 }
1810
1811 session_info->rotation.ongoing =
1812 cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
1813 session_info->rotation.id = trace_archive_chunk_id;
1814 trigger_list = get_session_trigger_list(state, session_name);
1815 if (!trigger_list) {
1816 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1817 session_name);
1818 goto end;
1819 }
1820
1821 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
1822 node) {
1823 const struct lttng_condition *condition;
f2b3ef9f 1824 struct lttng_trigger *trigger;
ed327204
JG
1825 struct notification_client_list *client_list;
1826 struct lttng_evaluation *evaluation = NULL;
1827 enum lttng_condition_type condition_type;
f2b3ef9f 1828 enum action_executor_status executor_status;
ed327204
JG
1829
1830 trigger = trigger_list_element->trigger;
1831 condition = lttng_trigger_get_const_condition(trigger);
1832 assert(condition);
1833 condition_type = lttng_condition_get_type(condition);
1834
1835 if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING &&
1836 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1837 continue;
1838 } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED &&
1839 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) {
1840 continue;
1841 }
1842
ed327204 1843 client_list = get_client_list_from_condition(state, condition);
ed327204
JG
1844 if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1845 evaluation = lttng_evaluation_session_rotation_ongoing_create(
1846 trace_archive_chunk_id);
1847 } else {
1848 evaluation = lttng_evaluation_session_rotation_completed_create(
1849 trace_archive_chunk_id, location);
1850 }
1851
1852 if (!evaluation) {
1853 /* Internal error */
1854 ret = -1;
1855 cmd_result = LTTNG_ERR_UNK;
505b2d90 1856 goto put_list;
ed327204
JG
1857 }
1858
f2b3ef9f
JG
1859 /*
1860 * Ownership of `evaluation` transferred to the action executor
1861 * no matter the result.
1862 */
1863 executor_status = action_executor_enqueue(state->executor,
1864 trigger, evaluation, &session_creds,
1865 client_list);
1866 evaluation = NULL;
1867 switch (executor_status) {
1868 case ACTION_EXECUTOR_STATUS_OK:
1869 break;
1870 case ACTION_EXECUTOR_STATUS_ERROR:
1871 case ACTION_EXECUTOR_STATUS_INVALID:
1872 /*
1873 * TODO Add trigger identification (name/id) when
1874 * it is added to the API.
1875 */
1876 ERR("Fatal error occurred while enqueuing action associated with session rotation trigger");
1877 ret = -1;
1878 goto put_list;
1879 case ACTION_EXECUTOR_STATUS_OVERFLOW:
1880 /*
1881 * TODO Add trigger identification (name/id) when
1882 * it is added to the API.
1883 *
1884 * Not a fatal error.
1885 */
1886 WARN("No space left when enqueuing action associated with session rotation trigger");
1887 ret = 0;
1888 goto put_list;
1889 default:
1890 abort();
1891 }
1892
505b2d90
JG
1893put_list:
1894 notification_client_list_put(client_list);
ed327204 1895 if (caa_unlikely(ret)) {
505b2d90 1896 break;
ed327204
JG
1897 }
1898 }
1899end:
1900 session_info_put(session_info);
1901 *_cmd_result = cmd_result;
1902 rcu_read_unlock();
1903 return ret;
0ca52944
JG
1904}
1905
d02d7404
JR
1906static
1907int handle_notification_thread_command_add_tracer_event_source(
1908 struct notification_thread_state *state,
1909 int tracer_event_source_fd,
1910 enum lttng_domain_type domain_type,
1911 enum lttng_error_code *_cmd_result)
1912{
1913 int ret = 0;
1914 enum lttng_error_code cmd_result = LTTNG_OK;
1915 struct notification_event_tracer_event_source_element *element = NULL;
1916
1917 element = zmalloc(sizeof(*element));
1918 if (!element) {
1919 cmd_result = LTTNG_ERR_NOMEM;
1920 ret = -1;
1921 goto end;
1922 }
1923
1924 CDS_INIT_LIST_HEAD(&element->node);
1925 element->fd = tracer_event_source_fd;
1926 element->domain = domain_type;
1927
1928 cds_list_add(&element->node, &state->tracer_event_sources_list);
1929
1930 DBG3("[notification-thread] Adding tracer event source fd to poll set: tracer_event_source_fd = %d, domain = '%s'",
1931 tracer_event_source_fd,
1932 lttng_domain_type_str(domain_type));
1933
1934 /* Adding the read side pipe to the event poll. */
1935 ret = lttng_poll_add(&state->events, tracer_event_source_fd, LPOLLIN | LPOLLERR);
1936 if (ret < 0) {
1937 ERR("[notification-thread] Failed to add tracer event source to poll set: tracer_event_source_fd = %d, domain = '%s'",
1938 tracer_event_source_fd,
1939 lttng_domain_type_str(element->domain));
1940 cds_list_del(&element->node);
1941 free(element);
1942 goto end;
1943 }
1944
1945 element->is_fd_in_poll_set = true;
1946
1947end:
1948 *_cmd_result = cmd_result;
1949 return ret;
1950}
1951
1952static
1953int handle_notification_thread_command_remove_tracer_event_source(
1954 struct notification_thread_state *state,
1955 int tracer_event_source_fd,
1956 enum lttng_error_code *_cmd_result)
1957{
1958 int ret = 0;
1959 enum lttng_error_code cmd_result = LTTNG_OK;
1960 struct notification_event_tracer_event_source_element *source_element = NULL, *tmp;
1961
1962 cds_list_for_each_entry_safe(source_element, tmp,
1963 &state->tracer_event_sources_list, node) {
1964 if (source_element->fd != tracer_event_source_fd) {
1965 continue;
1966 }
1967
1968 DBG("[notification-thread] Removed tracer event source from poll set: tracer_event_source_fd = %d, domain = '%s'",
1969 tracer_event_source_fd,
1970 lttng_domain_type_str(source_element->domain));
1971 cds_list_del(&source_element->node);
1972 break;
1973 }
1974
1975 /* It should always be found. */
1976 assert(source_element);
1977
1978 if (!source_element->is_fd_in_poll_set) {
1979 /* Skip the poll set removal. */
1980 goto end;
1981 }
1982
1983 DBG3("[notification-thread] Removing tracer event source from poll set: tracer_event_source_fd = %d, domain = '%s'",
1984 tracer_event_source_fd,
1985 lttng_domain_type_str(source_element->domain));
1986
1987 /* Removing the fd from the event poll set. */
1988 ret = lttng_poll_del(&state->events, tracer_event_source_fd);
1989 if (ret < 0) {
1990 ERR("[notification-thread] Failed to remove tracer event source from poll set: tracer_event_source_fd = %d, domain = '%s'",
1991 tracer_event_source_fd,
1992 lttng_domain_type_str(source_element->domain));
1993 cmd_result = LTTNG_ERR_FATAL;
1994 goto end;
1995 }
1996
173900f6
JG
1997 source_element->is_fd_in_poll_set = false;
1998
d02d7404
JR
1999end:
2000 free(source_element);
2001 *_cmd_result = cmd_result;
2002 return ret;
2003}
2004
2005int handle_notification_thread_remove_tracer_event_source_no_result(
2006 struct notification_thread_state *state,
2007 int tracer_event_source_fd)
2008{
2009 int ret;
2010 enum lttng_error_code cmd_result;
2011
2012 ret = handle_notification_thread_command_remove_tracer_event_source(
2013 state, tracer_event_source_fd, &cmd_result);
2014 (void) cmd_result;
2015 return ret;
2016}
2017
fbc9f37d
JR
2018static int handle_notification_thread_command_list_triggers(
2019 struct notification_thread_handle *handle,
2020 struct notification_thread_state *state,
2021 uid_t client_uid,
2022 struct lttng_triggers **triggers,
2023 enum lttng_error_code *_cmd_result)
2024{
2025 int ret = 0;
2026 enum lttng_error_code cmd_result = LTTNG_OK;
2027 struct cds_lfht_iter iter;
2028 struct lttng_trigger_ht_element *trigger_ht_element;
2029 struct lttng_triggers *local_triggers = NULL;
2030 const struct lttng_credentials *creds;
2031
2032 rcu_read_lock();
2033
2034 local_triggers = lttng_triggers_create();
2035 if (!local_triggers) {
2036 /* Not a fatal error. */
2037 cmd_result = LTTNG_ERR_NOMEM;
2038 goto end;
2039 }
2040
2041 cds_lfht_for_each_entry(state->triggers_ht, &iter,
2042 trigger_ht_element, node) {
2043 /*
2044 * Only return the triggers to which the client has access.
2045 * The root user has visibility over all triggers.
2046 */
2047 creds = lttng_trigger_get_credentials(trigger_ht_element->trigger);
2048 if (client_uid != lttng_credentials_get_uid(creds) && client_uid != 0) {
2049 continue;
2050 }
2051
2052 ret = lttng_triggers_add(local_triggers,
2053 trigger_ht_element->trigger);
2054 if (ret < 0) {
2055 /* Not a fatal error. */
2056 ret = 0;
2057 cmd_result = LTTNG_ERR_NOMEM;
2058 goto end;
2059 }
2060 }
2061
2062 /* Transferring ownership to the caller. */
2063 *triggers = local_triggers;
2064 local_triggers = NULL;
2065
2066end:
2067 rcu_read_unlock();
2068 lttng_triggers_destroy(local_triggers);
2069 *_cmd_result = cmd_result;
2070 return ret;
2071}
2072
1da26331 2073static
959e3c66 2074bool condition_is_supported(struct lttng_condition *condition)
1da26331 2075{
959e3c66 2076 bool is_supported;
1da26331
JG
2077
2078 switch (lttng_condition_get_type(condition)) {
2079 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
2080 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
2081 {
959e3c66 2082 int ret;
1da26331
JG
2083 enum lttng_domain_type domain;
2084
2085 ret = lttng_condition_buffer_usage_get_domain_type(condition,
2086 &domain);
959e3c66 2087 assert(ret == 0);
1da26331
JG
2088
2089 if (domain != LTTNG_DOMAIN_KERNEL) {
959e3c66 2090 is_supported = true;
1da26331
JG
2091 goto end;
2092 }
2093
2094 /*
2095 * Older kernel tracers don't expose the API to monitor their
2096 * buffers. Therefore, we reject triggers that require that
2097 * mechanism to be available to be evaluated.
959e3c66
JR
2098 *
2099 * Assume unsupported on error.
1da26331 2100 */
959e3c66
JR
2101 is_supported = kernel_supports_ring_buffer_snapshot_sample_positions() == 1;
2102 break;
2103 }
2104 case LTTNG_CONDITION_TYPE_EVENT_RULE_HIT:
2105 {
2106 const struct lttng_event_rule *event_rule;
2107 enum lttng_domain_type domain;
2108 const enum lttng_condition_status status =
2109 lttng_condition_event_rule_get_rule(
2110 condition, &event_rule);
2111
2112 assert(status == LTTNG_CONDITION_STATUS_OK);
2113
2114 domain = lttng_event_rule_get_domain_type(event_rule);
2115 if (domain != LTTNG_DOMAIN_KERNEL) {
2116 is_supported = true;
2117 goto end;
2118 }
2119
2120 /*
2121 * Older kernel tracers can't emit notification. Therefore, we
2122 * reject triggers that require that mechanism to be available
2123 * to be evaluated.
2124 *
2125 * Assume unsupported on error.
2126 */
2127 is_supported = kernel_supports_event_notifiers() == 1;
1da26331
JG
2128 break;
2129 }
2130 default:
959e3c66 2131 is_supported = true;
1da26331
JG
2132 }
2133end:
959e3c66 2134 return is_supported;
1da26331
JG
2135}
2136
51eab943
JG
2137/* Must be called with RCU read lock held. */
2138static
f2b3ef9f 2139int bind_trigger_to_matching_session(struct lttng_trigger *trigger,
51eab943
JG
2140 struct notification_thread_state *state)
2141{
2142 int ret = 0;
51eab943
JG
2143 const struct lttng_condition *condition;
2144 const char *session_name;
2145 struct lttng_session_trigger_list *trigger_list;
2146
2147 condition = lttng_trigger_get_const_condition(trigger);
2148 switch (lttng_condition_get_type(condition)) {
2149 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
2150 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
2151 {
2152 enum lttng_condition_status status;
2153
2154 status = lttng_condition_session_rotation_get_session_name(
2155 condition, &session_name);
2156 if (status != LTTNG_CONDITION_STATUS_OK) {
2157 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
2158 ret = -1;
2159 goto end;
2160 }
2161 break;
2162 }
2163 default:
2164 ret = -1;
2165 goto end;
2166 }
2167
ed327204
JG
2168 trigger_list = get_session_trigger_list(state, session_name);
2169 if (!trigger_list) {
51eab943
JG
2170 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
2171 session_name);
2172 goto end;
51eab943 2173
ed327204 2174 }
51eab943
JG
2175
2176 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
2177 session_name);
2178 ret = lttng_session_trigger_list_add(trigger_list, trigger);
2179end:
2180 return ret;
2181}
2182
2183/* Must be called with RCU read lock held. */
2184static
f2b3ef9f 2185int bind_trigger_to_matching_channels(struct lttng_trigger *trigger,
51eab943
JG
2186 struct notification_thread_state *state)
2187{
2188 int ret = 0;
2189 struct cds_lfht_node *node;
2190 struct cds_lfht_iter iter;
2191 struct channel_info *channel;
2192
2193 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
2194 channels_ht_node) {
2195 struct lttng_trigger_list_element *trigger_list_element;
2196 struct lttng_channel_trigger_list *trigger_list;
a5d64ae7 2197 struct cds_lfht_iter lookup_iter;
51eab943
JG
2198
2199 if (!trigger_applies_to_channel(trigger, channel)) {
2200 continue;
2201 }
2202
2203 cds_lfht_lookup(state->channel_triggers_ht,
2204 hash_channel_key(&channel->key),
2205 match_channel_trigger_list,
2206 &channel->key,
a5d64ae7
MD
2207 &lookup_iter);
2208 node = cds_lfht_iter_get_node(&lookup_iter);
51eab943
JG
2209 assert(node);
2210 trigger_list = caa_container_of(node,
2211 struct lttng_channel_trigger_list,
2212 channel_triggers_ht_node);
2213
2214 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
2215 if (!trigger_list_element) {
2216 ret = -1;
2217 goto end;
2218 }
2219 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
2220 trigger_list_element->trigger = trigger;
2221 cds_list_add(&trigger_list_element->node, &trigger_list->list);
2222 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
2223 channel->name);
2224 }
2225end:
2226 return ret;
2227}
2228
f2b3ef9f
JG
2229static
2230bool is_trigger_action_notify(const struct lttng_trigger *trigger)
2231{
2232 bool is_notify = false;
2233 unsigned int i, count;
2234 enum lttng_action_status action_status;
2235 const struct lttng_action *action =
2236 lttng_trigger_get_const_action(trigger);
2237 enum lttng_action_type action_type;
2238
2239 assert(action);
17182cfd 2240 action_type = lttng_action_get_type(action);
f2b3ef9f
JG
2241 if (action_type == LTTNG_ACTION_TYPE_NOTIFY) {
2242 is_notify = true;
2243 goto end;
2244 } else if (action_type != LTTNG_ACTION_TYPE_GROUP) {
2245 goto end;
2246 }
2247
2248 action_status = lttng_action_group_get_count(action, &count);
2249 assert(action_status == LTTNG_ACTION_STATUS_OK);
2250
2251 for (i = 0; i < count; i++) {
2252 const struct lttng_action *inner_action =
2253 lttng_action_group_get_at_index(
2254 action, i);
2255
17182cfd 2256 action_type = lttng_action_get_type(inner_action);
f2b3ef9f
JG
2257 if (action_type == LTTNG_ACTION_TYPE_NOTIFY) {
2258 is_notify = true;
2259 goto end;
2260 }
2261 }
2262
2263end:
2264 return is_notify;
2265}
2266
242388e4
JR
2267static bool trigger_name_taken(struct notification_thread_state *state,
2268 const struct lttng_trigger *trigger)
2269{
2270 struct cds_lfht_iter iter;
2271
2272 /*
2273 * No duplicata is allowed in the triggers_by_name_uid_ht.
2274 * The match is done against the trigger name and uid.
2275 */
2276 cds_lfht_lookup(state->triggers_by_name_uid_ht,
2277 hash_trigger_by_name_uid(trigger),
2278 match_trigger_by_name_uid,
2279 trigger,
2280 &iter);
2281 return !!cds_lfht_iter_get_node(&iter);
2282}
2283
2284static
2285enum lttng_error_code generate_trigger_name(
2286 struct notification_thread_state *state,
2287 struct lttng_trigger *trigger, const char **name)
2288{
2289 enum lttng_error_code ret_code = LTTNG_OK;
2290 bool taken = false;
2291 enum lttng_trigger_status status;
2292
2293 do {
2294 const int ret = lttng_trigger_generate_name(trigger,
2295 state->trigger_id.name_offset++);
2296 if (ret) {
2297 /* The only reason this can fail right now. */
2298 ret_code = LTTNG_ERR_NOMEM;
2299 break;
2300 }
2301
2302 status = lttng_trigger_get_name(trigger, name);
2303 assert(status == LTTNG_TRIGGER_STATUS_OK);
2304
2305 taken = trigger_name_taken(state, trigger);
2306 } while (taken || state->trigger_id.name_offset == UINT64_MAX);
2307
2308 return ret_code;
2309}
2310
ab0ee2ca 2311/*
f2b3ef9f 2312 * FIXME A client's credentials are not checked when registering a trigger.
ab0ee2ca 2313 *
1da26331 2314 * The effects of this are benign since:
ab0ee2ca 2315 * - The client will succeed in registering the trigger, as it is valid,
51eab943 2316 * - The trigger will, internally, be bound to the channel/session,
ab0ee2ca
JG
2317 * - The notifications will not be sent since the client's credentials
2318 * are checked against the channel at that moment.
1da26331
JG
2319 *
2320 * If this function returns a non-zero value, it means something is
50ca7858 2321 * fundamentally broken and the whole subsystem/thread will be torn down.
1da26331
JG
2322 *
2323 * If a non-fatal error occurs, just set the cmd_result to the appropriate
2324 * error code.
ab0ee2ca
JG
2325 */
2326static
2327int handle_notification_thread_command_register_trigger(
8abe313a
JG
2328 struct notification_thread_state *state,
2329 struct lttng_trigger *trigger,
2330 enum lttng_error_code *cmd_result)
ab0ee2ca
JG
2331{
2332 int ret = 0;
2333 struct lttng_condition *condition;
2334 struct notification_client *client;
2335 struct notification_client_list *client_list = NULL;
2336 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
f2b3ef9f 2337 struct notification_client_list_element *client_list_element;
e7c93cf9 2338 struct notification_trigger_tokens_ht_element *trigger_tokens_ht_element = NULL;
ab0ee2ca
JG
2339 struct cds_lfht_node *node;
2340 struct cds_lfht_iter iter;
242388e4 2341 const char* trigger_name;
ab0ee2ca 2342 bool free_trigger = true;
f2b3ef9f
JG
2343 struct lttng_evaluation *evaluation = NULL;
2344 struct lttng_credentials object_creds;
ff588497
JR
2345 uid_t object_uid;
2346 gid_t object_gid;
f2b3ef9f 2347 enum action_executor_status executor_status;
e6887944
JR
2348 const uint64_t trigger_tracer_token =
2349 state->trigger_id.next_tracer_token++;
ab0ee2ca
JG
2350
2351 rcu_read_lock();
2352
e6887944
JR
2353 /* Set the trigger's tracer token. */
2354 lttng_trigger_set_tracer_token(trigger, trigger_tracer_token);
2355
242388e4
JR
2356 if (lttng_trigger_get_name(trigger, &trigger_name) ==
2357 LTTNG_TRIGGER_STATUS_UNSET) {
2358 const enum lttng_error_code ret_code = generate_trigger_name(
2359 state, trigger, &trigger_name);
2360
2361 if (ret_code != LTTNG_OK) {
2362 /* Fatal error. */
2363 ret = -1;
2364 *cmd_result = ret_code;
2365 goto error;
2366 }
2367 } else if (trigger_name_taken(state, trigger)) {
2368 /* Not a fatal error. */
2369 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2370 ret = 0;
2371 goto error;
2372 }
2373
ab0ee2ca 2374 condition = lttng_trigger_get_condition(trigger);
1da26331
JG
2375 assert(condition);
2376
959e3c66
JR
2377 /* Some conditions require tracers to implement a minimal ABI version. */
2378 if (!condition_is_supported(condition)) {
1da26331
JG
2379 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
2380 goto error;
1da26331
JG
2381 }
2382
ab0ee2ca
JG
2383 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
2384 if (!trigger_ht_element) {
2385 ret = -1;
2386 goto error;
2387 }
2388
2389 /* Add trigger to the trigger_ht. */
2390 cds_lfht_node_init(&trigger_ht_element->node);
242388e4 2391 cds_lfht_node_init(&trigger_ht_element->node_by_name_uid);
ab0ee2ca
JG
2392 trigger_ht_element->trigger = trigger;
2393
2394 node = cds_lfht_add_unique(state->triggers_ht,
2395 lttng_condition_hash(condition),
242388e4
JR
2396 match_trigger,
2397 trigger,
ab0ee2ca
JG
2398 &trigger_ht_element->node);
2399 if (node != &trigger_ht_element->node) {
2400 /* Not a fatal error, simply report it to the client. */
2401 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2402 goto error_free_ht_element;
2403 }
2404
242388e4
JR
2405 node = cds_lfht_add_unique(state->triggers_by_name_uid_ht,
2406 hash_trigger_by_name_uid(trigger),
2407 match_trigger_by_name_uid,
2408 trigger,
2409 &trigger_ht_element->node_by_name_uid);
2410 if (node != &trigger_ht_element->node_by_name_uid) {
2411 /* Not a fatal error, simply report it to the client. */
2412 cds_lfht_del(state->triggers_ht, &trigger_ht_element->node);
2413 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2414 goto error_free_ht_element;
2415 }
2416
e7c93cf9
JR
2417 if (lttng_condition_get_type(condition) == LTTNG_CONDITION_TYPE_EVENT_RULE_HIT) {
2418 trigger_tokens_ht_element = zmalloc(sizeof(*trigger_tokens_ht_element));
2419 if (!trigger_tokens_ht_element) {
2420 /* Fatal error. */
2421 ret = -1;
2422 cds_lfht_del(state->triggers_ht,
2423 &trigger_ht_element->node);
2424 cds_lfht_del(state->triggers_by_name_uid_ht,
2425 &trigger_ht_element->node_by_name_uid);
2426 goto error_free_ht_element;
2427 }
2428
2429 /* Add trigger token to the trigger_tokens_ht. */
2430 cds_lfht_node_init(&trigger_tokens_ht_element->node);
2431 trigger_tokens_ht_element->token =
2432 LTTNG_OPTIONAL_GET(trigger->tracer_token);
2433 trigger_tokens_ht_element->trigger = trigger;
2434
2435 node = cds_lfht_add_unique(state->trigger_tokens_ht,
2436 hash_key_u64(&trigger_tokens_ht_element->token,
2437 lttng_ht_seed),
2438 match_trigger_token,
2439 &trigger_tokens_ht_element->token,
2440 &trigger_tokens_ht_element->node);
2441 if (node != &trigger_tokens_ht_element->node) {
2442 /* Internal corruption, fatal error. */
2443 ret = -1;
2444 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2445 cds_lfht_del(state->triggers_ht,
2446 &trigger_ht_element->node);
2447 cds_lfht_del(state->triggers_by_name_uid_ht,
2448 &trigger_ht_element->node_by_name_uid);
2449 goto error_free_ht_element;
2450 }
2451 }
2452
ab0ee2ca
JG
2453 /*
2454 * Ownership of the trigger and of its wrapper was transfered to
e7c93cf9 2455 * the triggers_ht. Same for token ht element if necessary.
ab0ee2ca 2456 */
e7c93cf9 2457 trigger_tokens_ht_element = NULL;
ab0ee2ca
JG
2458 trigger_ht_element = NULL;
2459 free_trigger = false;
2460
2461 /*
2462 * The rest only applies to triggers that have a "notify" action.
2463 * It is not skipped as this is the only action type currently
2464 * supported.
2465 */
f2b3ef9f
JG
2466 if (is_trigger_action_notify(trigger)) {
2467 client_list = notification_client_list_create(trigger);
2468 if (!client_list) {
2469 ret = -1;
2470 goto error_free_ht_element;
ab0ee2ca
JG
2471 }
2472
f2b3ef9f
JG
2473 /* Build a list of clients to which this new trigger applies. */
2474 cds_lfht_for_each_entry (state->client_socket_ht, &iter, client,
2475 client_socket_ht_node) {
2476 if (!trigger_applies_to_client(trigger, client)) {
2477 continue;
2478 }
2479
2480 client_list_element =
2481 zmalloc(sizeof(*client_list_element));
2482 if (!client_list_element) {
2483 ret = -1;
2484 goto error_put_client_list;
2485 }
2486
2487 CDS_INIT_LIST_HEAD(&client_list_element->node);
2488 client_list_element->client = client;
2489 cds_list_add(&client_list_element->node,
2490 &client_list->list);
ab0ee2ca 2491 }
f2b3ef9f
JG
2492
2493 /*
2494 * Client list ownership transferred to the
2495 * notification_trigger_clients_ht.
2496 */
2497 publish_notification_client_list(state, client_list);
ab0ee2ca
JG
2498 }
2499
f82f93a1 2500 switch (get_condition_binding_object(condition)) {
51eab943
JG
2501 case LTTNG_OBJECT_TYPE_SESSION:
2502 /* Add the trigger to the list if it matches a known session. */
2503 ret = bind_trigger_to_matching_session(trigger, state);
2504 if (ret) {
505b2d90 2505 goto error_put_client_list;
ab0ee2ca 2506 }
f82f93a1 2507 break;
51eab943
JG
2508 case LTTNG_OBJECT_TYPE_CHANNEL:
2509 /*
2510 * Add the trigger to list of triggers bound to the channels
2511 * currently known.
2512 */
2513 ret = bind_trigger_to_matching_channels(trigger, state);
2514 if (ret) {
505b2d90 2515 goto error_put_client_list;
ab0ee2ca 2516 }
51eab943
JG
2517 break;
2518 case LTTNG_OBJECT_TYPE_NONE:
2519 break;
2520 default:
f2b3ef9f 2521 ERR("Unknown object type on which to bind a newly registered trigger was encountered");
51eab943 2522 ret = -1;
505b2d90 2523 goto error_put_client_list;
ab0ee2ca
JG
2524 }
2525
2ae99f0b 2526 /*
f2b3ef9f
JG
2527 * The new trigger's condition must be evaluated against the current
2528 * state.
2529 *
2530 * In the case of `notify` action, nothing preventing clients from
2531 * subscribing to a condition before the corresponding trigger is
2532 * registered, we have to evaluate this new condition right away.
2ae99f0b
JG
2533 *
2534 * At some point, we were waiting for the next "evaluation" (e.g. on
2535 * reception of a channel sample) to evaluate this new condition, but
2536 * that was broken.
2537 *
2538 * The reason it was broken is that waiting for the next sample
2539 * does not allow us to properly handle transitions for edge-triggered
2540 * conditions.
2541 *
2542 * Consider this example: when we handle a new channel sample, we
2543 * evaluate each conditions twice: once with the previous state, and
2544 * again with the newest state. We then use those two results to
2545 * determine whether a state change happened: a condition was false and
2546 * became true. If a state change happened, we have to notify clients.
2547 *
2548 * Now, if a client subscribes to a given notification and registers
2549 * a trigger *after* that subscription, we have to make sure the
2550 * condition is evaluated at this point while considering only the
2551 * current state. Otherwise, the next evaluation cycle may only see
2552 * that the evaluations remain the same (true for samples n-1 and n) and
2553 * the client will never know that the condition has been met.
2554 */
f2b3ef9f
JG
2555 switch (get_condition_binding_object(condition)) {
2556 case LTTNG_OBJECT_TYPE_SESSION:
2557 ret = evaluate_session_condition_for_client(condition, state,
ff588497
JR
2558 &evaluation, &object_uid,
2559 &object_gid);
f2b3ef9f
JG
2560 break;
2561 case LTTNG_OBJECT_TYPE_CHANNEL:
2562 ret = evaluate_channel_condition_for_client(condition, state,
ff588497
JR
2563 &evaluation, &object_uid,
2564 &object_gid);
f2b3ef9f
JG
2565 break;
2566 case LTTNG_OBJECT_TYPE_NONE:
2567 ret = 0;
242388e4 2568 break;
f2b3ef9f
JG
2569 case LTTNG_OBJECT_TYPE_UNKNOWN:
2570 default:
2571 ret = -1;
242388e4 2572 break;
f2b3ef9f
JG
2573 }
2574
2575 if (ret) {
2576 /* Fatal error. */
2577 goto error_put_client_list;
2578 }
2579
ff588497
JR
2580 LTTNG_OPTIONAL_SET(&object_creds.uid, object_uid);
2581 LTTNG_OPTIONAL_SET(&object_creds.gid, object_gid);
2582
f2b3ef9f
JG
2583 DBG("Newly registered trigger's condition evaluated to %s",
2584 evaluation ? "true" : "false");
2585 if (!evaluation) {
2586 /* Evaluation yielded nothing. Normal exit. */
2587 ret = 0;
242388e4 2588 goto end;
2ae99f0b
JG
2589 }
2590
2591 /*
f2b3ef9f
JG
2592 * Ownership of `evaluation` transferred to the action executor
2593 * no matter the result.
2ae99f0b 2594 */
f2b3ef9f
JG
2595 executor_status = action_executor_enqueue(state->executor, trigger,
2596 evaluation, &object_creds, client_list);
2597 evaluation = NULL;
2598 switch (executor_status) {
2599 case ACTION_EXECUTOR_STATUS_OK:
2600 break;
2601 case ACTION_EXECUTOR_STATUS_ERROR:
2602 case ACTION_EXECUTOR_STATUS_INVALID:
2603 /*
2604 * TODO Add trigger identification (name/id) when
2605 * it is added to the API.
2606 */
2607 ERR("Fatal error occurred while enqueuing action associated to newly registered trigger");
2608 ret = -1;
2609 goto error_put_client_list;
2610 case ACTION_EXECUTOR_STATUS_OVERFLOW:
2611 /*
2612 * TODO Add trigger identification (name/id) when
2613 * it is added to the API.
2614 *
2615 * Not a fatal error.
2616 */
2617 WARN("No space left when enqueuing action associated to newly registered trigger");
2618 ret = 0;
242388e4 2619 goto end;
f2b3ef9f
JG
2620 default:
2621 abort();
2622 }
2ae99f0b 2623
242388e4 2624end:
ab0ee2ca 2625 *cmd_result = LTTNG_OK;
e6887944
JR
2626 DBG("Registered trigger: name = `%s`, tracer token = %" PRIu64,
2627 trigger_name, trigger_tracer_token);
505b2d90
JG
2628
2629error_put_client_list:
2630 notification_client_list_put(client_list);
2631
ab0ee2ca 2632error_free_ht_element:
242388e4
JR
2633 if (trigger_ht_element) {
2634 /* Delayed removal due to RCU constraint on delete. */
2635 call_rcu(&trigger_ht_element->rcu_node,
2636 free_lttng_trigger_ht_element_rcu);
2637 }
2638
e7c93cf9 2639 free(trigger_tokens_ht_element);
ab0ee2ca
JG
2640error:
2641 if (free_trigger) {
ab0ee2ca
JG
2642 lttng_trigger_destroy(trigger);
2643 }
2644 rcu_read_unlock();
2645 return ret;
2646}
2647
83b934ad
MD
2648static
2649void free_lttng_trigger_ht_element_rcu(struct rcu_head *node)
2650{
2651 free(caa_container_of(node, struct lttng_trigger_ht_element,
2652 rcu_node));
2653}
2654
e7c93cf9
JR
2655static
2656void free_notification_trigger_tokens_ht_element_rcu(struct rcu_head *node)
2657{
2658 free(caa_container_of(node, struct notification_trigger_tokens_ht_element,
2659 rcu_node));
2660}
2661
cc2295b5 2662static
ab0ee2ca
JG
2663int handle_notification_thread_command_unregister_trigger(
2664 struct notification_thread_state *state,
2665 struct lttng_trigger *trigger,
2666 enum lttng_error_code *_cmd_reply)
2667{
2668 struct cds_lfht_iter iter;
ed327204 2669 struct cds_lfht_node *triggers_ht_node;
ab0ee2ca
JG
2670 struct lttng_channel_trigger_list *trigger_list;
2671 struct notification_client_list *client_list;
ab0ee2ca
JG
2672 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2673 struct lttng_condition *condition = lttng_trigger_get_condition(
2674 trigger);
ab0ee2ca
JG
2675 enum lttng_error_code cmd_reply;
2676
2677 rcu_read_lock();
2678
2679 cds_lfht_lookup(state->triggers_ht,
2680 lttng_condition_hash(condition),
242388e4
JR
2681 match_trigger,
2682 trigger,
ab0ee2ca
JG
2683 &iter);
2684 triggers_ht_node = cds_lfht_iter_get_node(&iter);
2685 if (!triggers_ht_node) {
2686 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
2687 goto end;
2688 } else {
2689 cmd_reply = LTTNG_OK;
2690 }
2691
2692 /* Remove trigger from channel_triggers_ht. */
2693 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
2694 channel_triggers_ht_node) {
2695 struct lttng_trigger_list_element *trigger_element, *tmp;
2696
2697 cds_list_for_each_entry_safe(trigger_element, tmp,
2698 &trigger_list->list, node) {
e8f7fa11 2699 if (!lttng_trigger_is_equal(trigger, trigger_element->trigger)) {
ab0ee2ca
JG
2700 continue;
2701 }
2702
2703 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2704 cds_list_del(&trigger_element->node);
e4db5ace
JR
2705 /* A trigger can only appear once per channel */
2706 break;
ab0ee2ca
JG
2707 }
2708 }
2709
e7c93cf9
JR
2710 if (lttng_condition_get_type(condition) ==
2711 LTTNG_CONDITION_TYPE_EVENT_RULE_HIT) {
2712 struct notification_trigger_tokens_ht_element
2713 *trigger_tokens_ht_element;
2714
2715 cds_lfht_for_each_entry (state->trigger_tokens_ht, &iter,
2716 trigger_tokens_ht_element, node) {
2717 if (!lttng_trigger_is_equal(trigger,
2718 trigger_tokens_ht_element->trigger)) {
2719 continue;
2720 }
2721
2722 DBG("[notification-thread] Removed trigger from tokens_ht");
2723 cds_lfht_del(state->trigger_tokens_ht,
2724 &trigger_tokens_ht_element->node);
2725 call_rcu(&trigger_tokens_ht_element->rcu_node,
2726 free_notification_trigger_tokens_ht_element_rcu);
2727
2728 break;
2729 }
2730 }
2731
51367634
JR
2732 if (is_trigger_action_notify(trigger)) {
2733 /*
2734 * Remove and release the client list from
2735 * notification_trigger_clients_ht.
2736 */
2737 client_list = get_client_list_from_condition(state, condition);
2738 assert(client_list);
ed327204 2739
51367634
JR
2740 /* Put new reference and the hashtable's reference. */
2741 notification_client_list_put(client_list);
2742 notification_client_list_put(client_list);
2743 client_list = NULL;
2744 }
ab0ee2ca
JG
2745
2746 /* Remove trigger from triggers_ht. */
2747 trigger_ht_element = caa_container_of(triggers_ht_node,
2748 struct lttng_trigger_ht_element, node);
242388e4 2749 cds_lfht_del(state->triggers_by_name_uid_ht, &trigger_ht_element->node_by_name_uid);
ab0ee2ca
JG
2750 cds_lfht_del(state->triggers_ht, triggers_ht_node);
2751
7ca172c1 2752 /* Release the ownership of the trigger. */
ab0ee2ca 2753 lttng_trigger_destroy(trigger_ht_element->trigger);
83b934ad 2754 call_rcu(&trigger_ht_element->rcu_node, free_lttng_trigger_ht_element_rcu);
ab0ee2ca
JG
2755end:
2756 rcu_read_unlock();
2757 if (_cmd_reply) {
2758 *_cmd_reply = cmd_reply;
2759 }
2760 return 0;
2761}
2762
2763/* Returns 0 on success, 1 on exit requested, negative value on error. */
2764int handle_notification_thread_command(
2765 struct notification_thread_handle *handle,
2766 struct notification_thread_state *state)
2767{
2768 int ret;
2769 uint64_t counter;
2770 struct notification_thread_command *cmd;
2771
8abe313a 2772 /* Read the event pipe to put it back into a quiescent state. */
18aeca05 2773 ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
8abe313a 2774 sizeof(counter));
18aeca05 2775 if (ret != sizeof(counter)) {
ab0ee2ca
JG
2776 goto error;
2777 }
2778
2779 pthread_mutex_lock(&handle->cmd_queue.lock);
2780 cmd = cds_list_first_entry(&handle->cmd_queue.list,
2781 struct notification_thread_command, cmd_list_node);
97a71ea6
JR
2782 cds_list_del(&cmd->cmd_list_node);
2783 pthread_mutex_unlock(&handle->cmd_queue.lock);
ab0ee2ca
JG
2784 switch (cmd->type) {
2785 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
2786 DBG("[notification-thread] Received register trigger command");
2787 ret = handle_notification_thread_command_register_trigger(
2788 state, cmd->parameters.trigger,
2789 &cmd->reply_code);
2790 break;
2791 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
2792 DBG("[notification-thread] Received unregister trigger command");
2793 ret = handle_notification_thread_command_unregister_trigger(
2794 state, cmd->parameters.trigger,
2795 &cmd->reply_code);
2796 break;
2797 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
2798 DBG("[notification-thread] Received add channel command");
2799 ret = handle_notification_thread_command_add_channel(
8abe313a
JG
2800 state,
2801 cmd->parameters.add_channel.session.name,
2802 cmd->parameters.add_channel.session.uid,
2803 cmd->parameters.add_channel.session.gid,
2804 cmd->parameters.add_channel.channel.name,
2805 cmd->parameters.add_channel.channel.domain,
2806 cmd->parameters.add_channel.channel.key,
2807 cmd->parameters.add_channel.channel.capacity,
ab0ee2ca
JG
2808 &cmd->reply_code);
2809 break;
2810 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
2811 DBG("[notification-thread] Received remove channel command");
2812 ret = handle_notification_thread_command_remove_channel(
2813 state, cmd->parameters.remove_channel.key,
2814 cmd->parameters.remove_channel.domain,
2815 &cmd->reply_code);
2816 break;
0ca52944 2817 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
0ca52944 2818 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
ed327204
JG
2819 DBG("[notification-thread] Received session rotation %s command",
2820 cmd->type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING ?
2821 "ongoing" : "completed");
2822 ret = handle_notification_thread_command_session_rotation(
0ca52944 2823 state,
ed327204
JG
2824 cmd->type,
2825 cmd->parameters.session_rotation.session_name,
2826 cmd->parameters.session_rotation.uid,
2827 cmd->parameters.session_rotation.gid,
2828 cmd->parameters.session_rotation.trace_archive_chunk_id,
2829 cmd->parameters.session_rotation.location,
0ca52944
JG
2830 &cmd->reply_code);
2831 break;
d02d7404
JR
2832 case NOTIFICATION_COMMAND_TYPE_ADD_TRACER_EVENT_SOURCE:
2833 ret = handle_notification_thread_command_add_tracer_event_source(
2834 state,
2835 cmd->parameters.tracer_event_source.tracer_event_source_fd,
2836 cmd->parameters.tracer_event_source.domain,
2837 &cmd->reply_code);
2838 break;
2839 case NOTIFICATION_COMMAND_TYPE_REMOVE_TRACER_EVENT_SOURCE:
2840 ret = handle_notification_thread_command_remove_tracer_event_source(
2841 state,
2842 cmd->parameters.tracer_event_source.tracer_event_source_fd,
2843 &cmd->reply_code);
2844 break;
fbc9f37d
JR
2845 case NOTIFICATION_COMMAND_TYPE_LIST_TRIGGERS:
2846 {
2847 struct lttng_triggers *triggers = NULL;
2848
2849 ret = handle_notification_thread_command_list_triggers(
2850 handle,
2851 state,
2852 cmd->parameters.list_triggers.uid,
2853 &triggers,
2854 &cmd->reply_code);
2855 cmd->reply.list_triggers.triggers = triggers;
2856 ret = 0;
2857 break;
2858 }
ab0ee2ca
JG
2859 case NOTIFICATION_COMMAND_TYPE_QUIT:
2860 DBG("[notification-thread] Received quit command");
2861 cmd->reply_code = LTTNG_OK;
2862 ret = 1;
2863 goto end;
f2b3ef9f
JG
2864 case NOTIFICATION_COMMAND_TYPE_CLIENT_COMMUNICATION_UPDATE:
2865 {
2866 const enum client_transmission_status client_status =
2867 cmd->parameters.client_communication_update
2868 .status;
2869 const notification_client_id client_id =
2870 cmd->parameters.client_communication_update.id;
2871 struct notification_client *client;
2872
2873 rcu_read_lock();
2874 client = get_client_from_id(client_id, state);
2875
2876 if (!client) {
2877 /*
2878 * Client error was probably already picked-up by the
2879 * notification thread or it has disconnected
2880 * gracefully while this command was queued.
2881 */
2882 DBG("Failed to find notification client to update communication status, client id = %" PRIu64,
2883 client_id);
2884 ret = 0;
2885 } else {
f2b3ef9f
JG
2886 ret = client_handle_transmission_status(
2887 client, client_status, state);
f2b3ef9f
JG
2888 }
2889 rcu_read_unlock();
2890 break;
2891 }
ab0ee2ca
JG
2892 default:
2893 ERR("[notification-thread] Unknown internal command received");
2894 goto error_unlock;
2895 }
2896
2897 if (ret) {
2898 goto error_unlock;
2899 }
2900end:
0ab399e0
JG
2901 if (cmd->is_async) {
2902 free(cmd);
2903 cmd = NULL;
2904 } else {
2905 lttng_waiter_wake_up(&cmd->reply_waiter);
2906 }
ab0ee2ca
JG
2907 return ret;
2908error_unlock:
2909 /* Wake-up and return a fatal error to the calling thread. */
8ada111f 2910 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
2911 cmd->reply_code = LTTNG_ERR_FATAL;
2912error:
2913 /* Indicate a fatal error to the caller. */
2914 return -1;
2915}
2916
ab0ee2ca
JG
2917static
2918int socket_set_non_blocking(int socket)
2919{
2920 int ret, flags;
2921
2922 /* Set the pipe as non-blocking. */
2923 ret = fcntl(socket, F_GETFL, 0);
2924 if (ret == -1) {
2925 PERROR("fcntl get socket flags");
2926 goto end;
2927 }
2928 flags = ret;
2929
2930 ret = fcntl(socket, F_SETFL, flags | O_NONBLOCK);
2931 if (ret == -1) {
2932 PERROR("fcntl set O_NONBLOCK socket flag");
2933 goto end;
2934 }
2935 DBG("Client socket (fd = %i) set as non-blocking", socket);
2936end:
2937 return ret;
2938}
2939
2940static
14fa22f8 2941int client_reset_inbound_state(struct notification_client *client)
ab0ee2ca
JG
2942{
2943 int ret;
2944
882093ee
JR
2945
2946 lttng_payload_clear(&client->communication.inbound.payload);
ab0ee2ca
JG
2947
2948 client->communication.inbound.bytes_to_receive =
2949 sizeof(struct lttng_notification_channel_message);
2950 client->communication.inbound.msg_type =
2951 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
ab0ee2ca
JG
2952 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
2953 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
14fa22f8 2954 ret = lttng_dynamic_buffer_set_size(
882093ee 2955 &client->communication.inbound.payload.buffer,
14fa22f8 2956 client->communication.inbound.bytes_to_receive);
882093ee 2957
14fa22f8 2958 return ret;
ab0ee2ca
JG
2959}
2960
2961int handle_notification_thread_client_connect(
2962 struct notification_thread_state *state)
2963{
2964 int ret;
2965 struct notification_client *client;
2966
2967 DBG("[notification-thread] Handling new notification channel client connection");
2968
2969 client = zmalloc(sizeof(*client));
2970 if (!client) {
2971 /* Fatal error. */
2972 ret = -1;
2973 goto error;
2974 }
6c24d3fd 2975
505b2d90 2976 pthread_mutex_init(&client->lock, NULL);
ac1889bf 2977 client->id = state->next_notification_client_id++;
ab0ee2ca 2978 CDS_INIT_LIST_HEAD(&client->condition_list);
882093ee
JR
2979 lttng_payload_init(&client->communication.inbound.payload);
2980 lttng_payload_init(&client->communication.outbound.payload);
01ea340e 2981 client->communication.inbound.expect_creds = true;
505b2d90 2982
14fa22f8
JG
2983 ret = client_reset_inbound_state(client);
2984 if (ret) {
2985 ERR("[notification-thread] Failed to reset client communication's inbound state");
e742b055 2986 ret = 0;
14fa22f8
JG
2987 goto error;
2988 }
ab0ee2ca
JG
2989
2990 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
2991 if (ret < 0) {
2992 ERR("[notification-thread] Failed to accept new notification channel client connection");
2993 ret = 0;
2994 goto error;
2995 }
2996
2997 client->socket = ret;
2998
2999 ret = socket_set_non_blocking(client->socket);
3000 if (ret) {
3001 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
3002 goto error;
3003 }
3004
3005 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
3006 if (ret < 0) {
3007 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
3008 ret = 0;
3009 goto error;
3010 }
3011
3012 ret = lttng_poll_add(&state->events, client->socket,
3013 LPOLLIN | LPOLLERR |
3014 LPOLLHUP | LPOLLRDHUP);
3015 if (ret < 0) {
3016 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
3017 ret = 0;
3018 goto error;
3019 }
3020 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
3021 client->socket);
3022
ab0ee2ca
JG
3023 rcu_read_lock();
3024 cds_lfht_add(state->client_socket_ht,
3025 hash_client_socket(client->socket),
3026 &client->client_socket_ht_node);
ac1889bf
JG
3027 cds_lfht_add(state->client_id_ht,
3028 hash_client_id(client->id),
3029 &client->client_id_ht_node);
ab0ee2ca
JG
3030 rcu_read_unlock();
3031
3032 return ret;
6c24d3fd 3033
ab0ee2ca
JG
3034error:
3035 notification_client_destroy(client, state);
3036 return ret;
3037}
3038
6c24d3fd
JG
3039/*
3040 * RCU read-lock must be held by the caller.
3041 * Client lock must _not_ be held by the caller.
3042 */
505b2d90
JG
3043static
3044int notification_thread_client_disconnect(
3045 struct notification_client *client,
ab0ee2ca 3046 struct notification_thread_state *state)
505b2d90
JG
3047{
3048 int ret;
3049 struct lttng_condition_list_element *condition_list_element, *tmp;
3050
3051 /* Acquire the client lock to disable its communication atomically. */
6c24d3fd 3052 pthread_mutex_lock(&client->lock);
505b2d90 3053 client->communication.active = false;
6c24d3fd
JG
3054 cds_lfht_del(state->client_socket_ht, &client->client_socket_ht_node);
3055 cds_lfht_del(state->client_id_ht, &client->client_id_ht_node);
3056 pthread_mutex_unlock(&client->lock);
3057
505b2d90
JG
3058 ret = lttng_poll_del(&state->events, client->socket);
3059 if (ret) {
3060 ERR("[notification-thread] Failed to remove client socket %d from poll set",
3061 client->socket);
3062 }
3063
505b2d90
JG
3064 /* Release all conditions to which the client was subscribed. */
3065 cds_list_for_each_entry_safe(condition_list_element, tmp,
3066 &client->condition_list, node) {
3067 (void) notification_thread_client_unsubscribe(client,
3068 condition_list_element->condition, state, NULL);
3069 }
3070
3071 /*
3072 * Client no longer accessible to other threads (through the
3073 * client lists).
3074 */
3075 notification_client_destroy(client, state);
3076 return ret;
3077}
3078
3079int handle_notification_thread_client_disconnect(
3080 int client_socket, struct notification_thread_state *state)
ab0ee2ca
JG
3081{
3082 int ret = 0;
3083 struct notification_client *client;
3084
3085 rcu_read_lock();
3086 DBG("[notification-thread] Closing client connection (socket fd = %i)",
3087 client_socket);
3088 client = get_client_from_socket(client_socket, state);
3089 if (!client) {
3090 /* Internal state corruption, fatal error. */
3091 ERR("[notification-thread] Unable to find client (socket fd = %i)",
3092 client_socket);
3093 ret = -1;
3094 goto end;
3095 }
3096
505b2d90 3097 ret = notification_thread_client_disconnect(client, state);
ab0ee2ca
JG
3098end:
3099 rcu_read_unlock();
3100 return ret;
3101}
3102
3103int handle_notification_thread_client_disconnect_all(
3104 struct notification_thread_state *state)
3105{
3106 struct cds_lfht_iter iter;
3107 struct notification_client *client;
3108 bool error_encoutered = false;
3109
3110 rcu_read_lock();
3111 DBG("[notification-thread] Closing all client connections");
3112 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
505b2d90 3113 client_socket_ht_node) {
ab0ee2ca
JG
3114 int ret;
3115
505b2d90
JG
3116 ret = notification_thread_client_disconnect(
3117 client, state);
ab0ee2ca
JG
3118 if (ret) {
3119 error_encoutered = true;
3120 }
3121 }
3122 rcu_read_unlock();
3123 return error_encoutered ? 1 : 0;
3124}
3125
3126int handle_notification_thread_trigger_unregister_all(
3127 struct notification_thread_state *state)
3128{
4149ace8 3129 bool error_occurred = false;
ab0ee2ca
JG
3130 struct cds_lfht_iter iter;
3131 struct lttng_trigger_ht_element *trigger_ht_element;
3132
763de384 3133 rcu_read_lock();
ab0ee2ca
JG
3134 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
3135 node) {
3136 int ret = handle_notification_thread_command_unregister_trigger(
3137 state, trigger_ht_element->trigger, NULL);
3138 if (ret) {
4149ace8 3139 error_occurred = true;
ab0ee2ca
JG
3140 }
3141 }
763de384 3142 rcu_read_unlock();
4149ace8 3143 return error_occurred ? -1 : 0;
ab0ee2ca
JG
3144}
3145
dd877ea6
JG
3146static
3147int client_handle_transmission_status(
3148 struct notification_client *client,
3149 enum client_transmission_status transmission_status,
3150 struct notification_thread_state *state)
3151{
3152 int ret = 0;
3153
3154 switch (transmission_status) {
3155 case CLIENT_TRANSMISSION_STATUS_COMPLETE:
3156 ret = lttng_poll_mod(&state->events, client->socket,
3157 CLIENT_POLL_MASK_IN);
3158 if (ret) {
3159 goto end;
3160 }
3161
dd877ea6
JG
3162 break;
3163 case CLIENT_TRANSMISSION_STATUS_QUEUED:
3164 /*
3165 * We want to be notified whenever there is buffer space
3166 * available to send the rest of the payload.
3167 */
3168 ret = lttng_poll_mod(&state->events, client->socket,
3169 CLIENT_POLL_MASK_IN_OUT);
3170 if (ret) {
3171 goto end;
3172 }
3173 break;
3174 case CLIENT_TRANSMISSION_STATUS_FAIL:
3175 ret = notification_thread_client_disconnect(client, state);
3176 if (ret) {
3177 goto end;
3178 }
3179 break;
3180 case CLIENT_TRANSMISSION_STATUS_ERROR:
3181 ret = -1;
3182 goto end;
3183 default:
3184 abort();
3185 }
3186end:
3187 return ret;
3188}
3189
505b2d90 3190/* Client lock must be acquired by caller. */
ab0ee2ca 3191static
dd877ea6 3192enum client_transmission_status client_flush_outgoing_queue(
f2b3ef9f 3193 struct notification_client *client)
ab0ee2ca
JG
3194{
3195 ssize_t ret;
3196 size_t to_send_count;
dd877ea6 3197 enum client_transmission_status status;
882093ee
JR
3198 struct lttng_payload_view pv = lttng_payload_view_from_payload(
3199 &client->communication.outbound.payload, 0, -1);
3200 const int fds_to_send_count =
3201 lttng_payload_view_get_fd_handle_count(&pv);
ab0ee2ca 3202
505b2d90
JG
3203 ASSERT_LOCKED(client->lock);
3204
f2b3ef9f
JG
3205 if (!client->communication.active) {
3206 status = CLIENT_TRANSMISSION_STATUS_FAIL;
3207 goto end;
3208 }
3209
882093ee
JR
3210 if (pv.buffer.size == 0) {
3211 /*
3212 * If both data and fds are equal to zero, we are in an invalid
3213 * state.
3214 */
3215 assert(fds_to_send_count != 0);
3216 goto send_fds;
3217 }
3218
3219 /* Send data. */
3220 to_send_count = pv.buffer.size;
ab0ee2ca
JG
3221 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
3222 client->socket);
3223
3224 ret = lttcomm_send_unix_sock_non_block(client->socket,
882093ee 3225 pv.buffer.data,
ab0ee2ca 3226 to_send_count);
44c180ca 3227 if ((ret >= 0 && ret < to_send_count)) {
ab0ee2ca
JG
3228 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
3229 client->socket);
3230 to_send_count -= max(ret, 0);
3231
6f110534 3232 memmove(client->communication.outbound.payload.buffer.data,
882093ee
JR
3233 pv.buffer.data +
3234 pv.buffer.size - to_send_count,
ab0ee2ca
JG
3235 to_send_count);
3236 ret = lttng_dynamic_buffer_set_size(
882093ee 3237 &client->communication.outbound.payload.buffer,
ab0ee2ca
JG
3238 to_send_count);
3239 if (ret) {
3240 goto error;
3241 }
6c24d3fd 3242
dd877ea6 3243 status = CLIENT_TRANSMISSION_STATUS_QUEUED;
882093ee 3244 goto end;
ab0ee2ca 3245 } else if (ret < 0) {
6c24d3fd 3246 /* Generic error, disable the client's communication. */
dd877ea6 3247 ERR("[notification-thread] Failed to flush outgoing queue, disconnecting client (socket fd = %i)",
ab0ee2ca 3248 client->socket);
6c24d3fd 3249 client->communication.active = false;
dd877ea6 3250 status = CLIENT_TRANSMISSION_STATUS_FAIL;
882093ee 3251 goto end;
ab0ee2ca 3252 } else {
882093ee
JR
3253 /*
3254 * No error and flushed the queue completely.
3255 *
3256 * The payload buffer size is used later to
3257 * check if there is notifications queued. So albeit that the
3258 * direct caller knows that the transmission is complete, we
3259 * need to set the buffer size to zero.
3260 */
ab0ee2ca 3261 ret = lttng_dynamic_buffer_set_size(
882093ee 3262 &client->communication.outbound.payload.buffer, 0);
ab0ee2ca
JG
3263 if (ret) {
3264 goto error;
3265 }
882093ee 3266 }
6c24d3fd 3267
882093ee
JR
3268send_fds:
3269 /* No fds to send, transmission is complete. */
3270 if (fds_to_send_count == 0) {
dd877ea6 3271 status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
882093ee 3272 goto end;
dd877ea6 3273 }
882093ee
JR
3274
3275 ret = lttcomm_send_payload_view_fds_unix_sock_non_block(
3276 client->socket, &pv);
3277 if (ret < 0) {
3278 /* Generic error, disable the client's communication. */
3279 ERR("[notification-thread] Failed to flush outgoing fds queue, disconnecting client (socket fd = %i)",
3280 client->socket);
3281 client->communication.active = false;
3282 status = CLIENT_TRANSMISSION_STATUS_FAIL;
3283 goto end;
3284 } else if (ret == 0) {
3285 /* Nothing could be sent. */
3286 status = CLIENT_TRANSMISSION_STATUS_QUEUED;
3287 } else {
3288 /* Fd passing is an all or nothing kind of thing. */
3289 status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
3290 /*
3291 * The payload _fd_array count is used later to
3292 * check if there is notifications queued. So although the
3293 * direct caller knows that the transmission is complete, we
3294 * need to clear the _fd_array for the queuing check.
3295 */
3296 lttng_dynamic_pointer_array_clear(
3297 &client->communication.outbound.payload
3298 ._fd_handles);
3299 }
3300
f2b3ef9f 3301end:
882093ee
JR
3302 if (status == CLIENT_TRANSMISSION_STATUS_COMPLETE) {
3303 client->communication.outbound.queued_command_reply = false;
3304 client->communication.outbound.dropped_notification = false;
3305 lttng_payload_clear(&client->communication.outbound.payload);
3306 }
3307
f2b3ef9f 3308 return status;
ab0ee2ca 3309error:
f2b3ef9f 3310 return CLIENT_TRANSMISSION_STATUS_ERROR;
ab0ee2ca
JG
3311}
3312
882093ee
JR
3313static
3314bool client_has_outbound_data_left(
3315 const struct notification_client *client)
3316{
3317 const struct lttng_payload_view pv = lttng_payload_view_from_payload(
3318 &client->communication.outbound.payload, 0, -1);
3319 const bool has_data = pv.buffer.size != 0;
3320 const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv);
3321
3322 return has_data || has_fds;
3323}
3324
6c24d3fd 3325/* Client lock must _not_ be held by the caller. */
ab0ee2ca
JG
3326static
3327int client_send_command_reply(struct notification_client *client,
3328 struct notification_thread_state *state,
3329 enum lttng_notification_channel_status status)
3330{
3331 int ret;
3332 struct lttng_notification_channel_command_reply reply = {
3333 .status = (int8_t) status,
3334 };
3335 struct lttng_notification_channel_message msg = {
3336 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
3337 .size = sizeof(reply),
3338 };
3339 char buffer[sizeof(msg) + sizeof(reply)];
f2b3ef9f 3340 enum client_transmission_status transmission_status;
ab0ee2ca 3341
6c24d3fd
JG
3342 memcpy(buffer, &msg, sizeof(msg));
3343 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
3344 DBG("[notification-thread] Send command reply (%i)", (int) status);
505b2d90 3345
6c24d3fd 3346 pthread_mutex_lock(&client->lock);
ab0ee2ca
JG
3347 if (client->communication.outbound.queued_command_reply) {
3348 /* Protocol error. */
6c24d3fd 3349 goto error_unlock;
ab0ee2ca
JG
3350 }
3351
ab0ee2ca
JG
3352 /* Enqueue buffer to outgoing queue and flush it. */
3353 ret = lttng_dynamic_buffer_append(
882093ee 3354 &client->communication.outbound.payload.buffer,
ab0ee2ca
JG
3355 buffer, sizeof(buffer));
3356 if (ret) {
6c24d3fd 3357 goto error_unlock;
ab0ee2ca
JG
3358 }
3359
f2b3ef9f 3360 transmission_status = client_flush_outgoing_queue(client);
882093ee
JR
3361
3362 if (client_has_outbound_data_left(client)) {
6c24d3fd
JG
3363 /* Queue could not be emptied. */
3364 client->communication.outbound.queued_command_reply = true;
3365 }
3366
3367 pthread_mutex_unlock(&client->lock);
f2b3ef9f
JG
3368 ret = client_handle_transmission_status(
3369 client, transmission_status, state);
ab0ee2ca
JG
3370 if (ret) {
3371 goto error;
3372 }
3373
ab0ee2ca 3374 return 0;
6c24d3fd
JG
3375error_unlock:
3376 pthread_mutex_unlock(&client->lock);
ab0ee2ca
JG
3377error:
3378 return -1;
3379}
3380
505b2d90
JG
3381static
3382int client_handle_message_unknown(struct notification_client *client,
3383 struct notification_thread_state *state)
3384{
3385 int ret;
505b2d90
JG
3386 /*
3387 * Receiving message header. The function will be called again
3388 * once the rest of the message as been received and can be
3389 * interpreted.
3390 */
3391 const struct lttng_notification_channel_message *msg;
3392
882093ee 3393 assert(sizeof(*msg) == client->communication.inbound.payload.buffer.size);
505b2d90 3394 msg = (const struct lttng_notification_channel_message *)
882093ee 3395 client->communication.inbound.payload.buffer.data;
505b2d90
JG
3396
3397 if (msg->size == 0 ||
3398 msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
3399 ERR("[notification-thread] Invalid notification channel message: length = %u",
3400 msg->size);
3401 ret = -1;
3402 goto end;
3403 }
3404
3405 switch (msg->type) {
3406 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
3407 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
3408 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
3409 break;
3410 default:
3411 ret = -1;
3412 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
3413 goto end;
3414 }
3415
3416 client->communication.inbound.bytes_to_receive = msg->size;
882093ee 3417 client->communication.inbound.fds_to_receive = msg->fds;
505b2d90
JG
3418 client->communication.inbound.msg_type =
3419 (enum lttng_notification_channel_message_type) msg->type;
3420 ret = lttng_dynamic_buffer_set_size(
882093ee
JR
3421 &client->communication.inbound.payload.buffer, msg->size);
3422
3423 /* msg is not valid anymore due to lttng_dynamic_buffer_set_size. */
3424 msg = NULL;
505b2d90 3425end:
505b2d90
JG
3426 return ret;
3427}
3428
3429static
3430int client_handle_message_handshake(struct notification_client *client,
3431 struct notification_thread_state *state)
3432{
3433 int ret;
3434 struct lttng_notification_channel_command_handshake *handshake_client;
3435 const struct lttng_notification_channel_command_handshake handshake_reply = {
3436 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
3437 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
3438 };
3439 const struct lttng_notification_channel_message msg_header = {
3440 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
3441 .size = sizeof(handshake_reply),
3442 };
3443 enum lttng_notification_channel_status status =
3444 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
3445 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
505b2d90
JG
3446
3447 memcpy(send_buffer, &msg_header, sizeof(msg_header));
3448 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
3449 sizeof(handshake_reply));
3450
3451 handshake_client =
3452 (struct lttng_notification_channel_command_handshake *)
882093ee 3453 client->communication.inbound.payload.buffer
505b2d90
JG
3454 .data;
3455 client->major = handshake_client->major;
3456 client->minor = handshake_client->minor;
3457 if (!client->communication.inbound.creds_received) {
3458 ERR("[notification-thread] No credentials received from client");
3459 ret = -1;
3460 goto end;
3461 }
3462
3463 client->uid = LTTNG_SOCK_GET_UID_CRED(
3464 &client->communication.inbound.creds);
3465 client->gid = LTTNG_SOCK_GET_GID_CRED(
3466 &client->communication.inbound.creds);
3467 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
3468 client->uid, client->gid, (int) client->major,
3469 (int) client->minor);
3470
3471 if (handshake_client->major !=
3472 LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
3473 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
3474 }
3475
6c24d3fd
JG
3476 pthread_mutex_lock(&client->lock);
3477 /* Outgoing queue will be flushed when the command reply is sent. */
505b2d90 3478 ret = lttng_dynamic_buffer_append(
882093ee 3479 &client->communication.outbound.payload.buffer, send_buffer,
505b2d90
JG
3480 sizeof(send_buffer));
3481 if (ret) {
3482 ERR("[notification-thread] Failed to send protocol version to notification channel client");
6c24d3fd 3483 goto end_unlock;
505b2d90
JG
3484 }
3485
3486 client->validated = true;
3487 client->communication.active = true;
6c24d3fd 3488 pthread_mutex_unlock(&client->lock);
505b2d90 3489
6c24d3fd
JG
3490 /* Set reception state to receive the next message header. */
3491 ret = client_reset_inbound_state(client);
505b2d90 3492 if (ret) {
6c24d3fd 3493 ERR("[notification-thread] Failed to reset client communication's inbound state");
505b2d90
JG
3494 goto end;
3495 }
3496
6c24d3fd 3497 /* Flushes the outgoing queue. */
505b2d90
JG
3498 ret = client_send_command_reply(client, state, status);
3499 if (ret) {
3500 ERR("[notification-thread] Failed to send reply to notification channel client");
3501 goto end;
3502 }
3503
6c24d3fd
JG
3504 goto end;
3505end_unlock:
505b2d90 3506 pthread_mutex_unlock(&client->lock);
6c24d3fd 3507end:
505b2d90
JG
3508 return ret;
3509}
3510
3511static
3512int client_handle_message_subscription(
3513 struct notification_client *client,
3514 enum lttng_notification_channel_message_type msg_type,
3515 struct notification_thread_state *state)
3516{
3517 int ret;
3518 struct lttng_condition *condition;
3519 enum lttng_notification_channel_status status =
3520 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
3521 struct lttng_payload_view condition_view =
882093ee
JR
3522 lttng_payload_view_from_payload(
3523 &client->communication.inbound.payload,
505b2d90
JG
3524 0, -1);
3525 size_t expected_condition_size;
3526
6c24d3fd
JG
3527 /*
3528 * No need to lock client to sample the inbound state as the only
3529 * other thread accessing clients (action executor) only uses the
3530 * outbound state.
3531 */
882093ee 3532 expected_condition_size = client->communication.inbound.payload.buffer.size;
505b2d90
JG
3533 ret = lttng_condition_create_from_payload(&condition_view, &condition);
3534 if (ret != expected_condition_size) {
3535 ERR("[notification-thread] Malformed condition received from client");
3536 goto end;
3537 }
3538
3539 if (msg_type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
3540 ret = notification_thread_client_subscribe(
3541 client, condition, state, &status);
3542 } else {
3543 ret = notification_thread_client_unsubscribe(
3544 client, condition, state, &status);
3545 }
505b2d90 3546
505b2d90 3547 if (ret) {
6c24d3fd 3548 goto end;
505b2d90
JG
3549 }
3550
3551 /* Set reception state to receive the next message header. */
3552 ret = client_reset_inbound_state(client);
3553 if (ret) {
3554 ERR("[notification-thread] Failed to reset client communication's inbound state");
6c24d3fd
JG
3555 goto end;
3556 }
3557
3558 ret = client_send_command_reply(client, state, status);
3559 if (ret) {
3560 ERR("[notification-thread] Failed to send reply to notification channel client");
3561 goto end;
505b2d90
JG
3562 }
3563
505b2d90
JG
3564end:
3565 return ret;
3566}
3567
ab0ee2ca
JG
3568static
3569int client_dispatch_message(struct notification_client *client,
3570 struct notification_thread_state *state)
3571{
3572 int ret = 0;
3573
3574 if (client->communication.inbound.msg_type !=
3575 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
3576 client->communication.inbound.msg_type !=
3577 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
3578 !client->validated) {
3579 WARN("[notification-thread] client attempted a command before handshake");
3580 ret = -1;
3581 goto end;
3582 }
3583
3584 switch (client->communication.inbound.msg_type) {
3585 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
3586 {
505b2d90 3587 ret = client_handle_message_unknown(client, state);
ab0ee2ca
JG
3588 break;
3589 }
3590 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
3591 {
505b2d90 3592 ret = client_handle_message_handshake(client, state);
ab0ee2ca
JG
3593 break;
3594 }
3595 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
3596 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
3597 {
505b2d90
JG
3598 ret = client_handle_message_subscription(client,
3599 client->communication.inbound.msg_type, state);
ab0ee2ca
JG
3600 break;
3601 }
3602 default:
3603 abort();
3604 }
3605end:
3606 return ret;
3607}
3608
3609/* Incoming data from client. */
3610int handle_notification_thread_client_in(
3611 struct notification_thread_state *state, int socket)
3612{
14fa22f8 3613 int ret = 0;
ab0ee2ca
JG
3614 struct notification_client *client;
3615 ssize_t recv_ret;
3616 size_t offset;
3617
98b5ff34 3618 rcu_read_lock();
ab0ee2ca
JG
3619 client = get_client_from_socket(socket, state);
3620 if (!client) {
3621 /* Internal error, abort. */
3622 ret = -1;
3623 goto end;
3624 }
3625
882093ee 3626 offset = client->communication.inbound.payload.buffer.size -
14fa22f8 3627 client->communication.inbound.bytes_to_receive;
01ea340e 3628 if (client->communication.inbound.expect_creds) {
ab0ee2ca 3629 recv_ret = lttcomm_recv_creds_unix_sock(socket,
882093ee 3630 client->communication.inbound.payload.buffer.data + offset,
ab0ee2ca
JG
3631 client->communication.inbound.bytes_to_receive,
3632 &client->communication.inbound.creds);
3633 if (recv_ret > 0) {
01ea340e 3634 client->communication.inbound.expect_creds = false;
ab0ee2ca
JG
3635 client->communication.inbound.creds_received = true;
3636 }
3637 } else {
3638 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
882093ee 3639 client->communication.inbound.payload.buffer.data + offset,
ab0ee2ca
JG
3640 client->communication.inbound.bytes_to_receive);
3641 }
505b2d90
JG
3642 if (recv_ret >= 0) {
3643 client->communication.inbound.bytes_to_receive -= recv_ret;
882093ee
JR
3644 } else {
3645 goto error_disconnect_client;
505b2d90 3646 }
6c24d3fd 3647
882093ee
JR
3648 if (client->communication.inbound.bytes_to_receive != 0) {
3649 /* Message incomplete wait for more data. */
3650 ret = 0;
3651 goto end;
ab0ee2ca
JG
3652 }
3653
882093ee
JR
3654 assert(client->communication.inbound.bytes_to_receive == 0);
3655
3656 /* Receive fds. */
3657 if (client->communication.inbound.fds_to_receive != 0) {
3658 ret = lttcomm_recv_payload_fds_unix_sock_non_block(
3659 client->socket,
3660 client->communication.inbound.fds_to_receive,
3661 &client->communication.inbound.payload);
3662 if (ret > 0) {
ab0ee2ca 3663 /*
882093ee
JR
3664 * Fds received. non blocking fds passing is all
3665 * or nothing.
ab0ee2ca 3666 */
882093ee
JR
3667 ssize_t expected_size;
3668
3669 expected_size = sizeof(int) *
3670 client->communication.inbound
3671 .fds_to_receive;
3672 assert(ret == expected_size);
3673 client->communication.inbound.fds_to_receive = 0;
3674 } else if (ret == 0) {
3675 /* Received nothing. */
3676 ret = 0;
3677 goto end;
3678 } else {
ab0ee2ca
JG
3679 goto error_disconnect_client;
3680 }
ab0ee2ca 3681 }
882093ee
JR
3682
3683 /* At this point the message is complete.*/
3684 assert(client->communication.inbound.bytes_to_receive == 0 &&
3685 client->communication.inbound.fds_to_receive == 0);
3686 ret = client_dispatch_message(client, state);
3687 if (ret) {
3688 /*
3689 * Only returns an error if this client must be
3690 * disconnected.
3691 */
3692 goto error_disconnect_client;
3693 }
3694
ab0ee2ca 3695end:
98b5ff34 3696 rcu_read_unlock();
ab0ee2ca 3697 return ret;
882093ee 3698
ab0ee2ca 3699error_disconnect_client:
505b2d90 3700 ret = notification_thread_client_disconnect(client, state);
98b5ff34 3701 goto end;
ab0ee2ca
JG
3702}
3703
3704/* Client ready to receive outgoing data. */
3705int handle_notification_thread_client_out(
3706 struct notification_thread_state *state, int socket)
3707{
3708 int ret;
3709 struct notification_client *client;
f2b3ef9f 3710 enum client_transmission_status transmission_status;
ab0ee2ca 3711
98b5ff34 3712 rcu_read_lock();
ab0ee2ca
JG
3713 client = get_client_from_socket(socket, state);
3714 if (!client) {
3715 /* Internal error, abort. */
3716 ret = -1;
3717 goto end;
3718 }
3719
505b2d90 3720 pthread_mutex_lock(&client->lock);
f2b3ef9f 3721 transmission_status = client_flush_outgoing_queue(client);
6c24d3fd
JG
3722 pthread_mutex_unlock(&client->lock);
3723
f2b3ef9f
JG
3724 ret = client_handle_transmission_status(
3725 client, transmission_status, state);
ab0ee2ca
JG
3726 if (ret) {
3727 goto end;
3728 }
3729end:
98b5ff34 3730 rcu_read_unlock();
ab0ee2ca
JG
3731 return ret;
3732}
3733
3734static
e8360425
JD
3735bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
3736 const struct channel_state_sample *sample,
3737 uint64_t buffer_capacity)
ab0ee2ca
JG
3738{
3739 bool result = false;
3740 uint64_t threshold;
3741 enum lttng_condition_type condition_type;
e8360425 3742 const struct lttng_condition_buffer_usage *use_condition = container_of(
ab0ee2ca
JG
3743 condition, struct lttng_condition_buffer_usage,
3744 parent);
3745
ab0ee2ca
JG
3746 if (use_condition->threshold_bytes.set) {
3747 threshold = use_condition->threshold_bytes.value;
3748 } else {
3749 /*
3750 * Threshold was expressed as a ratio.
3751 *
3752 * TODO the threshold (in bytes) of conditions expressed
3753 * as a ratio of total buffer size could be cached to
3754 * forego this double-multiplication or it could be performed
3755 * as fixed-point math.
3756 *
d49487dc 3757 * Note that caching should accommodates the case where the
ab0ee2ca
JG
3758 * condition applies to multiple channels (i.e. don't assume
3759 * that all channels matching my_chann* have the same size...)
3760 */
3761 threshold = (uint64_t) (use_condition->threshold_ratio.value *
3762 (double) buffer_capacity);
3763 }
3764
3765 condition_type = lttng_condition_get_type(condition);
3766 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
3767 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3768 threshold, sample->highest_usage);
3769
3770 /*
3771 * The low condition should only be triggered once _all_ of the
3772 * streams in a channel have gone below the "low" threshold.
3773 */
3774 if (sample->highest_usage <= threshold) {
3775 result = true;
3776 }
3777 } else {
3778 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3779 threshold, sample->highest_usage);
3780
3781 /*
3782 * For high buffer usage scenarios, we want to trigger whenever
3783 * _any_ of the streams has reached the "high" threshold.
3784 */
3785 if (sample->highest_usage >= threshold) {
3786 result = true;
3787 }
3788 }
e8360425 3789
ab0ee2ca
JG
3790 return result;
3791}
3792
3793static
e8360425
JD
3794bool evaluate_session_consumed_size_condition(
3795 const struct lttng_condition *condition,
3796 uint64_t session_consumed_size)
3797{
3798 uint64_t threshold;
3799 const struct lttng_condition_session_consumed_size *size_condition =
3800 container_of(condition,
3801 struct lttng_condition_session_consumed_size,
3802 parent);
3803
3804 threshold = size_condition->consumed_threshold_bytes.value;
3805 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
3806 threshold, session_consumed_size);
3807 return session_consumed_size >= threshold;
3808}
3809
3810static
ed327204 3811int evaluate_buffer_condition(const struct lttng_condition *condition,
ab0ee2ca 3812 struct lttng_evaluation **evaluation,
e8360425
JD
3813 const struct notification_thread_state *state,
3814 const struct channel_state_sample *previous_sample,
3815 const struct channel_state_sample *latest_sample,
3816 uint64_t previous_session_consumed_total,
3817 uint64_t latest_session_consumed_total,
3818 struct channel_info *channel_info)
ab0ee2ca
JG
3819{
3820 int ret = 0;
3821 enum lttng_condition_type condition_type;
e8360425
JD
3822 const bool previous_sample_available = !!previous_sample;
3823 bool previous_sample_result = false;
ab0ee2ca
JG
3824 bool latest_sample_result;
3825
3826 condition_type = lttng_condition_get_type(condition);
ab0ee2ca 3827
e8360425
JD
3828 switch (condition_type) {
3829 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3830 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3831 if (caa_likely(previous_sample_available)) {
3832 previous_sample_result =
3833 evaluate_buffer_usage_condition(condition,
3834 previous_sample, channel_info->capacity);
3835 }
3836 latest_sample_result = evaluate_buffer_usage_condition(
3837 condition, latest_sample,
3838 channel_info->capacity);
3839 break;
3840 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3841 if (caa_likely(previous_sample_available)) {
3842 previous_sample_result =
3843 evaluate_session_consumed_size_condition(
3844 condition,
3845 previous_session_consumed_total);
3846 }
3847 latest_sample_result =
3848 evaluate_session_consumed_size_condition(
3849 condition,
3850 latest_session_consumed_total);
3851 break;
3852 default:
3853 /* Unknown condition type; internal error. */
3854 abort();
3855 }
ab0ee2ca
JG
3856
3857 if (!latest_sample_result ||
3858 (previous_sample_result == latest_sample_result)) {
3859 /*
3860 * Only trigger on a condition evaluation transition.
3861 *
3862 * NOTE: This edge-triggered logic may not be appropriate for
3863 * future condition types.
3864 */
3865 goto end;
3866 }
3867
e8360425
JD
3868 if (!evaluation || !latest_sample_result) {
3869 goto end;
3870 }
3871
3872 switch (condition_type) {
3873 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3874 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
ab0ee2ca
JG
3875 *evaluation = lttng_evaluation_buffer_usage_create(
3876 condition_type,
3877 latest_sample->highest_usage,
e8360425
JD
3878 channel_info->capacity);
3879 break;
3880 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3881 *evaluation = lttng_evaluation_session_consumed_size_create(
e8360425
JD
3882 latest_session_consumed_total);
3883 break;
3884 default:
3885 abort();
3886 }
3887
3888 if (!*evaluation) {
3889 ret = -1;
3890 goto end;
ab0ee2ca
JG
3891 }
3892end:
3893 return ret;
3894}
3895
3896static
f2b3ef9f 3897int client_notification_overflow(struct notification_client *client)
ab0ee2ca 3898{
f2b3ef9f
JG
3899 int ret = 0;
3900 const struct lttng_notification_channel_message msg = {
ab0ee2ca 3901 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
ab0ee2ca
JG
3902 };
3903
505b2d90
JG
3904 ASSERT_LOCKED(client->lock);
3905
f2b3ef9f
JG
3906 DBG("Dropping notification addressed to client (socket fd = %i)",
3907 client->socket);
3908 if (client->communication.outbound.dropped_notification) {
3909 /*
3910 * The client already has a "notification dropped" message
3911 * in its outgoing queue. Nothing to do since all
3912 * of those messages are coalesced.
3913 */
3914 goto end;
3915 }
3916
3917 client->communication.outbound.dropped_notification = true;
ab0ee2ca 3918 ret = lttng_dynamic_buffer_append(
882093ee 3919 &client->communication.outbound.payload.buffer, &msg,
ab0ee2ca 3920 sizeof(msg));
f2b3ef9f
JG
3921 if (ret) {
3922 PERROR("Failed to enqueue \"dropped notification\" message in client's (socket fd = %i) outgoing queue",
3923 client->socket);
3924 }
3925end:
ab0ee2ca
JG
3926 return ret;
3927}
3928
f2b3ef9f
JG
3929static int client_handle_transmission_status_wrapper(
3930 struct notification_client *client,
3931 enum client_transmission_status status,
3932 void *user_data)
3933{
3934 return client_handle_transmission_status(client, status,
3935 (struct notification_thread_state *) user_data);
3936}
3937
3938static
3939int send_evaluation_to_clients(const struct lttng_trigger *trigger,
3940 const struct lttng_evaluation *evaluation,
3941 struct notification_client_list* client_list,
3942 struct notification_thread_state *state,
3943 uid_t object_uid, gid_t object_gid)
3944{
ff588497
JR
3945 const struct lttng_credentials creds = {
3946 .uid = LTTNG_OPTIONAL_INIT_VALUE(object_uid),
3947 .gid = LTTNG_OPTIONAL_INIT_VALUE(object_gid),
3948 };
3949
f2b3ef9f
JG
3950 return notification_client_list_send_evaluation(client_list,
3951 lttng_trigger_get_const_condition(trigger), evaluation,
3952 lttng_trigger_get_credentials(trigger),
ff588497 3953 &creds,
f2b3ef9f
JG
3954 client_handle_transmission_status_wrapper, state);
3955}
3956
f37d0f86
JG
3957/*
3958 * Permission checks relative to notification channel clients are performed
3959 * here. Notice how object, client, and trigger credentials are involved in
3960 * this check.
3961 *
3962 * The `object` credentials are the credentials associated with the "subject"
3963 * of a condition. For instance, a `rotation completed` condition applies
3964 * to a session. When that condition is met, it will produce an evaluation
3965 * against a session. Hence, in this case, the `object` credentials are the
3966 * credentials of the "subject" session.
3967 *
3968 * The `trigger` credentials are the credentials of the user that registered the
3969 * trigger.
3970 *
3971 * The `client` credentials are the credentials of the user that created a given
3972 * notification channel.
3973 *
3974 * In terms of visibility, it is expected that non-privilieged users can only
3975 * register triggers against "their" objects (their own sessions and
3976 * applications they are allowed to interact with). They can then open a
3977 * notification channel and subscribe to notifications associated with those
3978 * triggers.
3979 *
3980 * As for privilieged users, they can register triggers against the objects of
3981 * other users. They can then subscribe to the notifications associated to their
3982 * triggers. Privilieged users _can't_ subscribe to the notifications of
3983 * triggers owned by other users; they must create their own triggers.
3984 *
3985 * This is more a concern of usability than security. It would be difficult for
3986 * a root user reliably subscribe to a specific set of conditions without
3987 * interference from external users (those could, for instance, unregister
3988 * their triggers).
3989 */
f2b3ef9f
JG
3990LTTNG_HIDDEN
3991int notification_client_list_send_evaluation(
3992 struct notification_client_list *client_list,
3993 const struct lttng_condition *condition,
9b63a4aa 3994 const struct lttng_evaluation *evaluation,
f2b3ef9f
JG
3995 const struct lttng_credentials *trigger_creds,
3996 const struct lttng_credentials *source_object_creds,
3997 report_client_transmission_result_cb client_report,
3998 void *user_data)
ab0ee2ca
JG
3999{
4000 int ret = 0;
c0a66c84 4001 struct lttng_payload msg_payload;
ab0ee2ca 4002 struct notification_client_list_element *client_list_element, *tmp;
9b63a4aa 4003 const struct lttng_notification notification = {
f2b3ef9f 4004 .condition = (struct lttng_condition *) condition,
9b63a4aa
JG
4005 .evaluation = (struct lttng_evaluation *) evaluation,
4006 };
3647288f
JG
4007 struct lttng_notification_channel_message msg_header = {
4008 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
4009 };
ab0ee2ca 4010
c0a66c84 4011 lttng_payload_init(&msg_payload);
ab0ee2ca 4012
c0a66c84 4013 ret = lttng_dynamic_buffer_append(&msg_payload.buffer, &msg_header,
3647288f 4014 sizeof(msg_header));
ab0ee2ca
JG
4015 if (ret) {
4016 goto end;
4017 }
4018
c0a66c84 4019 ret = lttng_notification_serialize(&notification, &msg_payload);
ab0ee2ca 4020 if (ret) {
ab0ee2ca
JG
4021 ERR("[notification-thread] Failed to serialize notification");
4022 ret = -1;
4023 goto end;
4024 }
4025
3647288f 4026 /* Update payload size. */
505b2d90
JG
4027 ((struct lttng_notification_channel_message *) msg_payload.buffer.data)
4028 ->size = (uint32_t)(
4029 msg_payload.buffer.size - sizeof(msg_header));
3647288f 4030
882093ee
JR
4031 /* Update the payload number of fds. */
4032 {
4033 const struct lttng_payload_view pv = lttng_payload_view_from_payload(
4034 &msg_payload, 0, -1);
4035
4036 ((struct lttng_notification_channel_message *)
4037 msg_payload.buffer.data)->fds = (uint32_t)
4038 lttng_payload_view_get_fd_handle_count(&pv);
4039 }
4040
505b2d90 4041 pthread_mutex_lock(&client_list->lock);
ab0ee2ca
JG
4042 cds_list_for_each_entry_safe(client_list_element, tmp,
4043 &client_list->list, node) {
f2b3ef9f 4044 enum client_transmission_status transmission_status;
ab0ee2ca
JG
4045 struct notification_client *client =
4046 client_list_element->client;
4047
505b2d90
JG
4048 ret = 0;
4049 pthread_mutex_lock(&client->lock);
6c24d3fd
JG
4050 if (!client->communication.active) {
4051 /*
4052 * Skip inactive client (protocol error or
4053 * disconnecting).
4054 */
4055 DBG("Skipping client at it is marked as inactive");
4056 goto skip_client;
4057 }
4058
f2b3ef9f 4059 if (source_object_creds) {
ff588497
JR
4060 if (client->uid != lttng_credentials_get_uid(source_object_creds) &&
4061 client->gid != lttng_credentials_get_gid(source_object_creds) &&
f2b3ef9f
JG
4062 client->uid != 0) {
4063 /*
4064 * Client is not allowed to monitor this
4065 * object.
4066 */
4067 DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
6c24d3fd 4068 goto skip_client;
f2b3ef9f 4069 }
90843f49
JR
4070 }
4071
ff588497 4072 if (client->uid != lttng_credentials_get_uid(trigger_creds) && client->gid != lttng_credentials_get_gid(trigger_creds)) {
90843f49 4073 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this trigger");
6c24d3fd 4074 goto skip_client;
ab0ee2ca
JG
4075 }
4076
4077 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
c0a66c84 4078 client->socket, msg_payload.buffer.size);
882093ee
JR
4079
4080 if (client_has_outbound_data_left(client)) {
ab0ee2ca
JG
4081 /*
4082 * Outgoing data is already buffered for this client;
4083 * drop the notification and enqueue a "dropped
4084 * notification" message if this is the first dropped
4085 * notification since the socket spilled-over to the
4086 * queue.
4087 */
f2b3ef9f
JG
4088 ret = client_notification_overflow(client);
4089 if (ret) {
6c24d3fd
JG
4090 /* Fatal error. */
4091 goto skip_client;
ab0ee2ca 4092 }
ab0ee2ca
JG
4093 }
4094
882093ee 4095 ret = lttng_payload_copy(&msg_payload, &client->communication.outbound.payload);
ab0ee2ca 4096 if (ret) {
6c24d3fd
JG
4097 /* Fatal error. */
4098 goto skip_client;
ab0ee2ca
JG
4099 }
4100
f2b3ef9f 4101 transmission_status = client_flush_outgoing_queue(client);
6c24d3fd 4102 pthread_mutex_unlock(&client->lock);
f2b3ef9f 4103 ret = client_report(client, transmission_status, user_data);
ab0ee2ca 4104 if (ret) {
6c24d3fd
JG
4105 /* Fatal error. */
4106 goto end_unlock_list;
505b2d90 4107 }
6c24d3fd
JG
4108
4109 continue;
4110
4111skip_client:
505b2d90
JG
4112 pthread_mutex_unlock(&client->lock);
4113 if (ret) {
6c24d3fd 4114 /* Fatal error. */
505b2d90 4115 goto end_unlock_list;
ab0ee2ca
JG
4116 }
4117 }
4118 ret = 0;
505b2d90
JG
4119
4120end_unlock_list:
4121 pthread_mutex_unlock(&client_list->lock);
ab0ee2ca 4122end:
c0a66c84 4123 lttng_payload_reset(&msg_payload);
ab0ee2ca
JG
4124 return ret;
4125}
4126
4127int handle_notification_thread_channel_sample(
4128 struct notification_thread_state *state, int pipe,
4129 enum lttng_domain_type domain)
4130{
4131 int ret = 0;
4132 struct lttcomm_consumer_channel_monitor_msg sample_msg;
ab0ee2ca
JG
4133 struct channel_info *channel_info;
4134 struct cds_lfht_node *node;
4135 struct cds_lfht_iter iter;
4136 struct lttng_channel_trigger_list *trigger_list;
4137 struct lttng_trigger_list_element *trigger_list_element;
4138 bool previous_sample_available = false;
e8360425
JD
4139 struct channel_state_sample previous_sample, latest_sample;
4140 uint64_t previous_session_consumed_total, latest_session_consumed_total;
f2b3ef9f 4141 struct lttng_credentials channel_creds;
ab0ee2ca
JG
4142
4143 /*
4144 * The monitoring pipe only holds messages smaller than PIPE_BUF,
4145 * ensuring that read/write of sampling messages are atomic.
4146 */
7c2551ef 4147 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
ab0ee2ca
JG
4148 if (ret != sizeof(sample_msg)) {
4149 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
4150 pipe);
4151 ret = -1;
4152 goto end;
4153 }
4154
4155 ret = 0;
4156 latest_sample.key.key = sample_msg.key;
4157 latest_sample.key.domain = domain;
4158 latest_sample.highest_usage = sample_msg.highest;
4159 latest_sample.lowest_usage = sample_msg.lowest;
e8360425 4160 latest_sample.channel_total_consumed = sample_msg.total_consumed;
ab0ee2ca
JG
4161
4162 rcu_read_lock();
4163
4164 /* Retrieve the channel's informations */
4165 cds_lfht_lookup(state->channels_ht,
4166 hash_channel_key(&latest_sample.key),
4167 match_channel_info,
4168 &latest_sample.key,
4169 &iter);
4170 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 4171 if (caa_unlikely(!node)) {
ab0ee2ca
JG
4172 /*
4173 * Not an error since the consumer can push a sample to the pipe
4174 * and the rest of the session daemon could notify us of the
4175 * channel's destruction before we get a chance to process that
4176 * sample.
4177 */
4178 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
4179 latest_sample.key.key,
4180 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
4181 "user space");
4182 goto end_unlock;
4183 }
4184 channel_info = caa_container_of(node, struct channel_info,
4185 channels_ht_node);
e8360425 4186 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
8abe313a 4187 channel_info->name,
ab0ee2ca 4188 latest_sample.key.key,
8abe313a 4189 channel_info->session_info->name,
ab0ee2ca 4190 latest_sample.highest_usage,
e8360425
JD
4191 latest_sample.lowest_usage,
4192 latest_sample.channel_total_consumed);
4193
4194 previous_session_consumed_total =
4195 channel_info->session_info->consumed_data_size;
ab0ee2ca
JG
4196
4197 /* Retrieve the channel's last sample, if it exists, and update it. */
4198 cds_lfht_lookup(state->channel_state_ht,
4199 hash_channel_key(&latest_sample.key),
4200 match_channel_state_sample,
4201 &latest_sample.key,
4202 &iter);
4203 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 4204 if (caa_likely(node)) {
ab0ee2ca
JG
4205 struct channel_state_sample *stored_sample;
4206
4207 /* Update the sample stored. */
4208 stored_sample = caa_container_of(node,
4209 struct channel_state_sample,
4210 channel_state_ht_node);
e8360425 4211
ab0ee2ca
JG
4212 memcpy(&previous_sample, stored_sample,
4213 sizeof(previous_sample));
4214 stored_sample->highest_usage = latest_sample.highest_usage;
4215 stored_sample->lowest_usage = latest_sample.lowest_usage;
0f0479d7 4216 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
ab0ee2ca 4217 previous_sample_available = true;
e8360425
JD
4218
4219 latest_session_consumed_total =
4220 previous_session_consumed_total +
4221 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
ab0ee2ca
JG
4222 } else {
4223 /*
4224 * This is the channel's first sample, allocate space for and
4225 * store the new sample.
4226 */
4227 struct channel_state_sample *stored_sample;
4228
4229 stored_sample = zmalloc(sizeof(*stored_sample));
4230 if (!stored_sample) {
4231 ret = -1;
4232 goto end_unlock;
4233 }
4234
4235 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
4236 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
4237 cds_lfht_add(state->channel_state_ht,
4238 hash_channel_key(&stored_sample->key),
4239 &stored_sample->channel_state_ht_node);
e8360425
JD
4240
4241 latest_session_consumed_total =
4242 previous_session_consumed_total +
4243 latest_sample.channel_total_consumed;
ab0ee2ca
JG
4244 }
4245
e8360425
JD
4246 channel_info->session_info->consumed_data_size =
4247 latest_session_consumed_total;
4248
ab0ee2ca
JG
4249 /* Find triggers associated with this channel. */
4250 cds_lfht_lookup(state->channel_triggers_ht,
4251 hash_channel_key(&latest_sample.key),
4252 match_channel_trigger_list,
4253 &latest_sample.key,
4254 &iter);
4255 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 4256 if (caa_likely(!node)) {
ab0ee2ca
JG
4257 goto end_unlock;
4258 }
4259
f2b3ef9f 4260 channel_creds = (typeof(channel_creds)) {
ff588497
JR
4261 .uid = LTTNG_OPTIONAL_INIT_VALUE(channel_info->session_info->uid),
4262 .gid = LTTNG_OPTIONAL_INIT_VALUE(channel_info->session_info->gid),
f2b3ef9f
JG
4263 };
4264
ab0ee2ca
JG
4265 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
4266 channel_triggers_ht_node);
4267 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
e742b055 4268 node) {
9b63a4aa 4269 const struct lttng_condition *condition;
f2b3ef9f 4270 struct lttng_trigger *trigger;
505b2d90 4271 struct notification_client_list *client_list = NULL;
ab0ee2ca 4272 struct lttng_evaluation *evaluation = NULL;
f2b3ef9f 4273 enum action_executor_status executor_status;
ab0ee2ca 4274
505b2d90 4275 ret = 0;
ab0ee2ca 4276 trigger = trigger_list_element->trigger;
9b63a4aa 4277 condition = lttng_trigger_get_const_condition(trigger);
ab0ee2ca 4278 assert(condition);
ab0ee2ca
JG
4279
4280 /*
4281 * Check if any client is subscribed to the result of this
4282 * evaluation.
4283 */
ed327204 4284 client_list = get_client_list_from_condition(state, condition);
ab0ee2ca 4285
ed327204 4286 ret = evaluate_buffer_condition(condition, &evaluation, state,
ab0ee2ca 4287 previous_sample_available ? &previous_sample : NULL,
e8360425
JD
4288 &latest_sample,
4289 previous_session_consumed_total,
4290 latest_session_consumed_total,
4291 channel_info);
4292 if (caa_unlikely(ret)) {
505b2d90 4293 goto put_list;
ab0ee2ca
JG
4294 }
4295
e8360425 4296 if (caa_likely(!evaluation)) {
505b2d90 4297 goto put_list;
ab0ee2ca
JG
4298 }
4299
f8522f5c
JR
4300 if (!lttng_trigger_should_fire(trigger)) {
4301 goto put_list;
4302 }
4303
4304 lttng_trigger_fire(trigger);
4305
f2b3ef9f
JG
4306 /*
4307 * Ownership of `evaluation` transferred to the action executor
4308 * no matter the result.
4309 */
4310 executor_status = action_executor_enqueue(state->executor,
4311 trigger, evaluation, &channel_creds,
4312 client_list);
4313 evaluation = NULL;
4314 switch (executor_status) {
4315 case ACTION_EXECUTOR_STATUS_OK:
4316 break;
4317 case ACTION_EXECUTOR_STATUS_ERROR:
4318 case ACTION_EXECUTOR_STATUS_INVALID:
4319 /*
4320 * TODO Add trigger identification (name/id) when
4321 * it is added to the API.
4322 */
4323 ERR("Fatal error occurred while enqueuing action associated with buffer-condition trigger");
4324 ret = -1;
4325 goto put_list;
4326 case ACTION_EXECUTOR_STATUS_OVERFLOW:
4327 /*
4328 * TODO Add trigger identification (name/id) when
4329 * it is added to the API.
4330 *
4331 * Not a fatal error.
4332 */
4333 WARN("No space left when enqueuing action associated with buffer-condition trigger");
4334 ret = 0;
4335 goto put_list;
4336 default:
4337 abort();
4338 }
4339
505b2d90
JG
4340put_list:
4341 notification_client_list_put(client_list);
e0b5f87b 4342 if (caa_unlikely(ret)) {
505b2d90 4343 break;
ab0ee2ca
JG
4344 }
4345 }
4346end_unlock:
4347 rcu_read_unlock();
4348end:
4349 return ret;
4350}
This page took 0.283665 seconds and 4 git commands to generate.