Add UST namespace contexts
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
CommitLineData
ab0ee2ca
JG
1/*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _LGPL_SOURCE
19#include <urcu.h>
20#include <urcu/rculfhash.h>
21
ab0ee2ca
JG
22#include <common/defaults.h>
23#include <common/error.h>
24#include <common/futex.h>
25#include <common/unix.h>
26#include <common/dynamic-buffer.h>
27#include <common/hashtable/utils.h>
28#include <common/sessiond-comm/sessiond-comm.h>
29#include <common/macros.h>
30#include <lttng/condition/condition.h>
9b63a4aa 31#include <lttng/action/action-internal.h>
ab0ee2ca
JG
32#include <lttng/notification/notification-internal.h>
33#include <lttng/condition/condition-internal.h>
34#include <lttng/condition/buffer-usage-internal.h>
e8360425 35#include <lttng/condition/session-consumed-size-internal.h>
ea9a44f0 36#include <lttng/condition/session-rotation-internal.h>
ab0ee2ca 37#include <lttng/notification/channel-internal.h>
1da26331 38
ab0ee2ca
JG
39#include <time.h>
40#include <unistd.h>
41#include <assert.h>
42#include <inttypes.h>
d53ea4e4 43#include <fcntl.h>
ab0ee2ca 44
1da26331
JG
45#include "notification-thread.h"
46#include "notification-thread-events.h"
47#include "notification-thread-commands.h"
48#include "lttng-sessiond.h"
49#include "kernel.h"
50
ab0ee2ca
JG
/* Poll event mask: readable, error and hang-up events. */
#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
/* Same mask as CLIENT_POLL_MASK_IN with the writable event added. */
#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)

/*
 * Kind of object a condition is bound to, as reported by
 * get_condition_binding_object().
 */
enum lttng_object_type {
	LTTNG_OBJECT_TYPE_UNKNOWN,
	LTTNG_OBJECT_TYPE_NONE,
	LTTNG_OBJECT_TYPE_CHANNEL,
	LTTNG_OBJECT_TYPE_SESSION,
};
60
ab0ee2ca 61struct lttng_trigger_list_element {
9e0bb80e
JG
62 /* No ownership of the trigger object is assumed. */
63 const struct lttng_trigger *trigger;
ab0ee2ca
JG
64 struct cds_list_head node;
65};
66
67struct lttng_channel_trigger_list {
68 struct channel_key channel_key;
ea9a44f0 69 /* List of struct lttng_trigger_list_element. */
ab0ee2ca 70 struct cds_list_head list;
ea9a44f0 71 /* Node in the channel_triggers_ht */
ab0ee2ca 72 struct cds_lfht_node channel_triggers_ht_node;
83b934ad
MD
73 /* call_rcu delayed reclaim. */
74 struct rcu_head rcu_node;
ab0ee2ca
JG
75};
76
ea9a44f0
JG
77/*
78 * List of triggers applying to a given session.
79 *
80 * See:
81 * - lttng_session_trigger_list_create()
82 * - lttng_session_trigger_list_build()
83 * - lttng_session_trigger_list_destroy()
84 * - lttng_session_trigger_list_add()
85 */
86struct lttng_session_trigger_list {
87 /*
88 * Not owned by this; points to the session_info structure's
89 * session name.
90 */
91 const char *session_name;
92 /* List of struct lttng_trigger_list_element. */
93 struct cds_list_head list;
94 /* Node in the session_triggers_ht */
95 struct cds_lfht_node session_triggers_ht_node;
96 /*
97 * Weak reference to the notification system's session triggers
98 * hashtable.
99 *
100 * The session trigger list structure structure is owned by
101 * the session's session_info.
102 *
103 * The session_info is kept alive the the channel_infos holding a
104 * reference to it (reference counting). When those channels are
105 * destroyed (at runtime or on teardown), the reference they hold
106 * to the session_info are released. On destruction of session_info,
107 * session_info_destroy() will remove the list of triggers applying
108 * to this session from the notification system's state.
109 *
110 * This implies that the session_triggers_ht must be destroyed
111 * after the channels.
112 */
113 struct cds_lfht *session_triggers_ht;
114 /* Used for delayed RCU reclaim. */
115 struct rcu_head rcu_node;
116};
117
ab0ee2ca
JG
118struct lttng_trigger_ht_element {
119 struct lttng_trigger *trigger;
120 struct cds_lfht_node node;
83b934ad
MD
121 /* call_rcu delayed reclaim. */
122 struct rcu_head rcu_node;
ab0ee2ca
JG
123};
124
125struct lttng_condition_list_element {
126 struct lttng_condition *condition;
127 struct cds_list_head node;
128};
129
130struct notification_client_list_element {
131 struct notification_client *client;
132 struct cds_list_head node;
133};
134
135struct notification_client_list {
f82f93a1 136 const struct lttng_trigger *trigger;
ab0ee2ca
JG
137 struct cds_list_head list;
138 struct cds_lfht_node notification_trigger_ht_node;
83b934ad
MD
139 /* call_rcu delayed reclaim. */
140 struct rcu_head rcu_node;
ab0ee2ca
JG
141};
142
143struct notification_client {
144 int socket;
145 /* Client protocol version. */
146 uint8_t major, minor;
147 uid_t uid;
148 gid_t gid;
149 /*
5332364f 150 * Indicates if the credentials and versions of the client have been
ab0ee2ca
JG
151 * checked.
152 */
153 bool validated;
154 /*
155 * Conditions to which the client's notification channel is subscribed.
156 * List of struct lttng_condition_list_node. The condition member is
157 * owned by the client.
158 */
159 struct cds_list_head condition_list;
160 struct cds_lfht_node client_socket_ht_node;
161 struct {
162 struct {
14fa22f8
JG
163 /*
164 * During the reception of a message, the reception
165 * buffers' "size" is set to contain the current
166 * message's complete payload.
167 */
ab0ee2ca
JG
168 struct lttng_dynamic_buffer buffer;
169 /* Bytes left to receive for the current message. */
170 size_t bytes_to_receive;
171 /* Type of the message being received. */
172 enum lttng_notification_channel_message_type msg_type;
173 /*
174 * Indicates whether or not credentials are expected
175 * from the client.
176 */
01ea340e 177 bool expect_creds;
ab0ee2ca
JG
178 /*
179 * Indicates whether or not credentials were received
180 * from the client.
181 */
182 bool creds_received;
14fa22f8 183 /* Only used during credentials reception. */
ab0ee2ca
JG
184 lttng_sock_cred creds;
185 } inbound;
186 struct {
187 /*
188 * Indicates whether or not a notification addressed to
189 * this client was dropped because a command reply was
190 * already buffered.
191 *
192 * A notification is dropped whenever the buffer is not
193 * empty.
194 */
195 bool dropped_notification;
196 /*
197 * Indicates whether or not a command reply is already
198 * buffered. In this case, it means that the client is
199 * not consuming command replies before emitting a new
200 * one. This could be caused by a protocol error or a
201 * misbehaving/malicious client.
202 */
203 bool queued_command_reply;
204 struct lttng_dynamic_buffer buffer;
205 } outbound;
206 } communication;
83b934ad
MD
207 /* call_rcu delayed reclaim. */
208 struct rcu_head rcu_node;
ab0ee2ca
JG
209};
210
211struct channel_state_sample {
212 struct channel_key key;
213 struct cds_lfht_node channel_state_ht_node;
214 uint64_t highest_usage;
215 uint64_t lowest_usage;
e8360425 216 uint64_t channel_total_consumed;
83b934ad
MD
217 /* call_rcu delayed reclaim. */
218 struct rcu_head rcu_node;
ab0ee2ca
JG
219};
220
/* Helpers defined further down in this file. */
static unsigned long hash_channel_key(struct channel_key *key);

static int evaluate_buffer_condition(
		const struct lttng_condition *condition,
		struct lttng_evaluation **evaluation,
		const struct notification_thread_state *state,
		const struct channel_state_sample *previous_sample,
		const struct channel_state_sample *latest_sample,
		uint64_t previous_session_consumed_total,
		uint64_t latest_session_consumed_total,
		struct channel_info *channel_info);

static int send_evaluation_to_clients(
		const struct lttng_trigger *trigger,
		const struct lttng_evaluation *evaluation,
		struct notification_client_list *client_list,
		struct notification_thread_state *state,
		uid_t channel_uid, gid_t channel_gid);


/* session_info API */
static void session_info_destroy(void *_data);

static void session_info_get(struct session_info *session_info);

static void session_info_put(struct session_info *session_info);

static struct session_info *session_info_create(
		const char *name,
		uid_t uid, gid_t gid,
		struct lttng_session_trigger_list *trigger_list,
		struct cds_lfht *sessions_ht);

static void session_info_add_channel(
		struct session_info *session_info,
		struct channel_info *channel_info);

static void session_info_remove_channel(
		struct session_info *session_info,
		struct channel_info *channel_info);

/* lttng_session_trigger_list API */
static struct lttng_session_trigger_list *lttng_session_trigger_list_create(
		const char *session_name,
		struct cds_lfht *session_triggers_ht);

static struct lttng_session_trigger_list *lttng_session_trigger_list_build(
		const struct notification_thread_state *state,
		const char *session_name);

static void lttng_session_trigger_list_destroy(
		struct lttng_session_trigger_list *list);

static int lttng_session_trigger_list_add(
		struct lttng_session_trigger_list *list,
		const struct lttng_trigger *trigger);
272
273
ab0ee2ca
JG
274static
275int match_client(struct cds_lfht_node *node, const void *key)
276{
277 /* This double-cast is intended to supress pointer-to-cast warning. */
278 int socket = (int) (intptr_t) key;
279 struct notification_client *client;
280
281 client = caa_container_of(node, struct notification_client,
282 client_socket_ht_node);
283
284 return !!(client->socket == socket);
285}
286
287static
288int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
289{
290 struct channel_key *channel_key = (struct channel_key *) key;
291 struct lttng_channel_trigger_list *trigger_list;
292
293 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
294 channel_triggers_ht_node);
295
296 return !!((channel_key->key == trigger_list->channel_key.key) &&
297 (channel_key->domain == trigger_list->channel_key.domain));
298}
299
51eab943
JG
300static
301int match_session_trigger_list(struct cds_lfht_node *node, const void *key)
302{
303 const char *session_name = (const char *) key;
304 struct lttng_session_trigger_list *trigger_list;
305
306 trigger_list = caa_container_of(node, struct lttng_session_trigger_list,
307 session_triggers_ht_node);
308
309 return !!(strcmp(trigger_list->session_name, session_name) == 0);
310}
311
ab0ee2ca
JG
312static
313int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
314{
315 struct channel_key *channel_key = (struct channel_key *) key;
316 struct channel_state_sample *sample;
317
318 sample = caa_container_of(node, struct channel_state_sample,
319 channel_state_ht_node);
320
321 return !!((channel_key->key == sample->key.key) &&
322 (channel_key->domain == sample->key.domain));
323}
324
325static
326int match_channel_info(struct cds_lfht_node *node, const void *key)
327{
328 struct channel_key *channel_key = (struct channel_key *) key;
329 struct channel_info *channel_info;
330
331 channel_info = caa_container_of(node, struct channel_info,
332 channels_ht_node);
333
334 return !!((channel_key->key == channel_info->key.key) &&
335 (channel_key->domain == channel_info->key.domain));
336}
337
338static
339int match_condition(struct cds_lfht_node *node, const void *key)
340{
341 struct lttng_condition *condition_key = (struct lttng_condition *) key;
342 struct lttng_trigger_ht_element *trigger;
343 struct lttng_condition *condition;
344
345 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
346 node);
347 condition = lttng_trigger_get_condition(trigger->trigger);
348 assert(condition);
349
350 return !!lttng_condition_is_equal(condition_key, condition);
351}
352
ab0ee2ca
JG
353static
354int match_client_list_condition(struct cds_lfht_node *node, const void *key)
355{
356 struct lttng_condition *condition_key = (struct lttng_condition *) key;
357 struct notification_client_list *client_list;
f82f93a1 358 const struct lttng_condition *condition;
ab0ee2ca
JG
359
360 assert(condition_key);
361
362 client_list = caa_container_of(node, struct notification_client_list,
363 notification_trigger_ht_node);
f82f93a1 364 condition = lttng_trigger_get_const_condition(client_list->trigger);
ab0ee2ca
JG
365
366 return !!lttng_condition_is_equal(condition_key, condition);
367}
368
f82f93a1
JG
369static
370int match_session(struct cds_lfht_node *node, const void *key)
371{
372 const char *name = key;
373 struct session_info *session_info = caa_container_of(
374 node, struct session_info, sessions_ht_node);
375
376 return !strcmp(session_info->name, name);
377}
378
ab0ee2ca
JG
379static
380unsigned long lttng_condition_buffer_usage_hash(
9b63a4aa 381 const struct lttng_condition *_condition)
ab0ee2ca 382{
9a2746aa
JG
383 unsigned long hash;
384 unsigned long condition_type;
ab0ee2ca
JG
385 struct lttng_condition_buffer_usage *condition;
386
387 condition = container_of(_condition,
388 struct lttng_condition_buffer_usage, parent);
389
9a2746aa
JG
390 condition_type = (unsigned long) condition->parent.type;
391 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
ab0ee2ca
JG
392 if (condition->session_name) {
393 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
394 }
395 if (condition->channel_name) {
8f56701f 396 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
ab0ee2ca
JG
397 }
398 if (condition->domain.set) {
399 hash ^= hash_key_ulong(
400 (void *) condition->domain.type,
401 lttng_ht_seed);
402 }
403 if (condition->threshold_ratio.set) {
404 uint64_t val;
405
406 val = condition->threshold_ratio.value * (double) UINT32_MAX;
407 hash ^= hash_key_u64(&val, lttng_ht_seed);
6633b0dd 408 } else if (condition->threshold_bytes.set) {
ab0ee2ca
JG
409 uint64_t val;
410
411 val = condition->threshold_bytes.value;
412 hash ^= hash_key_u64(&val, lttng_ht_seed);
413 }
414 return hash;
415}
416
e8360425
JD
417static
418unsigned long lttng_condition_session_consumed_size_hash(
9b63a4aa 419 const struct lttng_condition *_condition)
e8360425 420{
9a2746aa
JG
421 unsigned long hash;
422 unsigned long condition_type =
423 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE;
e8360425
JD
424 struct lttng_condition_session_consumed_size *condition;
425 uint64_t val;
426
427 condition = container_of(_condition,
428 struct lttng_condition_session_consumed_size, parent);
429
9a2746aa 430 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
e8360425
JD
431 if (condition->session_name) {
432 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
433 }
434 val = condition->consumed_threshold_bytes.value;
435 hash ^= hash_key_u64(&val, lttng_ht_seed);
436 return hash;
437}
438
a8393880
JG
439static
440unsigned long lttng_condition_session_rotation_hash(
441 const struct lttng_condition *_condition)
442{
443 unsigned long hash, condition_type;
444 struct lttng_condition_session_rotation *condition;
445
446 condition = container_of(_condition,
447 struct lttng_condition_session_rotation, parent);
448 condition_type = (unsigned long) condition->parent.type;
449 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
450 assert(condition->session_name);
451 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
452 return hash;
453}
9a2746aa 454
ab0ee2ca
JG
455/*
456 * The lttng_condition hashing code is kept in this file (rather than
457 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
458 * don't want to link in liblttng-ctl.
459 */
460static
9b63a4aa 461unsigned long lttng_condition_hash(const struct lttng_condition *condition)
ab0ee2ca
JG
462{
463 switch (condition->type) {
464 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
465 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
466 return lttng_condition_buffer_usage_hash(condition);
e8360425
JD
467 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
468 return lttng_condition_session_consumed_size_hash(condition);
a8393880
JG
469 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
470 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
471 return lttng_condition_session_rotation_hash(condition);
ab0ee2ca
JG
472 default:
473 ERR("[notification-thread] Unexpected condition type caught");
474 abort();
475 }
476}
477
e4db5ace
JR
478static
479unsigned long hash_channel_key(struct channel_key *key)
480{
481 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
482 unsigned long domain_hash = hash_key_ulong(
483 (void *) (unsigned long) key->domain, lttng_ht_seed);
484
485 return key_hash ^ domain_hash;
486}
487
f82f93a1
JG
488/*
489 * Get the type of object to which a given condition applies. Bindings let
490 * the notification system evaluate a trigger's condition when a given
491 * object's state is updated.
492 *
493 * For instance, a condition bound to a channel will be evaluated everytime
494 * the channel's state is changed by a channel monitoring sample.
495 */
496static
497enum lttng_object_type get_condition_binding_object(
498 const struct lttng_condition *condition)
499{
500 switch (lttng_condition_get_type(condition)) {
501 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
502 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
503 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
504 return LTTNG_OBJECT_TYPE_CHANNEL;
505 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
506 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
507 return LTTNG_OBJECT_TYPE_SESSION;
508 default:
509 return LTTNG_OBJECT_TYPE_UNKNOWN;
510 }
511}
512
83b934ad
MD
513static
514void free_channel_info_rcu(struct rcu_head *node)
515{
516 free(caa_container_of(node, struct channel_info, rcu_node));
517}
518
ab0ee2ca
JG
519static
520void channel_info_destroy(struct channel_info *channel_info)
521{
522 if (!channel_info) {
523 return;
524 }
525
8abe313a
JG
526 if (channel_info->session_info) {
527 session_info_remove_channel(channel_info->session_info,
528 channel_info);
529 session_info_put(channel_info->session_info);
ab0ee2ca 530 }
8abe313a
JG
531 if (channel_info->name) {
532 free(channel_info->name);
ab0ee2ca 533 }
83b934ad
MD
534 call_rcu(&channel_info->rcu_node, free_channel_info_rcu);
535}
536
537static
538void free_session_info_rcu(struct rcu_head *node)
539{
540 free(caa_container_of(node, struct session_info, rcu_node));
ab0ee2ca
JG
541}
542
8abe313a 543/* Don't call directly, use the ref-counting mechanism. */
ab0ee2ca 544static
8abe313a 545void session_info_destroy(void *_data)
ab0ee2ca 546{
8abe313a 547 struct session_info *session_info = _data;
3b68d0a3 548 int ret;
ab0ee2ca 549
8abe313a
JG
550 assert(session_info);
551 if (session_info->channel_infos_ht) {
3b68d0a3
JR
552 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
553 if (ret) {
4c3f302b 554 ERR("[notification-thread] Failed to destroy channel information hash table");
3b68d0a3 555 }
8abe313a 556 }
ea9a44f0 557 lttng_session_trigger_list_destroy(session_info->trigger_list);
1eee26c5
JG
558
559 rcu_read_lock();
560 cds_lfht_del(session_info->sessions_ht,
561 &session_info->sessions_ht_node);
562 rcu_read_unlock();
8abe313a 563 free(session_info->name);
83b934ad 564 call_rcu(&session_info->rcu_node, free_session_info_rcu);
8abe313a 565}
ab0ee2ca 566
8abe313a
JG
567static
568void session_info_get(struct session_info *session_info)
569{
570 if (!session_info) {
571 return;
572 }
573 lttng_ref_get(&session_info->ref);
574}
575
576static
577void session_info_put(struct session_info *session_info)
578{
579 if (!session_info) {
580 return;
ab0ee2ca 581 }
8abe313a
JG
582 lttng_ref_put(&session_info->ref);
583}
584
585static
ea9a44f0 586struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
1eee26c5
JG
587 struct lttng_session_trigger_list *trigger_list,
588 struct cds_lfht *sessions_ht)
8abe313a
JG
589{
590 struct session_info *session_info;
ab0ee2ca 591
8abe313a 592 assert(name);
ab0ee2ca 593
8abe313a
JG
594 session_info = zmalloc(sizeof(*session_info));
595 if (!session_info) {
596 goto end;
597 }
598 lttng_ref_init(&session_info->ref, session_info_destroy);
599
600 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
601 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
602 if (!session_info->channel_infos_ht) {
ab0ee2ca
JG
603 goto error;
604 }
8abe313a
JG
605
606 cds_lfht_node_init(&session_info->sessions_ht_node);
607 session_info->name = strdup(name);
608 if (!session_info->name) {
ab0ee2ca
JG
609 goto error;
610 }
8abe313a
JG
611 session_info->uid = uid;
612 session_info->gid = gid;
ea9a44f0 613 session_info->trigger_list = trigger_list;
1eee26c5 614 session_info->sessions_ht = sessions_ht;
8abe313a
JG
615end:
616 return session_info;
617error:
618 session_info_put(session_info);
619 return NULL;
620}
621
622static
623void session_info_add_channel(struct session_info *session_info,
624 struct channel_info *channel_info)
625{
626 rcu_read_lock();
627 cds_lfht_add(session_info->channel_infos_ht,
628 hash_channel_key(&channel_info->key),
629 &channel_info->session_info_channels_ht_node);
630 rcu_read_unlock();
631}
632
633static
634void session_info_remove_channel(struct session_info *session_info,
635 struct channel_info *channel_info)
636{
637 rcu_read_lock();
638 cds_lfht_del(session_info->channel_infos_ht,
639 &channel_info->session_info_channels_ht_node);
640 rcu_read_unlock();
641}
642
643static
644struct channel_info *channel_info_create(const char *channel_name,
645 struct channel_key *channel_key, uint64_t channel_capacity,
646 struct session_info *session_info)
647{
648 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
649
650 if (!channel_info) {
651 goto end;
652 }
653
ab0ee2ca 654 cds_lfht_node_init(&channel_info->channels_ht_node);
8abe313a
JG
655 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
656 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
657 channel_info->capacity = channel_capacity;
658
659 channel_info->name = strdup(channel_name);
660 if (!channel_info->name) {
661 goto error;
662 }
663
664 /*
665 * Set the references between session and channel infos:
666 * - channel_info holds a strong reference to session_info
667 * - session_info holds a weak reference to channel_info
668 */
669 session_info_get(session_info);
670 session_info_add_channel(session_info, channel_info);
671 channel_info->session_info = session_info;
ab0ee2ca 672end:
8abe313a 673 return channel_info;
ab0ee2ca 674error:
8abe313a 675 channel_info_destroy(channel_info);
ab0ee2ca
JG
676 return NULL;
677}
678
ed327204
JG
679/* RCU read lock must be held by the caller. */
680static
681struct notification_client_list *get_client_list_from_condition(
682 struct notification_thread_state *state,
683 const struct lttng_condition *condition)
684{
685 struct cds_lfht_node *node;
686 struct cds_lfht_iter iter;
687
688 cds_lfht_lookup(state->notification_trigger_clients_ht,
689 lttng_condition_hash(condition),
690 match_client_list_condition,
691 condition,
692 &iter);
693 node = cds_lfht_iter_get_node(&iter);
694
695 return node ? caa_container_of(node,
696 struct notification_client_list,
697 notification_trigger_ht_node) : NULL;
698}
699
e4db5ace
JR
700/* This function must be called with the RCU read lock held. */
701static
f82f93a1
JG
702int evaluate_channel_condition_for_client(
703 const struct lttng_condition *condition,
704 struct notification_thread_state *state,
705 struct lttng_evaluation **evaluation,
706 uid_t *session_uid, gid_t *session_gid)
e4db5ace
JR
707{
708 int ret;
709 struct cds_lfht_iter iter;
710 struct cds_lfht_node *node;
711 struct channel_info *channel_info = NULL;
712 struct channel_key *channel_key = NULL;
713 struct channel_state_sample *last_sample = NULL;
714 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
e4db5ace 715
f82f93a1 716 /* Find the channel associated with the condition. */
e4db5ace 717 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
f82f93a1 718 channel_trigger_list, channel_triggers_ht_node) {
e4db5ace
JR
719 struct lttng_trigger_list_element *element;
720
721 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
9b63a4aa 722 const struct lttng_condition *current_condition =
f82f93a1 723 lttng_trigger_get_const_condition(
e4db5ace
JR
724 element->trigger);
725
726 assert(current_condition);
727 if (!lttng_condition_is_equal(condition,
2ae99f0b 728 current_condition)) {
e4db5ace
JR
729 continue;
730 }
731
732 /* Found the trigger, save the channel key. */
733 channel_key = &channel_trigger_list->channel_key;
734 break;
735 }
736 if (channel_key) {
737 /* The channel key was found stop iteration. */
738 break;
739 }
740 }
741
742 if (!channel_key){
743 /* No channel found; normal exit. */
f82f93a1 744 DBG("[notification-thread] No known channel associated with newly subscribed-to condition");
e4db5ace
JR
745 ret = 0;
746 goto end;
747 }
748
749 /* Fetch channel info for the matching channel. */
750 cds_lfht_lookup(state->channels_ht,
751 hash_channel_key(channel_key),
752 match_channel_info,
753 channel_key,
754 &iter);
755 node = cds_lfht_iter_get_node(&iter);
756 assert(node);
757 channel_info = caa_container_of(node, struct channel_info,
758 channels_ht_node);
759
760 /* Retrieve the channel's last sample, if it exists. */
761 cds_lfht_lookup(state->channel_state_ht,
762 hash_channel_key(channel_key),
763 match_channel_state_sample,
764 channel_key,
765 &iter);
766 node = cds_lfht_iter_get_node(&iter);
767 if (node) {
768 last_sample = caa_container_of(node,
769 struct channel_state_sample,
770 channel_state_ht_node);
771 } else {
772 /* Nothing to evaluate, no sample was ever taken. Normal exit */
773 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
774 ret = 0;
775 goto end;
776 }
777
f82f93a1 778 ret = evaluate_buffer_condition(condition, evaluation, state,
e8360425
JD
779 NULL, last_sample,
780 0, channel_info->session_info->consumed_data_size,
781 channel_info);
e4db5ace 782 if (ret) {
066a3c82 783 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
e4db5ace
JR
784 goto end;
785 }
786
f82f93a1
JG
787 *session_uid = channel_info->session_info->uid;
788 *session_gid = channel_info->session_info->gid;
789end:
790 return ret;
791}
792
793static
794const char *get_condition_session_name(const struct lttng_condition *condition)
795{
796 const char *session_name = NULL;
797 enum lttng_condition_status status;
798
799 switch (lttng_condition_get_type(condition)) {
800 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
801 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
802 status = lttng_condition_buffer_usage_get_session_name(
803 condition, &session_name);
804 break;
805 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
806 status = lttng_condition_session_consumed_size_get_session_name(
807 condition, &session_name);
808 break;
809 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
810 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
811 status = lttng_condition_session_rotation_get_session_name(
812 condition, &session_name);
813 break;
814 default:
815 abort();
816 }
817 if (status != LTTNG_CONDITION_STATUS_OK) {
818 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
819 goto end;
820 }
821end:
822 return session_name;
823}
824
825/* This function must be called with the RCU read lock held. */
826static
827int evaluate_session_condition_for_client(
828 const struct lttng_condition *condition,
829 struct notification_thread_state *state,
830 struct lttng_evaluation **evaluation,
831 uid_t *session_uid, gid_t *session_gid)
832{
833 int ret;
834 struct cds_lfht_iter iter;
835 struct cds_lfht_node *node;
836 const char *session_name;
837 struct session_info *session_info = NULL;
838
839 session_name = get_condition_session_name(condition);
840
841 /* Find the session associated with the trigger. */
842 cds_lfht_lookup(state->sessions_ht,
843 hash_key_str(session_name, lttng_ht_seed),
844 match_session,
845 session_name,
846 &iter);
847 node = cds_lfht_iter_get_node(&iter);
848 if (!node) {
849 DBG("[notification-thread] No known session matching name \"%s\"",
850 session_name);
851 ret = 0;
852 goto end;
853 }
854
855 session_info = caa_container_of(node, struct session_info,
856 sessions_ht_node);
857 session_info_get(session_info);
858
859 /*
860 * Evaluation is performed in-line here since only one type of
861 * session-bound condition is handled for the moment.
862 */
863 switch (lttng_condition_get_type(condition)) {
864 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
865 if (!session_info->rotation.ongoing) {
be558f88 866 ret = 0;
f82f93a1
JG
867 goto end_session_put;
868 }
869
870 *evaluation = lttng_evaluation_session_rotation_ongoing_create(
871 session_info->rotation.id);
872 if (!*evaluation) {
873 /* Fatal error. */
874 ERR("[notification-thread] Failed to create session rotation ongoing evaluation for session \"%s\"",
875 session_info->name);
876 ret = -1;
877 goto end_session_put;
878 }
879 ret = 0;
880 break;
881 default:
882 ret = 0;
883 goto end_session_put;
884 }
885
886 *session_uid = session_info->uid;
887 *session_gid = session_info->gid;
888
889end_session_put:
890 session_info_put(session_info);
891end:
892 return ret;
893}
894
895/* This function must be called with the RCU read lock held. */
896static
897int evaluate_condition_for_client(const struct lttng_trigger *trigger,
898 const struct lttng_condition *condition,
899 struct notification_client *client,
900 struct notification_thread_state *state)
901{
902 int ret;
903 struct lttng_evaluation *evaluation = NULL;
904 struct notification_client_list client_list = { 0 };
905 struct notification_client_list_element client_list_element = { 0 };
906 uid_t object_uid = 0;
907 gid_t object_gid = 0;
908
909 assert(trigger);
910 assert(condition);
911 assert(client);
912 assert(state);
913
914 switch (get_condition_binding_object(condition)) {
915 case LTTNG_OBJECT_TYPE_SESSION:
916 ret = evaluate_session_condition_for_client(condition, state,
917 &evaluation, &object_uid, &object_gid);
918 break;
919 case LTTNG_OBJECT_TYPE_CHANNEL:
920 ret = evaluate_channel_condition_for_client(condition, state,
921 &evaluation, &object_uid, &object_gid);
922 break;
923 case LTTNG_OBJECT_TYPE_NONE:
924 ret = 0;
925 goto end;
926 case LTTNG_OBJECT_TYPE_UNKNOWN:
927 default:
928 ret = -1;
929 goto end;
930 }
af0c318d
JG
931 if (ret) {
932 /* Fatal error. */
933 goto end;
934 }
e4db5ace
JR
935 if (!evaluation) {
936 /* Evaluation yielded nothing. Normal exit. */
937 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
938 ret = 0;
939 goto end;
940 }
941
942 /*
943 * Create a temporary client list with the client currently
944 * subscribing.
945 */
946 cds_lfht_node_init(&client_list.notification_trigger_ht_node);
947 CDS_INIT_LIST_HEAD(&client_list.list);
948 client_list.trigger = trigger;
949
950 CDS_INIT_LIST_HEAD(&client_list_element.node);
951 client_list_element.client = client;
952 cds_list_add(&client_list_element.node, &client_list.list);
953
954 /* Send evaluation result to the newly-subscribed client. */
955 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
956 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
f82f93a1 957 state, object_uid, object_gid);
e4db5ace
JR
958
959end:
960 return ret;
961}
962
ab0ee2ca
JG
963static
964int notification_thread_client_subscribe(struct notification_client *client,
965 struct lttng_condition *condition,
966 struct notification_thread_state *state,
967 enum lttng_notification_channel_status *_status)
968{
969 int ret = 0;
ab0ee2ca
JG
970 struct notification_client_list *client_list;
971 struct lttng_condition_list_element *condition_list_element = NULL;
972 struct notification_client_list_element *client_list_element = NULL;
973 enum lttng_notification_channel_status status =
974 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
975
976 /*
977 * Ensure that the client has not already subscribed to this condition
978 * before.
979 */
980 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
981 if (lttng_condition_is_equal(condition_list_element->condition,
982 condition)) {
983 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
984 goto end;
985 }
986 }
987
988 condition_list_element = zmalloc(sizeof(*condition_list_element));
989 if (!condition_list_element) {
990 ret = -1;
991 goto error;
992 }
993 client_list_element = zmalloc(sizeof(*client_list_element));
994 if (!client_list_element) {
995 ret = -1;
996 goto error;
997 }
998
999 rcu_read_lock();
1000
1001 /*
1002 * Add the newly-subscribed condition to the client's subscription list.
1003 */
1004 CDS_INIT_LIST_HEAD(&condition_list_element->node);
1005 condition_list_element->condition = condition;
1006 cds_list_add(&condition_list_element->node, &client->condition_list);
1007
ed327204
JG
1008 client_list = get_client_list_from_condition(state, condition);
1009 if (!client_list) {
2ae99f0b
JG
1010 /*
1011 * No notification-emiting trigger registered with this
1012 * condition. We don't evaluate the condition right away
1013 * since this trigger is not registered yet.
1014 */
4fb43b68 1015 free(client_list_element);
ab0ee2ca
JG
1016 goto end_unlock;
1017 }
1018
2ae99f0b
JG
1019 /*
1020 * The condition to which the client just subscribed is evaluated
1021 * at this point so that conditions that are already TRUE result
1022 * in a notification being sent out.
1023 */
e4db5ace
JR
1024 if (evaluate_condition_for_client(client_list->trigger, condition,
1025 client, state)) {
1026 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
1027 ret = -1;
4bd5b4af 1028 free(client_list_element);
e4db5ace
JR
1029 goto end_unlock;
1030 }
1031
1032 /*
1033 * Add the client to the list of clients interested in a given trigger
1034 * if a "notification" trigger with a corresponding condition was
1035 * added prior.
1036 */
ab0ee2ca
JG
1037 client_list_element->client = client;
1038 CDS_INIT_LIST_HEAD(&client_list_element->node);
1039 cds_list_add(&client_list_element->node, &client_list->list);
1040end_unlock:
1041 rcu_read_unlock();
1042end:
1043 if (_status) {
1044 *_status = status;
1045 }
1046 return ret;
1047error:
1048 free(condition_list_element);
1049 free(client_list_element);
1050 return ret;
1051}
1052
1053static
1054int notification_thread_client_unsubscribe(
1055 struct notification_client *client,
1056 struct lttng_condition *condition,
1057 struct notification_thread_state *state,
1058 enum lttng_notification_channel_status *_status)
1059{
ab0ee2ca
JG
1060 struct notification_client_list *client_list;
1061 struct lttng_condition_list_element *condition_list_element,
1062 *condition_tmp;
1063 struct notification_client_list_element *client_list_element,
1064 *client_tmp;
1065 bool condition_found = false;
1066 enum lttng_notification_channel_status status =
1067 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1068
1069 /* Remove the condition from the client's condition list. */
1070 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
1071 &client->condition_list, node) {
1072 if (!lttng_condition_is_equal(condition_list_element->condition,
1073 condition)) {
1074 continue;
1075 }
1076
1077 cds_list_del(&condition_list_element->node);
1078 /*
1079 * The caller may be iterating on the client's conditions to
1080 * tear down a client's connection. In this case, the condition
1081 * will be destroyed at the end.
1082 */
1083 if (condition != condition_list_element->condition) {
1084 lttng_condition_destroy(
1085 condition_list_element->condition);
1086 }
1087 free(condition_list_element);
1088 condition_found = true;
1089 break;
1090 }
1091
1092 if (!condition_found) {
1093 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
1094 goto end;
1095 }
1096
1097 /*
1098 * Remove the client from the list of clients interested the trigger
1099 * matching the condition.
1100 */
1101 rcu_read_lock();
ed327204
JG
1102 client_list = get_client_list_from_condition(state, condition);
1103 if (!client_list) {
ab0ee2ca
JG
1104 goto end_unlock;
1105 }
1106
ab0ee2ca
JG
1107 cds_list_for_each_entry_safe(client_list_element, client_tmp,
1108 &client_list->list, node) {
1109 if (client_list_element->client->socket != client->socket) {
1110 continue;
1111 }
1112 cds_list_del(&client_list_element->node);
1113 free(client_list_element);
1114 break;
1115 }
1116end_unlock:
1117 rcu_read_unlock();
1118end:
1119 lttng_condition_destroy(condition);
1120 if (_status) {
1121 *_status = status;
1122 }
1123 return 0;
1124}
1125
83b934ad
MD
1126static
1127void free_notification_client_rcu(struct rcu_head *node)
1128{
1129 free(caa_container_of(node, struct notification_client, rcu_node));
1130}
1131
ab0ee2ca
JG
1132static
1133void notification_client_destroy(struct notification_client *client,
1134 struct notification_thread_state *state)
1135{
1136 struct lttng_condition_list_element *condition_list_element, *tmp;
1137
1138 if (!client) {
1139 return;
1140 }
1141
1142 /* Release all conditions to which the client was subscribed. */
1143 cds_list_for_each_entry_safe(condition_list_element, tmp,
1144 &client->condition_list, node) {
1145 (void) notification_thread_client_unsubscribe(client,
1146 condition_list_element->condition, state, NULL);
1147 }
1148
1149 if (client->socket >= 0) {
1150 (void) lttcomm_close_unix_sock(client->socket);
1151 }
1152 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
1153 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
83b934ad 1154 call_rcu(&client->rcu_node, free_notification_client_rcu);
ab0ee2ca
JG
1155}
1156
1157/*
1158 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1159 * client pointer).
1160 */
1161static
1162struct notification_client *get_client_from_socket(int socket,
1163 struct notification_thread_state *state)
1164{
1165 struct cds_lfht_iter iter;
1166 struct cds_lfht_node *node;
1167 struct notification_client *client = NULL;
1168
1169 cds_lfht_lookup(state->client_socket_ht,
1170 hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed),
1171 match_client,
1172 (void *) (unsigned long) socket,
1173 &iter);
1174 node = cds_lfht_iter_get_node(&iter);
1175 if (!node) {
1176 goto end;
1177 }
1178
1179 client = caa_container_of(node, struct notification_client,
1180 client_socket_ht_node);
1181end:
1182 return client;
1183}
1184
1185static
e8360425 1186bool buffer_usage_condition_applies_to_channel(
51eab943
JG
1187 const struct lttng_condition *condition,
1188 const struct channel_info *channel_info)
ab0ee2ca
JG
1189{
1190 enum lttng_condition_status status;
e8360425
JD
1191 enum lttng_domain_type condition_domain;
1192 const char *condition_session_name = NULL;
1193 const char *condition_channel_name = NULL;
ab0ee2ca 1194
e8360425
JD
1195 status = lttng_condition_buffer_usage_get_domain_type(condition,
1196 &condition_domain);
1197 assert(status == LTTNG_CONDITION_STATUS_OK);
1198 if (channel_info->key.domain != condition_domain) {
ab0ee2ca
JG
1199 goto fail;
1200 }
1201
e8360425
JD
1202 status = lttng_condition_buffer_usage_get_session_name(
1203 condition, &condition_session_name);
1204 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1205
1206 status = lttng_condition_buffer_usage_get_channel_name(
1207 condition, &condition_channel_name);
1208 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
1209
1210 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1211 goto fail;
1212 }
1213 if (strcmp(channel_info->name, condition_channel_name)) {
ab0ee2ca
JG
1214 goto fail;
1215 }
1216
e8360425
JD
1217 return true;
1218fail:
1219 return false;
1220}
1221
1222static
1223bool session_consumed_size_condition_applies_to_channel(
51eab943
JG
1224 const struct lttng_condition *condition,
1225 const struct channel_info *channel_info)
e8360425
JD
1226{
1227 enum lttng_condition_status status;
1228 const char *condition_session_name = NULL;
1229
1230 status = lttng_condition_session_consumed_size_get_session_name(
1231 condition, &condition_session_name);
1232 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1233
1234 if (strcmp(channel_info->session_info->name, condition_session_name)) {
ab0ee2ca
JG
1235 goto fail;
1236 }
1237
e8360425
JD
1238 return true;
1239fail:
1240 return false;
1241}
ab0ee2ca 1242
51eab943
JG
1243static
1244bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
1245 const struct channel_info *channel_info)
1246{
1247 const struct lttng_condition *condition;
e8360425 1248 bool trigger_applies;
ab0ee2ca 1249
51eab943 1250 condition = lttng_trigger_get_const_condition(trigger);
e8360425 1251 if (!condition) {
ab0ee2ca
JG
1252 goto fail;
1253 }
e8360425
JD
1254
1255 switch (lttng_condition_get_type(condition)) {
1256 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1257 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1258 trigger_applies = buffer_usage_condition_applies_to_channel(
1259 condition, channel_info);
1260 break;
1261 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1262 trigger_applies = session_consumed_size_condition_applies_to_channel(
1263 condition, channel_info);
1264 break;
1265 default:
ab0ee2ca
JG
1266 goto fail;
1267 }
1268
e8360425 1269 return trigger_applies;
ab0ee2ca
JG
1270fail:
1271 return false;
1272}
1273
1274static
1275bool trigger_applies_to_client(struct lttng_trigger *trigger,
1276 struct notification_client *client)
1277{
1278 bool applies = false;
1279 struct lttng_condition_list_element *condition_list_element;
1280
1281 cds_list_for_each_entry(condition_list_element, &client->condition_list,
1282 node) {
1283 applies = lttng_condition_is_equal(
1284 condition_list_element->condition,
1285 lttng_trigger_get_condition(trigger));
1286 if (applies) {
1287 break;
1288 }
1289 }
1290 return applies;
1291}
1292
ed327204
JG
1293/* Must be called with RCU read lock held. */
1294static
1295struct lttng_session_trigger_list *get_session_trigger_list(
1296 struct notification_thread_state *state,
1297 const char *session_name)
1298{
1299 struct lttng_session_trigger_list *list = NULL;
1300 struct cds_lfht_node *node;
1301 struct cds_lfht_iter iter;
1302
1303 cds_lfht_lookup(state->session_triggers_ht,
1304 hash_key_str(session_name, lttng_ht_seed),
1305 match_session_trigger_list,
1306 session_name,
1307 &iter);
1308 node = cds_lfht_iter_get_node(&iter);
1309 if (!node) {
1310 /*
1311 * Not an error, the list of triggers applying to that session
1312 * will be initialized when the session is created.
1313 */
1314 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1315 session_name);
1316 goto end;
1317 }
1318
1319 list = caa_container_of(node,
1320 struct lttng_session_trigger_list,
1321 session_triggers_ht_node);
1322end:
1323 return list;
1324}
1325
ea9a44f0
JG
1326/*
1327 * Allocate an empty lttng_session_trigger_list for the session named
1328 * 'session_name'.
1329 *
1330 * No ownership of 'session_name' is assumed by the session trigger list.
1331 * It is the caller's responsability to ensure the session name is alive
1332 * for as long as this list is.
1333 */
1334static
1335struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1336 const char *session_name,
1337 struct cds_lfht *session_triggers_ht)
1338{
1339 struct lttng_session_trigger_list *list;
1340
1341 list = zmalloc(sizeof(*list));
1342 if (!list) {
1343 goto end;
1344 }
1345 list->session_name = session_name;
1346 CDS_INIT_LIST_HEAD(&list->list);
1347 cds_lfht_node_init(&list->session_triggers_ht_node);
1348 list->session_triggers_ht = session_triggers_ht;
1349
1350 rcu_read_lock();
1351 /* Publish the list through the session_triggers_ht. */
1352 cds_lfht_add(session_triggers_ht,
1353 hash_key_str(session_name, lttng_ht_seed),
1354 &list->session_triggers_ht_node);
1355 rcu_read_unlock();
1356end:
1357 return list;
1358}
1359
1360static
1361void free_session_trigger_list_rcu(struct rcu_head *node)
1362{
1363 free(caa_container_of(node, struct lttng_session_trigger_list,
1364 rcu_node));
1365}
1366
1367static
1368void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1369{
1370 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1371
1372 /* Empty the list element by element, and then free the list itself. */
1373 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1374 &list->list, node) {
1375 cds_list_del(&trigger_list_element->node);
1376 free(trigger_list_element);
1377 }
1378 rcu_read_lock();
1379 /* Unpublish the list from the session_triggers_ht. */
1380 cds_lfht_del(list->session_triggers_ht,
1381 &list->session_triggers_ht_node);
1382 rcu_read_unlock();
1383 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1384}
1385
1386static
1387int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
1388 const struct lttng_trigger *trigger)
1389{
1390 int ret = 0;
1391 struct lttng_trigger_list_element *new_element =
1392 zmalloc(sizeof(*new_element));
1393
1394 if (!new_element) {
1395 ret = -1;
1396 goto end;
1397 }
1398 CDS_INIT_LIST_HEAD(&new_element->node);
1399 new_element->trigger = trigger;
1400 cds_list_add(&new_element->node, &list->list);
1401end:
1402 return ret;
1403}
1404
1405static
1406bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1407 const char *session_name)
1408{
1409 bool applies = false;
1410 const struct lttng_condition *condition;
1411
1412 condition = lttng_trigger_get_const_condition(trigger);
1413 switch (lttng_condition_get_type(condition)) {
1414 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1415 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1416 {
1417 enum lttng_condition_status condition_status;
1418 const char *condition_session_name;
1419
1420 condition_status = lttng_condition_session_rotation_get_session_name(
1421 condition, &condition_session_name);
1422 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1423 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1424 goto end;
1425 }
1426
1427 assert(condition_session_name);
1428 applies = !strcmp(condition_session_name, session_name);
1429 break;
1430 }
1431 default:
1432 goto end;
1433 }
1434end:
1435 return applies;
1436}
1437
1438/*
1439 * Allocate and initialize an lttng_session_trigger_list which contains
1440 * all triggers that apply to the session named 'session_name'.
1441 *
1442 * No ownership of 'session_name' is assumed by the session trigger list.
1443 * It is the caller's responsability to ensure the session name is alive
1444 * for as long as this list is.
1445 */
1446static
1447struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1448 const struct notification_thread_state *state,
1449 const char *session_name)
1450{
1451 int trigger_count = 0;
1452 struct lttng_session_trigger_list *session_trigger_list = NULL;
1453 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1454 struct cds_lfht_iter iter;
1455
1456 session_trigger_list = lttng_session_trigger_list_create(session_name,
1457 state->session_triggers_ht);
1458
1459 /* Add all triggers applying to the session named 'session_name'. */
1460 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1461 node) {
1462 int ret;
1463
1464 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1465 session_name)) {
1466 continue;
1467 }
1468
1469 ret = lttng_session_trigger_list_add(session_trigger_list,
1470 trigger_ht_element->trigger);
1471 if (ret) {
1472 goto error;
1473 }
1474
1475 trigger_count++;
1476 }
1477
1478 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1479 trigger_count);
1480 return session_trigger_list;
1481error:
1482 lttng_session_trigger_list_destroy(session_trigger_list);
1483 return NULL;
1484}
1485
8abe313a
JG
1486static
1487struct session_info *find_or_create_session_info(
1488 struct notification_thread_state *state,
1489 const char *name, uid_t uid, gid_t gid)
1490{
1491 struct session_info *session = NULL;
1492 struct cds_lfht_node *node;
1493 struct cds_lfht_iter iter;
ea9a44f0 1494 struct lttng_session_trigger_list *trigger_list;
8abe313a
JG
1495
1496 rcu_read_lock();
1497 cds_lfht_lookup(state->sessions_ht,
1498 hash_key_str(name, lttng_ht_seed),
1499 match_session,
1500 name,
1501 &iter);
1502 node = cds_lfht_iter_get_node(&iter);
1503 if (node) {
1504 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1505 name, uid, gid);
1506 session = caa_container_of(node, struct session_info,
1507 sessions_ht_node);
1508 assert(session->uid == uid);
1509 assert(session->gid == gid);
1eee26c5
JG
1510 session_info_get(session);
1511 goto end;
ea9a44f0
JG
1512 }
1513
1514 trigger_list = lttng_session_trigger_list_build(state, name);
1515 if (!trigger_list) {
1516 goto error;
8abe313a
JG
1517 }
1518
1eee26c5
JG
1519 session = session_info_create(name, uid, gid, trigger_list,
1520 state->sessions_ht);
8abe313a
JG
1521 if (!session) {
1522 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1523 name, uid, gid);
b248644d 1524 lttng_session_trigger_list_destroy(trigger_list);
ea9a44f0 1525 goto error;
8abe313a 1526 }
ea9a44f0 1527 trigger_list = NULL;
577983cb
JG
1528
1529 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
9b63a4aa 1530 &session->sessions_ht_node);
1eee26c5 1531end:
8abe313a
JG
1532 rcu_read_unlock();
1533 return session;
ea9a44f0
JG
1534error:
1535 rcu_read_unlock();
1536 session_info_put(session);
1537 return NULL;
8abe313a
JG
1538}
1539
ab0ee2ca
JG
1540static
1541int handle_notification_thread_command_add_channel(
8abe313a
JG
1542 struct notification_thread_state *state,
1543 const char *session_name, uid_t session_uid, gid_t session_gid,
1544 const char *channel_name, enum lttng_domain_type channel_domain,
1545 uint64_t channel_key_int, uint64_t channel_capacity,
1546 enum lttng_error_code *cmd_result)
ab0ee2ca
JG
1547{
1548 struct cds_list_head trigger_list;
8abe313a
JG
1549 struct channel_info *new_channel_info = NULL;
1550 struct channel_key channel_key = {
1551 .key = channel_key_int,
1552 .domain = channel_domain,
1553 };
ab0ee2ca
JG
1554 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1555 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1556 int trigger_count = 0;
1557 struct cds_lfht_iter iter;
8abe313a 1558 struct session_info *session_info = NULL;
ab0ee2ca
JG
1559
1560 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
8abe313a
JG
1561 channel_name, session_name, channel_key_int,
1562 channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
ab0ee2ca
JG
1563
1564 CDS_INIT_LIST_HEAD(&trigger_list);
1565
8abe313a
JG
1566 session_info = find_or_create_session_info(state, session_name,
1567 session_uid, session_gid);
1568 if (!session_info) {
a9577b76 1569 /* Allocation error or an internal error occurred. */
ab0ee2ca
JG
1570 goto error;
1571 }
1572
8abe313a
JG
1573 new_channel_info = channel_info_create(channel_name, &channel_key,
1574 channel_capacity, session_info);
1575 if (!new_channel_info) {
1576 goto error;
1577 }
ab0ee2ca 1578
83b934ad 1579 rcu_read_lock();
ab0ee2ca
JG
1580 /* Build a list of all triggers applying to the new channel. */
1581 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1582 node) {
1583 struct lttng_trigger_list_element *new_element;
1584
1585 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
8abe313a 1586 new_channel_info)) {
ab0ee2ca
JG
1587 continue;
1588 }
1589
1590 new_element = zmalloc(sizeof(*new_element));
1591 if (!new_element) {
83b934ad 1592 rcu_read_unlock();
ab0ee2ca
JG
1593 goto error;
1594 }
1595 CDS_INIT_LIST_HEAD(&new_element->node);
1596 new_element->trigger = trigger_ht_element->trigger;
1597 cds_list_add(&new_element->node, &trigger_list);
1598 trigger_count++;
1599 }
83b934ad 1600 rcu_read_unlock();
ab0ee2ca
JG
1601
1602 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1603 trigger_count);
1604 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1605 if (!channel_trigger_list) {
1606 goto error;
1607 }
8abe313a 1608 channel_trigger_list->channel_key = new_channel_info->key;
ab0ee2ca
JG
1609 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1610 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1611 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1612
1613 rcu_read_lock();
1614 /* Add channel to the channel_ht which owns the channel_infos. */
1615 cds_lfht_add(state->channels_ht,
8abe313a 1616 hash_channel_key(&new_channel_info->key),
ab0ee2ca
JG
1617 &new_channel_info->channels_ht_node);
1618 /*
1619 * Add the list of triggers associated with this channel to the
1620 * channel_triggers_ht.
1621 */
1622 cds_lfht_add(state->channel_triggers_ht,
8abe313a 1623 hash_channel_key(&new_channel_info->key),
ab0ee2ca
JG
1624 &channel_trigger_list->channel_triggers_ht_node);
1625 rcu_read_unlock();
1eee26c5 1626 session_info_put(session_info);
ab0ee2ca
JG
1627 *cmd_result = LTTNG_OK;
1628 return 0;
1629error:
ab0ee2ca 1630 channel_info_destroy(new_channel_info);
8abe313a 1631 session_info_put(session_info);
ab0ee2ca
JG
1632 return 1;
1633}
1634
83b934ad
MD
1635static
1636void free_channel_trigger_list_rcu(struct rcu_head *node)
1637{
1638 free(caa_container_of(node, struct lttng_channel_trigger_list,
1639 rcu_node));
1640}
1641
1642static
1643void free_channel_state_sample_rcu(struct rcu_head *node)
1644{
1645 free(caa_container_of(node, struct channel_state_sample,
1646 rcu_node));
1647}
1648
ab0ee2ca
JG
1649static
1650int handle_notification_thread_command_remove_channel(
1651 struct notification_thread_state *state,
1652 uint64_t channel_key, enum lttng_domain_type domain,
1653 enum lttng_error_code *cmd_result)
1654{
1655 struct cds_lfht_node *node;
1656 struct cds_lfht_iter iter;
1657 struct lttng_channel_trigger_list *trigger_list;
1658 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1659 struct channel_key key = { .key = channel_key, .domain = domain };
1660 struct channel_info *channel_info;
1661
1662 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
1663 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1664
1665 rcu_read_lock();
1666
1667 cds_lfht_lookup(state->channel_triggers_ht,
1668 hash_channel_key(&key),
1669 match_channel_trigger_list,
1670 &key,
1671 &iter);
1672 node = cds_lfht_iter_get_node(&iter);
1673 /*
1674 * There is a severe internal error if we are being asked to remove a
1675 * channel that doesn't exist.
1676 */
1677 if (!node) {
1678 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1679 goto end;
1680 }
1681
1682 /* Free the list of triggers associated with this channel. */
1683 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1684 channel_triggers_ht_node);
1685 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1686 &trigger_list->list, node) {
1687 cds_list_del(&trigger_list_element->node);
1688 free(trigger_list_element);
1689 }
1690 cds_lfht_del(state->channel_triggers_ht, node);
83b934ad 1691 call_rcu(&trigger_list->rcu_node, free_channel_trigger_list_rcu);
ab0ee2ca
JG
1692
1693 /* Free sampled channel state. */
1694 cds_lfht_lookup(state->channel_state_ht,
1695 hash_channel_key(&key),
1696 match_channel_state_sample,
1697 &key,
1698 &iter);
1699 node = cds_lfht_iter_get_node(&iter);
1700 /*
1701 * This is expected to be NULL if the channel is destroyed before we
1702 * received a sample.
1703 */
1704 if (node) {
1705 struct channel_state_sample *sample = caa_container_of(node,
1706 struct channel_state_sample,
1707 channel_state_ht_node);
1708
1709 cds_lfht_del(state->channel_state_ht, node);
83b934ad 1710 call_rcu(&sample->rcu_node, free_channel_state_sample_rcu);
ab0ee2ca
JG
1711 }
1712
1713 /* Remove the channel from the channels_ht and free it. */
1714 cds_lfht_lookup(state->channels_ht,
1715 hash_channel_key(&key),
1716 match_channel_info,
1717 &key,
1718 &iter);
1719 node = cds_lfht_iter_get_node(&iter);
1720 assert(node);
1721 channel_info = caa_container_of(node, struct channel_info,
1722 channels_ht_node);
1723 cds_lfht_del(state->channels_ht, node);
1724 channel_info_destroy(channel_info);
1725end:
1726 rcu_read_unlock();
1727 *cmd_result = LTTNG_OK;
1728 return 0;
1729}
1730
0ca52944 1731static
ed327204 1732int handle_notification_thread_command_session_rotation(
0ca52944 1733 struct notification_thread_state *state,
ed327204
JG
1734 enum notification_thread_command_type cmd_type,
1735 const char *session_name, uid_t session_uid, gid_t session_gid,
1736 uint64_t trace_archive_chunk_id,
1737 struct lttng_trace_archive_location *location,
1738 enum lttng_error_code *_cmd_result)
0ca52944 1739{
ed327204
JG
1740 int ret = 0;
1741 enum lttng_error_code cmd_result = LTTNG_OK;
1742 struct lttng_session_trigger_list *trigger_list;
1743 struct lttng_trigger_list_element *trigger_list_element;
1744 struct session_info *session_info;
0ca52944 1745
ed327204
JG
1746 rcu_read_lock();
1747
1748 session_info = find_or_create_session_info(state, session_name,
1749 session_uid, session_gid);
1750 if (!session_info) {
a9577b76 1751 /* Allocation error or an internal error occurred. */
ed327204
JG
1752 ret = -1;
1753 cmd_result = LTTNG_ERR_NOMEM;
1754 goto end;
1755 }
1756
1757 session_info->rotation.ongoing =
1758 cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
1759 session_info->rotation.id = trace_archive_chunk_id;
1760 trigger_list = get_session_trigger_list(state, session_name);
1761 if (!trigger_list) {
1762 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1763 session_name);
1764 goto end;
1765 }
1766
1767 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
1768 node) {
1769 const struct lttng_condition *condition;
1770 const struct lttng_action *action;
1771 const struct lttng_trigger *trigger;
1772 struct notification_client_list *client_list;
1773 struct lttng_evaluation *evaluation = NULL;
1774 enum lttng_condition_type condition_type;
1775
1776 trigger = trigger_list_element->trigger;
1777 condition = lttng_trigger_get_const_condition(trigger);
1778 assert(condition);
1779 condition_type = lttng_condition_get_type(condition);
1780
1781 if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING &&
1782 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1783 continue;
1784 } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED &&
1785 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) {
1786 continue;
1787 }
1788
1789 action = lttng_trigger_get_const_action(trigger);
1790
1791 /* Notify actions are the only type currently supported. */
1792 assert(lttng_action_get_type_const(action) ==
1793 LTTNG_ACTION_TYPE_NOTIFY);
1794
1795 client_list = get_client_list_from_condition(state, condition);
1796 assert(client_list);
1797
1798 if (cds_list_empty(&client_list->list)) {
1799 /*
1800 * No clients interested in the evaluation's result,
1801 * skip it.
1802 */
1803 continue;
1804 }
1805
1806 if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1807 evaluation = lttng_evaluation_session_rotation_ongoing_create(
1808 trace_archive_chunk_id);
1809 } else {
1810 evaluation = lttng_evaluation_session_rotation_completed_create(
1811 trace_archive_chunk_id, location);
1812 }
1813
1814 if (!evaluation) {
1815 /* Internal error */
1816 ret = -1;
1817 cmd_result = LTTNG_ERR_UNK;
1818 goto end;
1819 }
1820
1821 /* Dispatch evaluation result to all clients. */
1822 ret = send_evaluation_to_clients(trigger_list_element->trigger,
1823 evaluation, client_list, state,
1824 session_info->uid,
1825 session_info->gid);
1826 lttng_evaluation_destroy(evaluation);
1827 if (caa_unlikely(ret)) {
1828 goto end;
1829 }
1830 }
1831end:
1832 session_info_put(session_info);
1833 *_cmd_result = cmd_result;
1834 rcu_read_unlock();
1835 return ret;
0ca52944
JG
1836}
1837
1da26331
JG
1838static
1839int condition_is_supported(struct lttng_condition *condition)
1840{
1841 int ret;
1842
1843 switch (lttng_condition_get_type(condition)) {
1844 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1845 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1846 {
1847 enum lttng_domain_type domain;
1848
1849 ret = lttng_condition_buffer_usage_get_domain_type(condition,
1850 &domain);
1851 if (ret) {
1852 ret = -1;
1853 goto end;
1854 }
1855
1856 if (domain != LTTNG_DOMAIN_KERNEL) {
1857 ret = 1;
1858 goto end;
1859 }
1860
1861 /*
1862 * Older kernel tracers don't expose the API to monitor their
1863 * buffers. Therefore, we reject triggers that require that
1864 * mechanism to be available to be evaluated.
1865 */
7d268848 1866 ret = kernel_supports_ring_buffer_snapshot_sample_positions();
1da26331
JG
1867 break;
1868 }
1869 default:
1870 ret = 1;
1871 }
1872end:
1873 return ret;
1874}
1875
51eab943
JG
1876/* Must be called with RCU read lock held. */
1877static
1878int bind_trigger_to_matching_session(const struct lttng_trigger *trigger,
1879 struct notification_thread_state *state)
1880{
1881 int ret = 0;
51eab943
JG
1882 const struct lttng_condition *condition;
1883 const char *session_name;
1884 struct lttng_session_trigger_list *trigger_list;
1885
1886 condition = lttng_trigger_get_const_condition(trigger);
1887 switch (lttng_condition_get_type(condition)) {
1888 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1889 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1890 {
1891 enum lttng_condition_status status;
1892
1893 status = lttng_condition_session_rotation_get_session_name(
1894 condition, &session_name);
1895 if (status != LTTNG_CONDITION_STATUS_OK) {
1896 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
1897 ret = -1;
1898 goto end;
1899 }
1900 break;
1901 }
1902 default:
1903 ret = -1;
1904 goto end;
1905 }
1906
ed327204
JG
1907 trigger_list = get_session_trigger_list(state, session_name);
1908 if (!trigger_list) {
51eab943
JG
1909 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
1910 session_name);
1911 goto end;
51eab943 1912
ed327204 1913 }
51eab943
JG
1914
1915 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
1916 session_name);
1917 ret = lttng_session_trigger_list_add(trigger_list, trigger);
1918end:
1919 return ret;
1920}
1921
1922/* Must be called with RCU read lock held. */
1923static
1924int bind_trigger_to_matching_channels(const struct lttng_trigger *trigger,
1925 struct notification_thread_state *state)
1926{
1927 int ret = 0;
1928 struct cds_lfht_node *node;
1929 struct cds_lfht_iter iter;
1930 struct channel_info *channel;
1931
1932 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
1933 channels_ht_node) {
1934 struct lttng_trigger_list_element *trigger_list_element;
1935 struct lttng_channel_trigger_list *trigger_list;
a5d64ae7 1936 struct cds_lfht_iter lookup_iter;
51eab943
JG
1937
1938 if (!trigger_applies_to_channel(trigger, channel)) {
1939 continue;
1940 }
1941
1942 cds_lfht_lookup(state->channel_triggers_ht,
1943 hash_channel_key(&channel->key),
1944 match_channel_trigger_list,
1945 &channel->key,
a5d64ae7
MD
1946 &lookup_iter);
1947 node = cds_lfht_iter_get_node(&lookup_iter);
51eab943
JG
1948 assert(node);
1949 trigger_list = caa_container_of(node,
1950 struct lttng_channel_trigger_list,
1951 channel_triggers_ht_node);
1952
1953 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
1954 if (!trigger_list_element) {
1955 ret = -1;
1956 goto end;
1957 }
1958 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
1959 trigger_list_element->trigger = trigger;
1960 cds_list_add(&trigger_list_element->node, &trigger_list->list);
1961 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
1962 channel->name);
1963 }
1964end:
1965 return ret;
1966}
1967
ab0ee2ca
JG
1968/*
1969 * FIXME A client's credentials are not checked when registering a trigger, nor
1970 * are they stored alongside with the trigger.
1971 *
1da26331 1972 * The effects of this are benign since:
ab0ee2ca 1973 * - The client will succeed in registering the trigger, as it is valid,
51eab943 1974 * - The trigger will, internally, be bound to the channel/session,
ab0ee2ca
JG
1975 * - The notifications will not be sent since the client's credentials
1976 * are checked against the channel at that moment.
1da26331
JG
1977 *
1978 * If this function returns a non-zero value, it means something is
50ca7858 1979 * fundamentally broken and the whole subsystem/thread will be torn down.
1da26331
JG
1980 *
1981 * If a non-fatal error occurs, just set the cmd_result to the appropriate
1982 * error code.
ab0ee2ca
JG
1983 */
1984static
1985int handle_notification_thread_command_register_trigger(
8abe313a
JG
1986 struct notification_thread_state *state,
1987 struct lttng_trigger *trigger,
1988 enum lttng_error_code *cmd_result)
ab0ee2ca
JG
1989{
1990 int ret = 0;
1991 struct lttng_condition *condition;
1992 struct notification_client *client;
1993 struct notification_client_list *client_list = NULL;
1994 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1995 struct notification_client_list_element *client_list_element, *tmp;
1996 struct cds_lfht_node *node;
1997 struct cds_lfht_iter iter;
ab0ee2ca
JG
1998 bool free_trigger = true;
1999
2000 rcu_read_lock();
2001
2002 condition = lttng_trigger_get_condition(trigger);
1da26331
JG
2003 assert(condition);
2004
2005 ret = condition_is_supported(condition);
2006 if (ret < 0) {
2007 goto error;
2008 } else if (ret == 0) {
2009 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
2010 goto error;
2011 } else {
2012 /* Feature is supported, continue. */
2013 ret = 0;
2014 }
2015
ab0ee2ca
JG
2016 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
2017 if (!trigger_ht_element) {
2018 ret = -1;
2019 goto error;
2020 }
2021
2022 /* Add trigger to the trigger_ht. */
2023 cds_lfht_node_init(&trigger_ht_element->node);
2024 trigger_ht_element->trigger = trigger;
2025
2026 node = cds_lfht_add_unique(state->triggers_ht,
2027 lttng_condition_hash(condition),
2028 match_condition,
2029 condition,
2030 &trigger_ht_element->node);
2031 if (node != &trigger_ht_element->node) {
2032 /* Not a fatal error, simply report it to the client. */
2033 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2034 goto error_free_ht_element;
2035 }
2036
2037 /*
2038 * Ownership of the trigger and of its wrapper was transfered to
2039 * the triggers_ht.
2040 */
2041 trigger_ht_element = NULL;
2042 free_trigger = false;
2043
2044 /*
2045 * The rest only applies to triggers that have a "notify" action.
2046 * It is not skipped as this is the only action type currently
2047 * supported.
2048 */
2049 client_list = zmalloc(sizeof(*client_list));
2050 if (!client_list) {
2051 ret = -1;
2052 goto error_free_ht_element;
2053 }
2054 cds_lfht_node_init(&client_list->notification_trigger_ht_node);
2055 CDS_INIT_LIST_HEAD(&client_list->list);
2056 client_list->trigger = trigger;
2057
2058 /* Build a list of clients to which this new trigger applies. */
2059 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2060 client_socket_ht_node) {
2061 if (!trigger_applies_to_client(trigger, client)) {
2062 continue;
2063 }
2064
2065 client_list_element = zmalloc(sizeof(*client_list_element));
2066 if (!client_list_element) {
2067 ret = -1;
2068 goto error_free_client_list;
2069 }
2070 CDS_INIT_LIST_HEAD(&client_list_element->node);
2071 client_list_element->client = client;
2072 cds_list_add(&client_list_element->node, &client_list->list);
2073 }
2074
2075 cds_lfht_add(state->notification_trigger_clients_ht,
2076 lttng_condition_hash(condition),
2077 &client_list->notification_trigger_ht_node);
ab0ee2ca 2078
f82f93a1 2079 switch (get_condition_binding_object(condition)) {
51eab943
JG
2080 case LTTNG_OBJECT_TYPE_SESSION:
2081 /* Add the trigger to the list if it matches a known session. */
2082 ret = bind_trigger_to_matching_session(trigger, state);
2083 if (ret) {
2084 goto error_free_client_list;
ab0ee2ca 2085 }
f82f93a1 2086 break;
51eab943
JG
2087 case LTTNG_OBJECT_TYPE_CHANNEL:
2088 /*
2089 * Add the trigger to list of triggers bound to the channels
2090 * currently known.
2091 */
2092 ret = bind_trigger_to_matching_channels(trigger, state);
2093 if (ret) {
ab0ee2ca
JG
2094 goto error_free_client_list;
2095 }
51eab943
JG
2096 break;
2097 case LTTNG_OBJECT_TYPE_NONE:
2098 break;
2099 default:
2100 ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered");
2101 ret = -1;
2102 goto error_free_client_list;
ab0ee2ca
JG
2103 }
2104
2ae99f0b
JG
2105 /*
2106 * Since there is nothing preventing clients from subscribing to a
2107 * condition before the corresponding trigger is registered, we have
2108 * to evaluate this new condition right away.
2109 *
2110 * At some point, we were waiting for the next "evaluation" (e.g. on
2111 * reception of a channel sample) to evaluate this new condition, but
2112 * that was broken.
2113 *
2114 * The reason it was broken is that waiting for the next sample
2115 * does not allow us to properly handle transitions for edge-triggered
2116 * conditions.
2117 *
2118 * Consider this example: when we handle a new channel sample, we
2119 * evaluate each conditions twice: once with the previous state, and
2120 * again with the newest state. We then use those two results to
2121 * determine whether a state change happened: a condition was false and
2122 * became true. If a state change happened, we have to notify clients.
2123 *
2124 * Now, if a client subscribes to a given notification and registers
2125 * a trigger *after* that subscription, we have to make sure the
2126 * condition is evaluated at this point while considering only the
2127 * current state. Otherwise, the next evaluation cycle may only see
2128 * that the evaluations remain the same (true for samples n-1 and n) and
2129 * the client will never know that the condition has been met.
2130 */
2131 cds_list_for_each_entry_safe(client_list_element, tmp,
2132 &client_list->list, node) {
2133 ret = evaluate_condition_for_client(trigger, condition,
2134 client_list_element->client, state);
2135 if (ret) {
2136 goto error_free_client_list;
2137 }
2138 }
2139
2140 /*
2141 * Client list ownership transferred to the
2142 * notification_trigger_clients_ht.
2143 */
2144 client_list = NULL;
2145
ab0ee2ca
JG
2146 *cmd_result = LTTNG_OK;
2147error_free_client_list:
2148 if (client_list) {
2149 cds_list_for_each_entry_safe(client_list_element, tmp,
2150 &client_list->list, node) {
2151 free(client_list_element);
2152 }
2153 free(client_list);
2154 }
2155error_free_ht_element:
2156 free(trigger_ht_element);
2157error:
2158 if (free_trigger) {
2159 struct lttng_action *action = lttng_trigger_get_action(trigger);
2160
2161 lttng_condition_destroy(condition);
2162 lttng_action_destroy(action);
2163 lttng_trigger_destroy(trigger);
2164 }
2165 rcu_read_unlock();
2166 return ret;
2167}
2168
83b934ad
MD
2169static
2170void free_notification_client_list_rcu(struct rcu_head *node)
2171{
2172 free(caa_container_of(node, struct notification_client_list,
2173 rcu_node));
2174}
2175
2176static
2177void free_lttng_trigger_ht_element_rcu(struct rcu_head *node)
2178{
2179 free(caa_container_of(node, struct lttng_trigger_ht_element,
2180 rcu_node));
2181}
2182
cc2295b5 2183static
ab0ee2ca
JG
2184int handle_notification_thread_command_unregister_trigger(
2185 struct notification_thread_state *state,
2186 struct lttng_trigger *trigger,
2187 enum lttng_error_code *_cmd_reply)
2188{
2189 struct cds_lfht_iter iter;
ed327204 2190 struct cds_lfht_node *triggers_ht_node;
ab0ee2ca
JG
2191 struct lttng_channel_trigger_list *trigger_list;
2192 struct notification_client_list *client_list;
2193 struct notification_client_list_element *client_list_element, *tmp;
2194 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2195 struct lttng_condition *condition = lttng_trigger_get_condition(
2196 trigger);
2197 struct lttng_action *action;
2198 enum lttng_error_code cmd_reply;
2199
2200 rcu_read_lock();
2201
2202 cds_lfht_lookup(state->triggers_ht,
2203 lttng_condition_hash(condition),
2204 match_condition,
2205 condition,
2206 &iter);
2207 triggers_ht_node = cds_lfht_iter_get_node(&iter);
2208 if (!triggers_ht_node) {
2209 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
2210 goto end;
2211 } else {
2212 cmd_reply = LTTNG_OK;
2213 }
2214
2215 /* Remove trigger from channel_triggers_ht. */
2216 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
2217 channel_triggers_ht_node) {
2218 struct lttng_trigger_list_element *trigger_element, *tmp;
2219
2220 cds_list_for_each_entry_safe(trigger_element, tmp,
2221 &trigger_list->list, node) {
9b63a4aa
JG
2222 const struct lttng_condition *current_condition =
2223 lttng_trigger_get_const_condition(
ab0ee2ca
JG
2224 trigger_element->trigger);
2225
2226 assert(current_condition);
2227 if (!lttng_condition_is_equal(condition,
2228 current_condition)) {
2229 continue;
2230 }
2231
2232 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2233 cds_list_del(&trigger_element->node);
e4db5ace
JR
2234 /* A trigger can only appear once per channel */
2235 break;
ab0ee2ca
JG
2236 }
2237 }
2238
2239 /*
2240 * Remove and release the client list from
2241 * notification_trigger_clients_ht.
2242 */
ed327204
JG
2243 client_list = get_client_list_from_condition(state, condition);
2244 assert(client_list);
2245
ab0ee2ca
JG
2246 cds_list_for_each_entry_safe(client_list_element, tmp,
2247 &client_list->list, node) {
2248 free(client_list_element);
2249 }
ed327204
JG
2250 cds_lfht_del(state->notification_trigger_clients_ht,
2251 &client_list->notification_trigger_ht_node);
83b934ad 2252 call_rcu(&client_list->rcu_node, free_notification_client_list_rcu);
ab0ee2ca
JG
2253
2254 /* Remove trigger from triggers_ht. */
2255 trigger_ht_element = caa_container_of(triggers_ht_node,
2256 struct lttng_trigger_ht_element, node);
2257 cds_lfht_del(state->triggers_ht, triggers_ht_node);
2258
2259 condition = lttng_trigger_get_condition(trigger_ht_element->trigger);
2260 lttng_condition_destroy(condition);
2261 action = lttng_trigger_get_action(trigger_ht_element->trigger);
2262 lttng_action_destroy(action);
2263 lttng_trigger_destroy(trigger_ht_element->trigger);
83b934ad 2264 call_rcu(&trigger_ht_element->rcu_node, free_lttng_trigger_ht_element_rcu);
ab0ee2ca
JG
2265end:
2266 rcu_read_unlock();
2267 if (_cmd_reply) {
2268 *_cmd_reply = cmd_reply;
2269 }
2270 return 0;
2271}
2272
2273/* Returns 0 on success, 1 on exit requested, negative value on error. */
2274int handle_notification_thread_command(
2275 struct notification_thread_handle *handle,
2276 struct notification_thread_state *state)
2277{
2278 int ret;
2279 uint64_t counter;
2280 struct notification_thread_command *cmd;
2281
8abe313a 2282 /* Read the event pipe to put it back into a quiescent state. */
18aeca05 2283 ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
8abe313a 2284 sizeof(counter));
18aeca05 2285 if (ret != sizeof(counter)) {
ab0ee2ca
JG
2286 goto error;
2287 }
2288
2289 pthread_mutex_lock(&handle->cmd_queue.lock);
2290 cmd = cds_list_first_entry(&handle->cmd_queue.list,
2291 struct notification_thread_command, cmd_list_node);
2292 switch (cmd->type) {
2293 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
2294 DBG("[notification-thread] Received register trigger command");
2295 ret = handle_notification_thread_command_register_trigger(
2296 state, cmd->parameters.trigger,
2297 &cmd->reply_code);
2298 break;
2299 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
2300 DBG("[notification-thread] Received unregister trigger command");
2301 ret = handle_notification_thread_command_unregister_trigger(
2302 state, cmd->parameters.trigger,
2303 &cmd->reply_code);
2304 break;
2305 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
2306 DBG("[notification-thread] Received add channel command");
2307 ret = handle_notification_thread_command_add_channel(
8abe313a
JG
2308 state,
2309 cmd->parameters.add_channel.session.name,
2310 cmd->parameters.add_channel.session.uid,
2311 cmd->parameters.add_channel.session.gid,
2312 cmd->parameters.add_channel.channel.name,
2313 cmd->parameters.add_channel.channel.domain,
2314 cmd->parameters.add_channel.channel.key,
2315 cmd->parameters.add_channel.channel.capacity,
ab0ee2ca
JG
2316 &cmd->reply_code);
2317 break;
2318 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
2319 DBG("[notification-thread] Received remove channel command");
2320 ret = handle_notification_thread_command_remove_channel(
2321 state, cmd->parameters.remove_channel.key,
2322 cmd->parameters.remove_channel.domain,
2323 &cmd->reply_code);
2324 break;
0ca52944 2325 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
0ca52944 2326 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
ed327204
JG
2327 DBG("[notification-thread] Received session rotation %s command",
2328 cmd->type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING ?
2329 "ongoing" : "completed");
2330 ret = handle_notification_thread_command_session_rotation(
0ca52944 2331 state,
ed327204
JG
2332 cmd->type,
2333 cmd->parameters.session_rotation.session_name,
2334 cmd->parameters.session_rotation.uid,
2335 cmd->parameters.session_rotation.gid,
2336 cmd->parameters.session_rotation.trace_archive_chunk_id,
2337 cmd->parameters.session_rotation.location,
0ca52944
JG
2338 &cmd->reply_code);
2339 break;
ab0ee2ca
JG
2340 case NOTIFICATION_COMMAND_TYPE_QUIT:
2341 DBG("[notification-thread] Received quit command");
2342 cmd->reply_code = LTTNG_OK;
2343 ret = 1;
2344 goto end;
2345 default:
2346 ERR("[notification-thread] Unknown internal command received");
2347 goto error_unlock;
2348 }
2349
2350 if (ret) {
2351 goto error_unlock;
2352 }
2353end:
2354 cds_list_del(&cmd->cmd_list_node);
8ada111f 2355 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
2356 pthread_mutex_unlock(&handle->cmd_queue.lock);
2357 return ret;
2358error_unlock:
2359 /* Wake-up and return a fatal error to the calling thread. */
8ada111f 2360 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
2361 pthread_mutex_unlock(&handle->cmd_queue.lock);
2362 cmd->reply_code = LTTNG_ERR_FATAL;
2363error:
2364 /* Indicate a fatal error to the caller. */
2365 return -1;
2366}
2367
2368static
2369unsigned long hash_client_socket(int socket)
2370{
2371 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
2372}
2373
/*
 * Add O_NONBLOCK to the file status flags of `socket`.
 *
 * Returns the result of the last fcntl() call performed: -1 on failure
 * (after logging the error), or the value returned by the F_SETFL call on
 * success.
 */
static
int socket_set_non_blocking(int socket)
{
	int set_ret;
	const int current_flags = fcntl(socket, F_GETFL, 0);

	if (current_flags == -1) {
		PERROR("fcntl get socket flags");
		return -1;
	}

	/* Preserve the existing flags, only adding O_NONBLOCK. */
	set_ret = fcntl(socket, F_SETFL, current_flags | O_NONBLOCK);
	if (set_ret == -1) {
		PERROR("fcntl set O_NONBLOCK socket flag");
		return set_ret;
	}

	DBG("Client socket (fd = %i) set as non-blocking", socket);
	return set_ret;
}
2396
2397static
14fa22f8 2398int client_reset_inbound_state(struct notification_client *client)
ab0ee2ca
JG
2399{
2400 int ret;
2401
2402 ret = lttng_dynamic_buffer_set_size(
2403 &client->communication.inbound.buffer, 0);
2404 assert(!ret);
2405
2406 client->communication.inbound.bytes_to_receive =
2407 sizeof(struct lttng_notification_channel_message);
2408 client->communication.inbound.msg_type =
2409 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
ab0ee2ca
JG
2410 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
2411 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
14fa22f8
JG
2412 ret = lttng_dynamic_buffer_set_size(
2413 &client->communication.inbound.buffer,
2414 client->communication.inbound.bytes_to_receive);
2415 return ret;
ab0ee2ca
JG
2416}
2417
2418int handle_notification_thread_client_connect(
2419 struct notification_thread_state *state)
2420{
2421 int ret;
2422 struct notification_client *client;
2423
2424 DBG("[notification-thread] Handling new notification channel client connection");
2425
2426 client = zmalloc(sizeof(*client));
2427 if (!client) {
2428 /* Fatal error. */
2429 ret = -1;
2430 goto error;
2431 }
2432 CDS_INIT_LIST_HEAD(&client->condition_list);
2433 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
2434 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
01ea340e 2435 client->communication.inbound.expect_creds = true;
14fa22f8
JG
2436 ret = client_reset_inbound_state(client);
2437 if (ret) {
2438 ERR("[notification-thread] Failed to reset client communication's inbound state");
2439 ret = 0;
2440 goto error;
2441 }
ab0ee2ca
JG
2442
2443 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
2444 if (ret < 0) {
2445 ERR("[notification-thread] Failed to accept new notification channel client connection");
2446 ret = 0;
2447 goto error;
2448 }
2449
2450 client->socket = ret;
2451
2452 ret = socket_set_non_blocking(client->socket);
2453 if (ret) {
2454 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
2455 goto error;
2456 }
2457
2458 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
2459 if (ret < 0) {
2460 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
2461 ret = 0;
2462 goto error;
2463 }
2464
2465 ret = lttng_poll_add(&state->events, client->socket,
2466 LPOLLIN | LPOLLERR |
2467 LPOLLHUP | LPOLLRDHUP);
2468 if (ret < 0) {
2469 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
2470 ret = 0;
2471 goto error;
2472 }
2473 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
2474 client->socket);
2475
ab0ee2ca
JG
2476 rcu_read_lock();
2477 cds_lfht_add(state->client_socket_ht,
2478 hash_client_socket(client->socket),
2479 &client->client_socket_ht_node);
2480 rcu_read_unlock();
2481
2482 return ret;
2483error:
2484 notification_client_destroy(client, state);
2485 return ret;
2486}
2487
2488int handle_notification_thread_client_disconnect(
2489 int client_socket,
2490 struct notification_thread_state *state)
2491{
2492 int ret = 0;
2493 struct notification_client *client;
2494
2495 rcu_read_lock();
2496 DBG("[notification-thread] Closing client connection (socket fd = %i)",
2497 client_socket);
2498 client = get_client_from_socket(client_socket, state);
2499 if (!client) {
2500 /* Internal state corruption, fatal error. */
2501 ERR("[notification-thread] Unable to find client (socket fd = %i)",
2502 client_socket);
2503 ret = -1;
2504 goto end;
2505 }
2506
2507 ret = lttng_poll_del(&state->events, client_socket);
2508 if (ret) {
2509 ERR("[notification-thread] Failed to remove client socket from poll set");
2510 }
2511 cds_lfht_del(state->client_socket_ht,
2512 &client->client_socket_ht_node);
2513 notification_client_destroy(client, state);
2514end:
2515 rcu_read_unlock();
2516 return ret;
2517}
2518
2519int handle_notification_thread_client_disconnect_all(
2520 struct notification_thread_state *state)
2521{
2522 struct cds_lfht_iter iter;
2523 struct notification_client *client;
2524 bool error_encoutered = false;
2525
2526 rcu_read_lock();
2527 DBG("[notification-thread] Closing all client connections");
2528 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2529 client_socket_ht_node) {
2530 int ret;
2531
2532 ret = handle_notification_thread_client_disconnect(
2533 client->socket, state);
2534 if (ret) {
2535 error_encoutered = true;
2536 }
2537 }
2538 rcu_read_unlock();
2539 return error_encoutered ? 1 : 0;
2540}
2541
2542int handle_notification_thread_trigger_unregister_all(
2543 struct notification_thread_state *state)
2544{
4149ace8 2545 bool error_occurred = false;
ab0ee2ca
JG
2546 struct cds_lfht_iter iter;
2547 struct lttng_trigger_ht_element *trigger_ht_element;
2548
763de384 2549 rcu_read_lock();
ab0ee2ca
JG
2550 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
2551 node) {
2552 int ret = handle_notification_thread_command_unregister_trigger(
2553 state, trigger_ht_element->trigger, NULL);
2554 if (ret) {
4149ace8 2555 error_occurred = true;
ab0ee2ca
JG
2556 }
2557 }
763de384 2558 rcu_read_unlock();
4149ace8 2559 return error_occurred ? -1 : 0;
ab0ee2ca
JG
2560}
2561
2562static
2563int client_flush_outgoing_queue(struct notification_client *client,
2564 struct notification_thread_state *state)
2565{
2566 ssize_t ret;
2567 size_t to_send_count;
2568
2569 assert(client->communication.outbound.buffer.size != 0);
2570 to_send_count = client->communication.outbound.buffer.size;
2571 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
2572 client->socket);
2573
2574 ret = lttcomm_send_unix_sock_non_block(client->socket,
2575 client->communication.outbound.buffer.data,
2576 to_send_count);
2577 if ((ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) ||
2578 (ret > 0 && ret < to_send_count)) {
2579 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
2580 client->socket);
2581 to_send_count -= max(ret, 0);
2582
2583 memcpy(client->communication.outbound.buffer.data,
2584 client->communication.outbound.buffer.data +
2585 client->communication.outbound.buffer.size - to_send_count,
2586 to_send_count);
2587 ret = lttng_dynamic_buffer_set_size(
2588 &client->communication.outbound.buffer,
2589 to_send_count);
2590 if (ret) {
2591 goto error;
2592 }
2593
2594 /*
2595 * We want to be notified whenever there is buffer space
2596 * available to send the rest of the payload.
2597 */
2598 ret = lttng_poll_mod(&state->events, client->socket,
2599 CLIENT_POLL_MASK_IN_OUT);
2600 if (ret) {
2601 goto error;
2602 }
2603 } else if (ret < 0) {
2604 /* Generic error, disconnect the client. */
2605 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
2606 client->socket);
2607 ret = handle_notification_thread_client_disconnect(
2608 client->socket, state);
2609 if (ret) {
2610 goto error;
2611 }
2612 } else {
2613 /* No error and flushed the queue completely. */
2614 ret = lttng_dynamic_buffer_set_size(
2615 &client->communication.outbound.buffer, 0);
2616 if (ret) {
2617 goto error;
2618 }
2619 ret = lttng_poll_mod(&state->events, client->socket,
2620 CLIENT_POLL_MASK_IN);
2621 if (ret) {
2622 goto error;
2623 }
2624
2625 client->communication.outbound.queued_command_reply = false;
2626 client->communication.outbound.dropped_notification = false;
2627 }
2628
2629 return 0;
2630error:
2631 return -1;
2632}
2633
2634static
2635int client_send_command_reply(struct notification_client *client,
2636 struct notification_thread_state *state,
2637 enum lttng_notification_channel_status status)
2638{
2639 int ret;
2640 struct lttng_notification_channel_command_reply reply = {
2641 .status = (int8_t) status,
2642 };
2643 struct lttng_notification_channel_message msg = {
2644 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
2645 .size = sizeof(reply),
2646 };
2647 char buffer[sizeof(msg) + sizeof(reply)];
2648
2649 if (client->communication.outbound.queued_command_reply) {
2650 /* Protocol error. */
2651 goto error;
2652 }
2653
2654 memcpy(buffer, &msg, sizeof(msg));
2655 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
2656 DBG("[notification-thread] Send command reply (%i)", (int) status);
2657
2658 /* Enqueue buffer to outgoing queue and flush it. */
2659 ret = lttng_dynamic_buffer_append(
2660 &client->communication.outbound.buffer,
2661 buffer, sizeof(buffer));
2662 if (ret) {
2663 goto error;
2664 }
2665
2666 ret = client_flush_outgoing_queue(client, state);
2667 if (ret) {
2668 goto error;
2669 }
2670
2671 if (client->communication.outbound.buffer.size != 0) {
2672 /* Queue could not be emptied. */
2673 client->communication.outbound.queued_command_reply = true;
2674 }
2675
2676 return 0;
2677error:
2678 return -1;
2679}
2680
2681static
2682int client_dispatch_message(struct notification_client *client,
2683 struct notification_thread_state *state)
2684{
2685 int ret = 0;
2686
2687 if (client->communication.inbound.msg_type !=
2688 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
2689 client->communication.inbound.msg_type !=
2690 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
2691 !client->validated) {
2692 WARN("[notification-thread] client attempted a command before handshake");
2693 ret = -1;
2694 goto end;
2695 }
2696
2697 switch (client->communication.inbound.msg_type) {
2698 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
2699 {
2700 /*
2701 * Receiving message header. The function will be called again
2702 * once the rest of the message as been received and can be
2703 * interpreted.
2704 */
2705 const struct lttng_notification_channel_message *msg;
2706
2707 assert(sizeof(*msg) ==
2708 client->communication.inbound.buffer.size);
2709 msg = (const struct lttng_notification_channel_message *)
2710 client->communication.inbound.buffer.data;
2711
2712 if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
2713 ERR("[notification-thread] Invalid notification channel message: length = %u", msg->size);
2714 ret = -1;
2715 goto end;
2716 }
2717
2718 switch (msg->type) {
2719 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2720 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2721 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2722 break;
2723 default:
2724 ret = -1;
2725 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
2726 goto end;
2727 }
2728
2729 client->communication.inbound.bytes_to_receive = msg->size;
2730 client->communication.inbound.msg_type =
2731 (enum lttng_notification_channel_message_type) msg->type;
ab0ee2ca 2732 ret = lttng_dynamic_buffer_set_size(
14fa22f8 2733 &client->communication.inbound.buffer, msg->size);
ab0ee2ca
JG
2734 if (ret) {
2735 goto end;
2736 }
2737 break;
2738 }
2739 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2740 {
2741 struct lttng_notification_channel_command_handshake *handshake_client;
2742 struct lttng_notification_channel_command_handshake handshake_reply = {
2743 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
2744 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
2745 };
2746 struct lttng_notification_channel_message msg_header = {
2747 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
2748 .size = sizeof(handshake_reply),
2749 };
2750 enum lttng_notification_channel_status status =
2751 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2752 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
2753
2754 memcpy(send_buffer, &msg_header, sizeof(msg_header));
2755 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
2756 sizeof(handshake_reply));
2757
2758 handshake_client =
2759 (struct lttng_notification_channel_command_handshake *)
2760 client->communication.inbound.buffer.data;
47a32869 2761 client->major = handshake_client->major;
ab0ee2ca
JG
2762 client->minor = handshake_client->minor;
2763 if (!client->communication.inbound.creds_received) {
2764 ERR("[notification-thread] No credentials received from client");
2765 ret = -1;
2766 goto end;
2767 }
2768
2769 client->uid = LTTNG_SOCK_GET_UID_CRED(
2770 &client->communication.inbound.creds);
2771 client->gid = LTTNG_SOCK_GET_GID_CRED(
2772 &client->communication.inbound.creds);
2773 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
2774 client->uid, client->gid, (int) client->major,
2775 (int) client->minor);
2776
2777 if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
2778 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
2779 }
2780
2781 ret = lttng_dynamic_buffer_append(&client->communication.outbound.buffer,
2782 send_buffer, sizeof(send_buffer));
2783 if (ret) {
2784 ERR("[notification-thread] Failed to send protocol version to notification channel client");
2785 goto end;
2786 }
2787
2788 ret = client_flush_outgoing_queue(client, state);
2789 if (ret) {
2790 goto end;
2791 }
2792
2793 ret = client_send_command_reply(client, state, status);
2794 if (ret) {
2795 ERR("[notification-thread] Failed to send reply to notification channel client");
2796 goto end;
2797 }
2798
2799 /* Set reception state to receive the next message header. */
14fa22f8
JG
2800 ret = client_reset_inbound_state(client);
2801 if (ret) {
2802 ERR("[notification-thread] Failed to reset client communication's inbound state");
2803 goto end;
2804 }
ab0ee2ca
JG
2805 client->validated = true;
2806 break;
2807 }
2808 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2809 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2810 {
2811 struct lttng_condition *condition;
2812 enum lttng_notification_channel_status status =
2813 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2814 const struct lttng_buffer_view condition_view =
2815 lttng_buffer_view_from_dynamic_buffer(
2816 &client->communication.inbound.buffer,
2817 0, -1);
2818 size_t expected_condition_size =
2819 client->communication.inbound.buffer.size;
2820
2821 ret = lttng_condition_create_from_buffer(&condition_view,
2822 &condition);
2823 if (ret != expected_condition_size) {
2824 ERR("[notification-thread] Malformed condition received from client");
2825 goto end;
2826 }
2827
2828 if (client->communication.inbound.msg_type ==
2829 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
ab0ee2ca
JG
2830 ret = notification_thread_client_subscribe(client,
2831 condition, state, &status);
2832 } else {
2833 ret = notification_thread_client_unsubscribe(client,
2834 condition, state, &status);
2835 }
2836 if (ret) {
2837 goto end;
2838 }
2839
2840 ret = client_send_command_reply(client, state, status);
2841 if (ret) {
2842 ERR("[notification-thread] Failed to send reply to notification channel client");
2843 goto end;
2844 }
2845
2846 /* Set reception state to receive the next message header. */
14fa22f8
JG
2847 ret = client_reset_inbound_state(client);
2848 if (ret) {
2849 ERR("[notification-thread] Failed to reset client communication's inbound state");
2850 goto end;
2851 }
ab0ee2ca
JG
2852 break;
2853 }
2854 default:
2855 abort();
2856 }
2857end:
2858 return ret;
2859}
2860
2861/* Incoming data from client. */
2862int handle_notification_thread_client_in(
2863 struct notification_thread_state *state, int socket)
2864{
14fa22f8 2865 int ret = 0;
ab0ee2ca
JG
2866 struct notification_client *client;
2867 ssize_t recv_ret;
2868 size_t offset;
2869
2870 client = get_client_from_socket(socket, state);
2871 if (!client) {
2872 /* Internal error, abort. */
2873 ret = -1;
2874 goto end;
2875 }
2876
14fa22f8
JG
2877 offset = client->communication.inbound.buffer.size -
2878 client->communication.inbound.bytes_to_receive;
01ea340e 2879 if (client->communication.inbound.expect_creds) {
ab0ee2ca
JG
2880 recv_ret = lttcomm_recv_creds_unix_sock(socket,
2881 client->communication.inbound.buffer.data + offset,
2882 client->communication.inbound.bytes_to_receive,
2883 &client->communication.inbound.creds);
2884 if (recv_ret > 0) {
01ea340e 2885 client->communication.inbound.expect_creds = false;
ab0ee2ca
JG
2886 client->communication.inbound.creds_received = true;
2887 }
2888 } else {
2889 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
2890 client->communication.inbound.buffer.data + offset,
2891 client->communication.inbound.bytes_to_receive);
2892 }
2893 if (recv_ret < 0) {
2894 goto error_disconnect_client;
2895 }
2896
2897 client->communication.inbound.bytes_to_receive -= recv_ret;
ab0ee2ca
JG
2898 if (client->communication.inbound.bytes_to_receive == 0) {
2899 ret = client_dispatch_message(client, state);
2900 if (ret) {
2901 /*
2902 * Only returns an error if this client must be
2903 * disconnected.
2904 */
2905 goto error_disconnect_client;
2906 }
2907 } else {
2908 goto end;
2909 }
2910end:
2911 return ret;
2912error_disconnect_client:
2913 ret = handle_notification_thread_client_disconnect(socket, state);
2914 return ret;
2915}
2916
/*
 * Client ready to receive outgoing data.
 *
 * Looks up the client bound to `socket` and flushes its outgoing queue.
 *
 * Returns -1 if no client is associated with `socket`, otherwise the
 * result of client_flush_outgoing_queue().
 */
int handle_notification_thread_client_out(
		struct notification_thread_state *state, int socket)
{
	struct notification_client *client =
			get_client_from_socket(socket, state);

	if (!client) {
		/* Internal error, abort. */
		return -1;
	}

	return client_flush_outgoing_queue(client, state);
}
2938
2939static
e8360425
JD
2940bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
2941 const struct channel_state_sample *sample,
2942 uint64_t buffer_capacity)
ab0ee2ca
JG
2943{
2944 bool result = false;
2945 uint64_t threshold;
2946 enum lttng_condition_type condition_type;
e8360425 2947 const struct lttng_condition_buffer_usage *use_condition = container_of(
ab0ee2ca
JG
2948 condition, struct lttng_condition_buffer_usage,
2949 parent);
2950
ab0ee2ca
JG
2951 if (use_condition->threshold_bytes.set) {
2952 threshold = use_condition->threshold_bytes.value;
2953 } else {
2954 /*
2955 * Threshold was expressed as a ratio.
2956 *
2957 * TODO the threshold (in bytes) of conditions expressed
2958 * as a ratio of total buffer size could be cached to
2959 * forego this double-multiplication or it could be performed
2960 * as fixed-point math.
2961 *
2962 * Note that caching should accomodate the case where the
2963 * condition applies to multiple channels (i.e. don't assume
2964 * that all channels matching my_chann* have the same size...)
2965 */
2966 threshold = (uint64_t) (use_condition->threshold_ratio.value *
2967 (double) buffer_capacity);
2968 }
2969
2970 condition_type = lttng_condition_get_type(condition);
2971 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
2972 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2973 threshold, sample->highest_usage);
2974
2975 /*
2976 * The low condition should only be triggered once _all_ of the
2977 * streams in a channel have gone below the "low" threshold.
2978 */
2979 if (sample->highest_usage <= threshold) {
2980 result = true;
2981 }
2982 } else {
2983 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2984 threshold, sample->highest_usage);
2985
2986 /*
2987 * For high buffer usage scenarios, we want to trigger whenever
2988 * _any_ of the streams has reached the "high" threshold.
2989 */
2990 if (sample->highest_usage >= threshold) {
2991 result = true;
2992 }
2993 }
e8360425 2994
ab0ee2ca
JG
2995 return result;
2996}
2997
2998static
e8360425
JD
2999bool evaluate_session_consumed_size_condition(
3000 const struct lttng_condition *condition,
3001 uint64_t session_consumed_size)
3002{
3003 uint64_t threshold;
3004 const struct lttng_condition_session_consumed_size *size_condition =
3005 container_of(condition,
3006 struct lttng_condition_session_consumed_size,
3007 parent);
3008
3009 threshold = size_condition->consumed_threshold_bytes.value;
3010 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
3011 threshold, session_consumed_size);
3012 return session_consumed_size >= threshold;
3013}
3014
3015static
ed327204 3016int evaluate_buffer_condition(const struct lttng_condition *condition,
ab0ee2ca 3017 struct lttng_evaluation **evaluation,
e8360425
JD
3018 const struct notification_thread_state *state,
3019 const struct channel_state_sample *previous_sample,
3020 const struct channel_state_sample *latest_sample,
3021 uint64_t previous_session_consumed_total,
3022 uint64_t latest_session_consumed_total,
3023 struct channel_info *channel_info)
ab0ee2ca
JG
3024{
3025 int ret = 0;
3026 enum lttng_condition_type condition_type;
e8360425
JD
3027 const bool previous_sample_available = !!previous_sample;
3028 bool previous_sample_result = false;
ab0ee2ca
JG
3029 bool latest_sample_result;
3030
3031 condition_type = lttng_condition_get_type(condition);
ab0ee2ca 3032
e8360425
JD
3033 switch (condition_type) {
3034 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3035 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3036 if (caa_likely(previous_sample_available)) {
3037 previous_sample_result =
3038 evaluate_buffer_usage_condition(condition,
3039 previous_sample, channel_info->capacity);
3040 }
3041 latest_sample_result = evaluate_buffer_usage_condition(
3042 condition, latest_sample,
3043 channel_info->capacity);
3044 break;
3045 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3046 if (caa_likely(previous_sample_available)) {
3047 previous_sample_result =
3048 evaluate_session_consumed_size_condition(
3049 condition,
3050 previous_session_consumed_total);
3051 }
3052 latest_sample_result =
3053 evaluate_session_consumed_size_condition(
3054 condition,
3055 latest_session_consumed_total);
3056 break;
3057 default:
3058 /* Unknown condition type; internal error. */
3059 abort();
3060 }
ab0ee2ca
JG
3061
3062 if (!latest_sample_result ||
3063 (previous_sample_result == latest_sample_result)) {
3064 /*
3065 * Only trigger on a condition evaluation transition.
3066 *
3067 * NOTE: This edge-triggered logic may not be appropriate for
3068 * future condition types.
3069 */
3070 goto end;
3071 }
3072
e8360425
JD
3073 if (!evaluation || !latest_sample_result) {
3074 goto end;
3075 }
3076
3077 switch (condition_type) {
3078 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3079 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
ab0ee2ca
JG
3080 *evaluation = lttng_evaluation_buffer_usage_create(
3081 condition_type,
3082 latest_sample->highest_usage,
e8360425
JD
3083 channel_info->capacity);
3084 break;
3085 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3086 *evaluation = lttng_evaluation_session_consumed_size_create(
e8360425
JD
3087 latest_session_consumed_total);
3088 break;
3089 default:
3090 abort();
3091 }
3092
3093 if (!*evaluation) {
3094 ret = -1;
3095 goto end;
ab0ee2ca
JG
3096 }
3097end:
3098 return ret;
3099}
3100
3101static
3102int client_enqueue_dropped_notification(struct notification_client *client,
3103 struct notification_thread_state *state)
3104{
3105 int ret;
3106 struct lttng_notification_channel_message msg = {
3107 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
3108 .size = 0,
3109 };
3110
3111 ret = lttng_dynamic_buffer_append(
3112 &client->communication.outbound.buffer, &msg,
3113 sizeof(msg));
3114 return ret;
3115}
3116
3117static
9b63a4aa
JG
3118int send_evaluation_to_clients(const struct lttng_trigger *trigger,
3119 const struct lttng_evaluation *evaluation,
ab0ee2ca
JG
3120 struct notification_client_list* client_list,
3121 struct notification_thread_state *state,
3122 uid_t channel_uid, gid_t channel_gid)
3123{
3124 int ret = 0;
3125 struct lttng_dynamic_buffer msg_buffer;
3126 struct notification_client_list_element *client_list_element, *tmp;
9b63a4aa
JG
3127 const struct lttng_notification notification = {
3128 .condition = (struct lttng_condition *) lttng_trigger_get_const_condition(trigger),
3129 .evaluation = (struct lttng_evaluation *) evaluation,
3130 };
3647288f
JG
3131 struct lttng_notification_channel_message msg_header = {
3132 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
3133 };
ab0ee2ca
JG
3134
3135 lttng_dynamic_buffer_init(&msg_buffer);
3136
3647288f
JG
3137 ret = lttng_dynamic_buffer_append(&msg_buffer, &msg_header,
3138 sizeof(msg_header));
ab0ee2ca
JG
3139 if (ret) {
3140 goto end;
3141 }
3142
9b63a4aa 3143 ret = lttng_notification_serialize(&notification, &msg_buffer);
ab0ee2ca 3144 if (ret) {
ab0ee2ca
JG
3145 ERR("[notification-thread] Failed to serialize notification");
3146 ret = -1;
3147 goto end;
3148 }
3149
3647288f
JG
3150 /* Update payload size. */
3151 ((struct lttng_notification_channel_message * ) msg_buffer.data)->size =
3152 (uint32_t) (msg_buffer.size - sizeof(msg_header));
3153
ab0ee2ca
JG
3154 cds_list_for_each_entry_safe(client_list_element, tmp,
3155 &client_list->list, node) {
3156 struct notification_client *client =
3157 client_list_element->client;
3158
3159 if (client->uid != channel_uid && client->gid != channel_gid &&
3160 client->uid != 0) {
3161 /* Client is not allowed to monitor this channel. */
3162 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this channel");
3163 continue;
3164 }
3165
3166 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
3167 client->socket, msg_buffer.size);
3168 if (client->communication.outbound.buffer.size) {
3169 /*
3170 * Outgoing data is already buffered for this client;
3171 * drop the notification and enqueue a "dropped
3172 * notification" message if this is the first dropped
3173 * notification since the socket spilled-over to the
3174 * queue.
3175 */
3176 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
3177 client->socket);
3178 if (!client->communication.outbound.dropped_notification) {
3179 client->communication.outbound.dropped_notification = true;
3180 ret = client_enqueue_dropped_notification(
3181 client, state);
3182 if (ret) {
3183 goto end;
3184 }
3185 }
3186 continue;
3187 }
3188
3189 ret = lttng_dynamic_buffer_append_buffer(
3190 &client->communication.outbound.buffer,
3191 &msg_buffer);
3192 if (ret) {
3193 goto end;
3194 }
3195
3196 ret = client_flush_outgoing_queue(client, state);
3197 if (ret) {
3198 goto end;
3199 }
3200 }
3201 ret = 0;
3202end:
ab0ee2ca
JG
3203 lttng_dynamic_buffer_reset(&msg_buffer);
3204 return ret;
3205}
3206
3207int handle_notification_thread_channel_sample(
3208 struct notification_thread_state *state, int pipe,
3209 enum lttng_domain_type domain)
3210{
3211 int ret = 0;
3212 struct lttcomm_consumer_channel_monitor_msg sample_msg;
ab0ee2ca
JG
3213 struct channel_info *channel_info;
3214 struct cds_lfht_node *node;
3215 struct cds_lfht_iter iter;
3216 struct lttng_channel_trigger_list *trigger_list;
3217 struct lttng_trigger_list_element *trigger_list_element;
3218 bool previous_sample_available = false;
e8360425
JD
3219 struct channel_state_sample previous_sample, latest_sample;
3220 uint64_t previous_session_consumed_total, latest_session_consumed_total;
ab0ee2ca
JG
3221
3222 /*
3223 * The monitoring pipe only holds messages smaller than PIPE_BUF,
3224 * ensuring that read/write of sampling messages are atomic.
3225 */
7c2551ef 3226 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
ab0ee2ca
JG
3227 if (ret != sizeof(sample_msg)) {
3228 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
3229 pipe);
3230 ret = -1;
3231 goto end;
3232 }
3233
3234 ret = 0;
3235 latest_sample.key.key = sample_msg.key;
3236 latest_sample.key.domain = domain;
3237 latest_sample.highest_usage = sample_msg.highest;
3238 latest_sample.lowest_usage = sample_msg.lowest;
e8360425 3239 latest_sample.channel_total_consumed = sample_msg.total_consumed;
ab0ee2ca
JG
3240
3241 rcu_read_lock();
3242
3243 /* Retrieve the channel's informations */
3244 cds_lfht_lookup(state->channels_ht,
3245 hash_channel_key(&latest_sample.key),
3246 match_channel_info,
3247 &latest_sample.key,
3248 &iter);
3249 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 3250 if (caa_unlikely(!node)) {
ab0ee2ca
JG
3251 /*
3252 * Not an error since the consumer can push a sample to the pipe
3253 * and the rest of the session daemon could notify us of the
3254 * channel's destruction before we get a chance to process that
3255 * sample.
3256 */
3257 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
3258 latest_sample.key.key,
3259 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
3260 "user space");
3261 goto end_unlock;
3262 }
3263 channel_info = caa_container_of(node, struct channel_info,
3264 channels_ht_node);
e8360425 3265 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
8abe313a 3266 channel_info->name,
ab0ee2ca 3267 latest_sample.key.key,
8abe313a 3268 channel_info->session_info->name,
ab0ee2ca 3269 latest_sample.highest_usage,
e8360425
JD
3270 latest_sample.lowest_usage,
3271 latest_sample.channel_total_consumed);
3272
3273 previous_session_consumed_total =
3274 channel_info->session_info->consumed_data_size;
ab0ee2ca
JG
3275
3276 /* Retrieve the channel's last sample, if it exists, and update it. */
3277 cds_lfht_lookup(state->channel_state_ht,
3278 hash_channel_key(&latest_sample.key),
3279 match_channel_state_sample,
3280 &latest_sample.key,
3281 &iter);
3282 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 3283 if (caa_likely(node)) {
ab0ee2ca
JG
3284 struct channel_state_sample *stored_sample;
3285
3286 /* Update the sample stored. */
3287 stored_sample = caa_container_of(node,
3288 struct channel_state_sample,
3289 channel_state_ht_node);
e8360425 3290
ab0ee2ca
JG
3291 memcpy(&previous_sample, stored_sample,
3292 sizeof(previous_sample));
3293 stored_sample->highest_usage = latest_sample.highest_usage;
3294 stored_sample->lowest_usage = latest_sample.lowest_usage;
0f0479d7 3295 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
ab0ee2ca 3296 previous_sample_available = true;
e8360425
JD
3297
3298 latest_session_consumed_total =
3299 previous_session_consumed_total +
3300 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
ab0ee2ca
JG
3301 } else {
3302 /*
3303 * This is the channel's first sample, allocate space for and
3304 * store the new sample.
3305 */
3306 struct channel_state_sample *stored_sample;
3307
3308 stored_sample = zmalloc(sizeof(*stored_sample));
3309 if (!stored_sample) {
3310 ret = -1;
3311 goto end_unlock;
3312 }
3313
3314 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
3315 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
3316 cds_lfht_add(state->channel_state_ht,
3317 hash_channel_key(&stored_sample->key),
3318 &stored_sample->channel_state_ht_node);
e8360425
JD
3319
3320 latest_session_consumed_total =
3321 previous_session_consumed_total +
3322 latest_sample.channel_total_consumed;
ab0ee2ca
JG
3323 }
3324
e8360425
JD
3325 channel_info->session_info->consumed_data_size =
3326 latest_session_consumed_total;
3327
ab0ee2ca
JG
3328 /* Find triggers associated with this channel. */
3329 cds_lfht_lookup(state->channel_triggers_ht,
3330 hash_channel_key(&latest_sample.key),
3331 match_channel_trigger_list,
3332 &latest_sample.key,
3333 &iter);
3334 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 3335 if (caa_likely(!node)) {
ab0ee2ca
JG
3336 goto end_unlock;
3337 }
3338
3339 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
3340 channel_triggers_ht_node);
3341 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
3342 node) {
9b63a4aa
JG
3343 const struct lttng_condition *condition;
3344 const struct lttng_action *action;
3345 const struct lttng_trigger *trigger;
ab0ee2ca
JG
3346 struct notification_client_list *client_list;
3347 struct lttng_evaluation *evaluation = NULL;
3348
3349 trigger = trigger_list_element->trigger;
9b63a4aa 3350 condition = lttng_trigger_get_const_condition(trigger);
ab0ee2ca 3351 assert(condition);
9b63a4aa 3352 action = lttng_trigger_get_const_action(trigger);
ab0ee2ca
JG
3353
3354 /* Notify actions are the only type currently supported. */
9b63a4aa 3355 assert(lttng_action_get_type_const(action) ==
ab0ee2ca
JG
3356 LTTNG_ACTION_TYPE_NOTIFY);
3357
3358 /*
3359 * Check if any client is subscribed to the result of this
3360 * evaluation.
3361 */
ed327204
JG
3362 client_list = get_client_list_from_condition(state, condition);
3363 assert(client_list);
ab0ee2ca
JG
3364 if (cds_list_empty(&client_list->list)) {
3365 /*
3366 * No clients interested in the evaluation's result,
3367 * skip it.
3368 */
3369 continue;
3370 }
3371
ed327204 3372 ret = evaluate_buffer_condition(condition, &evaluation, state,
ab0ee2ca 3373 previous_sample_available ? &previous_sample : NULL,
e8360425
JD
3374 &latest_sample,
3375 previous_session_consumed_total,
3376 latest_session_consumed_total,
3377 channel_info);
3378 if (caa_unlikely(ret)) {
ab0ee2ca
JG
3379 goto end_unlock;
3380 }
3381
e8360425 3382 if (caa_likely(!evaluation)) {
ab0ee2ca
JG
3383 continue;
3384 }
3385
3386 /* Dispatch evaluation result to all clients. */
3387 ret = send_evaluation_to_clients(trigger_list_element->trigger,
3388 evaluation, client_list, state,
8abe313a
JG
3389 channel_info->session_info->uid,
3390 channel_info->session_info->gid);
3391 lttng_evaluation_destroy(evaluation);
e0b5f87b 3392 if (caa_unlikely(ret)) {
ab0ee2ca
JG
3393 goto end_unlock;
3394 }
3395 }
3396end_unlock:
3397 rcu_read_unlock();
3398end:
3399 return ret;
3400}
This page took 0.187896 seconds and 4 git commands to generate.