Docs: clarify the contents of channel_infos_ht
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
CommitLineData
ab0ee2ca
JG
1/*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _LGPL_SOURCE
19#include <urcu.h>
20#include <urcu/rculfhash.h>
21
ab0ee2ca
JG
22#include <common/defaults.h>
23#include <common/error.h>
24#include <common/futex.h>
25#include <common/unix.h>
26#include <common/dynamic-buffer.h>
27#include <common/hashtable/utils.h>
28#include <common/sessiond-comm/sessiond-comm.h>
29#include <common/macros.h>
30#include <lttng/condition/condition.h>
9b63a4aa 31#include <lttng/action/action-internal.h>
ab0ee2ca
JG
32#include <lttng/notification/notification-internal.h>
33#include <lttng/condition/condition-internal.h>
34#include <lttng/condition/buffer-usage-internal.h>
e8360425 35#include <lttng/condition/session-consumed-size-internal.h>
ea9a44f0 36#include <lttng/condition/session-rotation-internal.h>
ab0ee2ca 37#include <lttng/notification/channel-internal.h>
1da26331 38
ab0ee2ca
JG
39#include <time.h>
40#include <unistd.h>
41#include <assert.h>
42#include <inttypes.h>
d53ea4e4 43#include <fcntl.h>
ab0ee2ca 44
1da26331
JG
45#include "notification-thread.h"
46#include "notification-thread-events.h"
47#include "notification-thread-commands.h"
48#include "lttng-sessiond.h"
49#include "kernel.h"
50
ab0ee2ca
JG
51#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
52#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
53
54struct lttng_trigger_list_element {
9e0bb80e
JG
55 /* No ownership of the trigger object is assumed. */
56 const struct lttng_trigger *trigger;
ab0ee2ca
JG
57 struct cds_list_head node;
58};
59
60struct lttng_channel_trigger_list {
61 struct channel_key channel_key;
ea9a44f0 62 /* List of struct lttng_trigger_list_element. */
ab0ee2ca 63 struct cds_list_head list;
ea9a44f0 64 /* Node in the channel_triggers_ht */
ab0ee2ca
JG
65 struct cds_lfht_node channel_triggers_ht_node;
66};
67
ea9a44f0
JG
68/*
69 * List of triggers applying to a given session.
70 *
71 * See:
72 * - lttng_session_trigger_list_create()
73 * - lttng_session_trigger_list_build()
74 * - lttng_session_trigger_list_destroy()
75 * - lttng_session_trigger_list_add()
76 */
77struct lttng_session_trigger_list {
78 /*
79 * Not owned by this; points to the session_info structure's
80 * session name.
81 */
82 const char *session_name;
83 /* List of struct lttng_trigger_list_element. */
84 struct cds_list_head list;
85 /* Node in the session_triggers_ht */
86 struct cds_lfht_node session_triggers_ht_node;
87 /*
88 * Weak reference to the notification system's session triggers
89 * hashtable.
90 *
91 * The session trigger list structure structure is owned by
92 * the session's session_info.
93 *
94 * The session_info is kept alive the the channel_infos holding a
95 * reference to it (reference counting). When those channels are
96 * destroyed (at runtime or on teardown), the reference they hold
97 * to the session_info are released. On destruction of session_info,
98 * session_info_destroy() will remove the list of triggers applying
99 * to this session from the notification system's state.
100 *
101 * This implies that the session_triggers_ht must be destroyed
102 * after the channels.
103 */
104 struct cds_lfht *session_triggers_ht;
105 /* Used for delayed RCU reclaim. */
106 struct rcu_head rcu_node;
107};
108
ab0ee2ca
JG
109struct lttng_trigger_ht_element {
110 struct lttng_trigger *trigger;
111 struct cds_lfht_node node;
112};
113
114struct lttng_condition_list_element {
115 struct lttng_condition *condition;
116 struct cds_list_head node;
117};
118
119struct notification_client_list_element {
120 struct notification_client *client;
121 struct cds_list_head node;
122};
123
124struct notification_client_list {
125 struct lttng_trigger *trigger;
126 struct cds_list_head list;
127 struct cds_lfht_node notification_trigger_ht_node;
128};
129
130struct notification_client {
131 int socket;
132 /* Client protocol version. */
133 uint8_t major, minor;
134 uid_t uid;
135 gid_t gid;
136 /*
5332364f 137 * Indicates if the credentials and versions of the client have been
ab0ee2ca
JG
138 * checked.
139 */
140 bool validated;
141 /*
142 * Conditions to which the client's notification channel is subscribed.
143 * List of struct lttng_condition_list_node. The condition member is
144 * owned by the client.
145 */
146 struct cds_list_head condition_list;
147 struct cds_lfht_node client_socket_ht_node;
148 struct {
149 struct {
14fa22f8
JG
150 /*
151 * During the reception of a message, the reception
152 * buffers' "size" is set to contain the current
153 * message's complete payload.
154 */
ab0ee2ca
JG
155 struct lttng_dynamic_buffer buffer;
156 /* Bytes left to receive for the current message. */
157 size_t bytes_to_receive;
158 /* Type of the message being received. */
159 enum lttng_notification_channel_message_type msg_type;
160 /*
161 * Indicates whether or not credentials are expected
162 * from the client.
163 */
01ea340e 164 bool expect_creds;
ab0ee2ca
JG
165 /*
166 * Indicates whether or not credentials were received
167 * from the client.
168 */
169 bool creds_received;
14fa22f8 170 /* Only used during credentials reception. */
ab0ee2ca
JG
171 lttng_sock_cred creds;
172 } inbound;
173 struct {
174 /*
175 * Indicates whether or not a notification addressed to
176 * this client was dropped because a command reply was
177 * already buffered.
178 *
179 * A notification is dropped whenever the buffer is not
180 * empty.
181 */
182 bool dropped_notification;
183 /*
184 * Indicates whether or not a command reply is already
185 * buffered. In this case, it means that the client is
186 * not consuming command replies before emitting a new
187 * one. This could be caused by a protocol error or a
188 * misbehaving/malicious client.
189 */
190 bool queued_command_reply;
191 struct lttng_dynamic_buffer buffer;
192 } outbound;
193 } communication;
194};
195
196struct channel_state_sample {
197 struct channel_key key;
198 struct cds_lfht_node channel_state_ht_node;
199 uint64_t highest_usage;
200 uint64_t lowest_usage;
e8360425 201 uint64_t channel_total_consumed;
ab0ee2ca
JG
202};
203
e4db5ace 204static unsigned long hash_channel_key(struct channel_key *key);
e8360425 205static int evaluate_condition(const struct lttng_condition *condition,
e4db5ace 206 struct lttng_evaluation **evaluation,
e8360425
JD
207 const struct notification_thread_state *state,
208 const struct channel_state_sample *previous_sample,
209 const struct channel_state_sample *latest_sample,
210 uint64_t previous_session_consumed_total,
211 uint64_t latest_session_consumed_total,
212 struct channel_info *channel_info);
e4db5ace 213static
9b63a4aa
JG
214int send_evaluation_to_clients(const struct lttng_trigger *trigger,
215 const struct lttng_evaluation *evaluation,
e4db5ace
JR
216 struct notification_client_list *client_list,
217 struct notification_thread_state *state,
218 uid_t channel_uid, gid_t channel_gid);
219
8abe313a 220
ea9a44f0 221/* session_info API */
8abe313a
JG
222static
223void session_info_destroy(void *_data);
224static
225void session_info_get(struct session_info *session_info);
226static
227void session_info_put(struct session_info *session_info);
228static
229struct session_info *session_info_create(const char *name,
ea9a44f0
JG
230 uid_t uid, gid_t gid,
231 struct lttng_session_trigger_list *trigger_list);
8abe313a
JG
232static
233void session_info_add_channel(struct session_info *session_info,
234 struct channel_info *channel_info);
235static
236void session_info_remove_channel(struct session_info *session_info,
237 struct channel_info *channel_info);
238
ea9a44f0
JG
239/* lttng_session_trigger_list API */
240static
241struct lttng_session_trigger_list *lttng_session_trigger_list_create(
242 const char *session_name,
243 struct cds_lfht *session_triggers_ht);
244static
245struct lttng_session_trigger_list *lttng_session_trigger_list_build(
246 const struct notification_thread_state *state,
247 const char *session_name);
248static
249void lttng_session_trigger_list_destroy(
250 struct lttng_session_trigger_list *list);
251static
252int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
253 const struct lttng_trigger *trigger);
254
255
ab0ee2ca
JG
256static
257int match_client(struct cds_lfht_node *node, const void *key)
258{
259 /* This double-cast is intended to supress pointer-to-cast warning. */
260 int socket = (int) (intptr_t) key;
261 struct notification_client *client;
262
263 client = caa_container_of(node, struct notification_client,
264 client_socket_ht_node);
265
266 return !!(client->socket == socket);
267}
268
269static
270int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
271{
272 struct channel_key *channel_key = (struct channel_key *) key;
273 struct lttng_channel_trigger_list *trigger_list;
274
275 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
276 channel_triggers_ht_node);
277
278 return !!((channel_key->key == trigger_list->channel_key.key) &&
279 (channel_key->domain == trigger_list->channel_key.domain));
280}
281
282static
283int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
284{
285 struct channel_key *channel_key = (struct channel_key *) key;
286 struct channel_state_sample *sample;
287
288 sample = caa_container_of(node, struct channel_state_sample,
289 channel_state_ht_node);
290
291 return !!((channel_key->key == sample->key.key) &&
292 (channel_key->domain == sample->key.domain));
293}
294
295static
296int match_channel_info(struct cds_lfht_node *node, const void *key)
297{
298 struct channel_key *channel_key = (struct channel_key *) key;
299 struct channel_info *channel_info;
300
301 channel_info = caa_container_of(node, struct channel_info,
302 channels_ht_node);
303
304 return !!((channel_key->key == channel_info->key.key) &&
305 (channel_key->domain == channel_info->key.domain));
306}
307
308static
309int match_condition(struct cds_lfht_node *node, const void *key)
310{
311 struct lttng_condition *condition_key = (struct lttng_condition *) key;
312 struct lttng_trigger_ht_element *trigger;
313 struct lttng_condition *condition;
314
315 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
316 node);
317 condition = lttng_trigger_get_condition(trigger->trigger);
318 assert(condition);
319
320 return !!lttng_condition_is_equal(condition_key, condition);
321}
322
323static
324int match_client_list(struct cds_lfht_node *node, const void *key)
325{
326 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
327 struct notification_client_list *client_list;
328 struct lttng_condition *condition;
329 struct lttng_condition *condition_key = lttng_trigger_get_condition(
330 trigger_key);
331
332 assert(condition_key);
333
334 client_list = caa_container_of(node, struct notification_client_list,
335 notification_trigger_ht_node);
336 condition = lttng_trigger_get_condition(client_list->trigger);
337
338 return !!lttng_condition_is_equal(condition_key, condition);
339}
340
341static
342int match_client_list_condition(struct cds_lfht_node *node, const void *key)
343{
344 struct lttng_condition *condition_key = (struct lttng_condition *) key;
345 struct notification_client_list *client_list;
346 struct lttng_condition *condition;
347
348 assert(condition_key);
349
350 client_list = caa_container_of(node, struct notification_client_list,
351 notification_trigger_ht_node);
352 condition = lttng_trigger_get_condition(client_list->trigger);
353
354 return !!lttng_condition_is_equal(condition_key, condition);
355}
356
357static
358unsigned long lttng_condition_buffer_usage_hash(
9b63a4aa 359 const struct lttng_condition *_condition)
ab0ee2ca 360{
9a2746aa
JG
361 unsigned long hash;
362 unsigned long condition_type;
ab0ee2ca
JG
363 struct lttng_condition_buffer_usage *condition;
364
365 condition = container_of(_condition,
366 struct lttng_condition_buffer_usage, parent);
367
9a2746aa
JG
368 condition_type = (unsigned long) condition->parent.type;
369 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
ab0ee2ca
JG
370 if (condition->session_name) {
371 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
372 }
373 if (condition->channel_name) {
8f56701f 374 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
ab0ee2ca
JG
375 }
376 if (condition->domain.set) {
377 hash ^= hash_key_ulong(
378 (void *) condition->domain.type,
379 lttng_ht_seed);
380 }
381 if (condition->threshold_ratio.set) {
382 uint64_t val;
383
384 val = condition->threshold_ratio.value * (double) UINT32_MAX;
385 hash ^= hash_key_u64(&val, lttng_ht_seed);
6633b0dd 386 } else if (condition->threshold_bytes.set) {
ab0ee2ca
JG
387 uint64_t val;
388
389 val = condition->threshold_bytes.value;
390 hash ^= hash_key_u64(&val, lttng_ht_seed);
391 }
392 return hash;
393}
394
e8360425
JD
395static
396unsigned long lttng_condition_session_consumed_size_hash(
9b63a4aa 397 const struct lttng_condition *_condition)
e8360425 398{
9a2746aa
JG
399 unsigned long hash;
400 unsigned long condition_type =
401 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE;
e8360425
JD
402 struct lttng_condition_session_consumed_size *condition;
403 uint64_t val;
404
405 condition = container_of(_condition,
406 struct lttng_condition_session_consumed_size, parent);
407
9a2746aa 408 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
e8360425
JD
409 if (condition->session_name) {
410 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
411 }
412 val = condition->consumed_threshold_bytes.value;
413 hash ^= hash_key_u64(&val, lttng_ht_seed);
414 return hash;
415}
416
a8393880
JG
417static
418unsigned long lttng_condition_session_rotation_hash(
419 const struct lttng_condition *_condition)
420{
421 unsigned long hash, condition_type;
422 struct lttng_condition_session_rotation *condition;
423
424 condition = container_of(_condition,
425 struct lttng_condition_session_rotation, parent);
426 condition_type = (unsigned long) condition->parent.type;
427 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
428 assert(condition->session_name);
429 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
430 return hash;
431}
9a2746aa 432
ab0ee2ca
JG
433/*
434 * The lttng_condition hashing code is kept in this file (rather than
435 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
436 * don't want to link in liblttng-ctl.
437 */
438static
9b63a4aa 439unsigned long lttng_condition_hash(const struct lttng_condition *condition)
ab0ee2ca
JG
440{
441 switch (condition->type) {
442 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
443 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
444 return lttng_condition_buffer_usage_hash(condition);
e8360425
JD
445 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
446 return lttng_condition_session_consumed_size_hash(condition);
a8393880
JG
447 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
448 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
449 return lttng_condition_session_rotation_hash(condition);
ab0ee2ca
JG
450 default:
451 ERR("[notification-thread] Unexpected condition type caught");
452 abort();
453 }
454}
455
e4db5ace
JR
456static
457unsigned long hash_channel_key(struct channel_key *key)
458{
459 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
460 unsigned long domain_hash = hash_key_ulong(
461 (void *) (unsigned long) key->domain, lttng_ht_seed);
462
463 return key_hash ^ domain_hash;
464}
465
ab0ee2ca
JG
466static
467void channel_info_destroy(struct channel_info *channel_info)
468{
469 if (!channel_info) {
470 return;
471 }
472
8abe313a
JG
473 if (channel_info->session_info) {
474 session_info_remove_channel(channel_info->session_info,
475 channel_info);
476 session_info_put(channel_info->session_info);
ab0ee2ca 477 }
8abe313a
JG
478 if (channel_info->name) {
479 free(channel_info->name);
ab0ee2ca
JG
480 }
481 free(channel_info);
482}
483
8abe313a 484/* Don't call directly, use the ref-counting mechanism. */
ab0ee2ca 485static
8abe313a 486void session_info_destroy(void *_data)
ab0ee2ca 487{
8abe313a 488 struct session_info *session_info = _data;
3b68d0a3 489 int ret;
ab0ee2ca 490
8abe313a
JG
491 assert(session_info);
492 if (session_info->channel_infos_ht) {
3b68d0a3
JR
493 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
494 if (ret) {
4c3f302b 495 ERR("[notification-thread] Failed to destroy channel information hash table");
3b68d0a3 496 }
8abe313a 497 }
ea9a44f0 498 lttng_session_trigger_list_destroy(session_info->trigger_list);
8abe313a
JG
499 free(session_info->name);
500 free(session_info);
501}
ab0ee2ca 502
8abe313a
JG
503static
504void session_info_get(struct session_info *session_info)
505{
506 if (!session_info) {
507 return;
508 }
509 lttng_ref_get(&session_info->ref);
510}
511
512static
513void session_info_put(struct session_info *session_info)
514{
515 if (!session_info) {
516 return;
ab0ee2ca 517 }
8abe313a
JG
518 lttng_ref_put(&session_info->ref);
519}
520
521static
ea9a44f0
JG
522struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
523 struct lttng_session_trigger_list *trigger_list)
8abe313a
JG
524{
525 struct session_info *session_info;
ab0ee2ca 526
8abe313a 527 assert(name);
ab0ee2ca 528
8abe313a
JG
529 session_info = zmalloc(sizeof(*session_info));
530 if (!session_info) {
531 goto end;
532 }
533 lttng_ref_init(&session_info->ref, session_info_destroy);
534
535 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
536 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
537 if (!session_info->channel_infos_ht) {
ab0ee2ca
JG
538 goto error;
539 }
8abe313a
JG
540
541 cds_lfht_node_init(&session_info->sessions_ht_node);
542 session_info->name = strdup(name);
543 if (!session_info->name) {
ab0ee2ca
JG
544 goto error;
545 }
8abe313a
JG
546 session_info->uid = uid;
547 session_info->gid = gid;
ea9a44f0 548 session_info->trigger_list = trigger_list;
8abe313a
JG
549end:
550 return session_info;
551error:
552 session_info_put(session_info);
553 return NULL;
554}
555
556static
557void session_info_add_channel(struct session_info *session_info,
558 struct channel_info *channel_info)
559{
560 rcu_read_lock();
561 cds_lfht_add(session_info->channel_infos_ht,
562 hash_channel_key(&channel_info->key),
563 &channel_info->session_info_channels_ht_node);
564 rcu_read_unlock();
565}
566
567static
568void session_info_remove_channel(struct session_info *session_info,
569 struct channel_info *channel_info)
570{
571 rcu_read_lock();
572 cds_lfht_del(session_info->channel_infos_ht,
573 &channel_info->session_info_channels_ht_node);
574 rcu_read_unlock();
575}
576
577static
578struct channel_info *channel_info_create(const char *channel_name,
579 struct channel_key *channel_key, uint64_t channel_capacity,
580 struct session_info *session_info)
581{
582 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
583
584 if (!channel_info) {
585 goto end;
586 }
587
ab0ee2ca 588 cds_lfht_node_init(&channel_info->channels_ht_node);
8abe313a
JG
589 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
590 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
591 channel_info->capacity = channel_capacity;
592
593 channel_info->name = strdup(channel_name);
594 if (!channel_info->name) {
595 goto error;
596 }
597
598 /*
599 * Set the references between session and channel infos:
600 * - channel_info holds a strong reference to session_info
601 * - session_info holds a weak reference to channel_info
602 */
603 session_info_get(session_info);
604 session_info_add_channel(session_info, channel_info);
605 channel_info->session_info = session_info;
ab0ee2ca 606end:
8abe313a 607 return channel_info;
ab0ee2ca 608error:
8abe313a 609 channel_info_destroy(channel_info);
ab0ee2ca
JG
610 return NULL;
611}
612
e4db5ace
JR
613/* This function must be called with the RCU read lock held. */
614static
615int evaluate_condition_for_client(struct lttng_trigger *trigger,
616 struct lttng_condition *condition,
617 struct notification_client *client,
618 struct notification_thread_state *state)
619{
620 int ret;
621 struct cds_lfht_iter iter;
622 struct cds_lfht_node *node;
623 struct channel_info *channel_info = NULL;
624 struct channel_key *channel_key = NULL;
625 struct channel_state_sample *last_sample = NULL;
626 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
627 struct lttng_evaluation *evaluation = NULL;
628 struct notification_client_list client_list = { 0 };
629 struct notification_client_list_element client_list_element = { 0 };
630
631 assert(trigger);
632 assert(condition);
633 assert(client);
634 assert(state);
635
636 /* Find the channel associated with the trigger. */
637 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
638 channel_trigger_list , channel_triggers_ht_node) {
639 struct lttng_trigger_list_element *element;
640
641 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
9b63a4aa
JG
642 const struct lttng_condition *current_condition =
643 lttng_trigger_get_const_condition(
e4db5ace
JR
644 element->trigger);
645
646 assert(current_condition);
647 if (!lttng_condition_is_equal(condition,
2ae99f0b 648 current_condition)) {
e4db5ace
JR
649 continue;
650 }
651
652 /* Found the trigger, save the channel key. */
653 channel_key = &channel_trigger_list->channel_key;
654 break;
655 }
656 if (channel_key) {
657 /* The channel key was found stop iteration. */
658 break;
659 }
660 }
661
662 if (!channel_key){
663 /* No channel found; normal exit. */
664 DBG("[notification-thread] No channel associated with newly subscribed-to condition");
665 ret = 0;
666 goto end;
667 }
668
669 /* Fetch channel info for the matching channel. */
670 cds_lfht_lookup(state->channels_ht,
671 hash_channel_key(channel_key),
672 match_channel_info,
673 channel_key,
674 &iter);
675 node = cds_lfht_iter_get_node(&iter);
676 assert(node);
677 channel_info = caa_container_of(node, struct channel_info,
678 channels_ht_node);
679
680 /* Retrieve the channel's last sample, if it exists. */
681 cds_lfht_lookup(state->channel_state_ht,
682 hash_channel_key(channel_key),
683 match_channel_state_sample,
684 channel_key,
685 &iter);
686 node = cds_lfht_iter_get_node(&iter);
687 if (node) {
688 last_sample = caa_container_of(node,
689 struct channel_state_sample,
690 channel_state_ht_node);
691 } else {
692 /* Nothing to evaluate, no sample was ever taken. Normal exit */
693 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
694 ret = 0;
695 goto end;
696 }
697
e8360425
JD
698 ret = evaluate_condition(condition, &evaluation, state,
699 NULL, last_sample,
700 0, channel_info->session_info->consumed_data_size,
701 channel_info);
e4db5ace 702 if (ret) {
066a3c82 703 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
e4db5ace
JR
704 goto end;
705 }
706
707 if (!evaluation) {
708 /* Evaluation yielded nothing. Normal exit. */
709 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
710 ret = 0;
711 goto end;
712 }
713
714 /*
715 * Create a temporary client list with the client currently
716 * subscribing.
717 */
718 cds_lfht_node_init(&client_list.notification_trigger_ht_node);
719 CDS_INIT_LIST_HEAD(&client_list.list);
720 client_list.trigger = trigger;
721
722 CDS_INIT_LIST_HEAD(&client_list_element.node);
723 client_list_element.client = client;
724 cds_list_add(&client_list_element.node, &client_list.list);
725
726 /* Send evaluation result to the newly-subscribed client. */
727 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
728 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
8abe313a
JG
729 state, channel_info->session_info->uid,
730 channel_info->session_info->gid);
e4db5ace
JR
731
732end:
733 return ret;
734}
735
ab0ee2ca
JG
736static
737int notification_thread_client_subscribe(struct notification_client *client,
738 struct lttng_condition *condition,
739 struct notification_thread_state *state,
740 enum lttng_notification_channel_status *_status)
741{
742 int ret = 0;
743 struct cds_lfht_iter iter;
744 struct cds_lfht_node *node;
745 struct notification_client_list *client_list;
746 struct lttng_condition_list_element *condition_list_element = NULL;
747 struct notification_client_list_element *client_list_element = NULL;
748 enum lttng_notification_channel_status status =
749 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
750
751 /*
752 * Ensure that the client has not already subscribed to this condition
753 * before.
754 */
755 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
756 if (lttng_condition_is_equal(condition_list_element->condition,
757 condition)) {
758 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
759 goto end;
760 }
761 }
762
763 condition_list_element = zmalloc(sizeof(*condition_list_element));
764 if (!condition_list_element) {
765 ret = -1;
766 goto error;
767 }
768 client_list_element = zmalloc(sizeof(*client_list_element));
769 if (!client_list_element) {
770 ret = -1;
771 goto error;
772 }
773
774 rcu_read_lock();
775
776 /*
777 * Add the newly-subscribed condition to the client's subscription list.
778 */
779 CDS_INIT_LIST_HEAD(&condition_list_element->node);
780 condition_list_element->condition = condition;
781 cds_list_add(&condition_list_element->node, &client->condition_list);
782
ab0ee2ca
JG
783 cds_lfht_lookup(state->notification_trigger_clients_ht,
784 lttng_condition_hash(condition),
785 match_client_list_condition,
786 condition,
787 &iter);
788 node = cds_lfht_iter_get_node(&iter);
789 if (!node) {
2ae99f0b
JG
790 /*
791 * No notification-emiting trigger registered with this
792 * condition. We don't evaluate the condition right away
793 * since this trigger is not registered yet.
794 */
4fb43b68 795 free(client_list_element);
ab0ee2ca
JG
796 goto end_unlock;
797 }
798
799 client_list = caa_container_of(node, struct notification_client_list,
800 notification_trigger_ht_node);
2ae99f0b
JG
801 /*
802 * The condition to which the client just subscribed is evaluated
803 * at this point so that conditions that are already TRUE result
804 * in a notification being sent out.
805 */
e4db5ace
JR
806 if (evaluate_condition_for_client(client_list->trigger, condition,
807 client, state)) {
808 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
809 ret = -1;
4bd5b4af 810 free(client_list_element);
e4db5ace
JR
811 goto end_unlock;
812 }
813
814 /*
815 * Add the client to the list of clients interested in a given trigger
816 * if a "notification" trigger with a corresponding condition was
817 * added prior.
818 */
ab0ee2ca
JG
819 client_list_element->client = client;
820 CDS_INIT_LIST_HEAD(&client_list_element->node);
821 cds_list_add(&client_list_element->node, &client_list->list);
822end_unlock:
823 rcu_read_unlock();
824end:
825 if (_status) {
826 *_status = status;
827 }
828 return ret;
829error:
830 free(condition_list_element);
831 free(client_list_element);
832 return ret;
833}
834
835static
836int notification_thread_client_unsubscribe(
837 struct notification_client *client,
838 struct lttng_condition *condition,
839 struct notification_thread_state *state,
840 enum lttng_notification_channel_status *_status)
841{
842 struct cds_lfht_iter iter;
843 struct cds_lfht_node *node;
844 struct notification_client_list *client_list;
845 struct lttng_condition_list_element *condition_list_element,
846 *condition_tmp;
847 struct notification_client_list_element *client_list_element,
848 *client_tmp;
849 bool condition_found = false;
850 enum lttng_notification_channel_status status =
851 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
852
853 /* Remove the condition from the client's condition list. */
854 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
855 &client->condition_list, node) {
856 if (!lttng_condition_is_equal(condition_list_element->condition,
857 condition)) {
858 continue;
859 }
860
861 cds_list_del(&condition_list_element->node);
862 /*
863 * The caller may be iterating on the client's conditions to
864 * tear down a client's connection. In this case, the condition
865 * will be destroyed at the end.
866 */
867 if (condition != condition_list_element->condition) {
868 lttng_condition_destroy(
869 condition_list_element->condition);
870 }
871 free(condition_list_element);
872 condition_found = true;
873 break;
874 }
875
876 if (!condition_found) {
877 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
878 goto end;
879 }
880
881 /*
882 * Remove the client from the list of clients interested the trigger
883 * matching the condition.
884 */
885 rcu_read_lock();
886 cds_lfht_lookup(state->notification_trigger_clients_ht,
887 lttng_condition_hash(condition),
888 match_client_list_condition,
889 condition,
890 &iter);
891 node = cds_lfht_iter_get_node(&iter);
892 if (!node) {
893 goto end_unlock;
894 }
895
896 client_list = caa_container_of(node, struct notification_client_list,
897 notification_trigger_ht_node);
898 cds_list_for_each_entry_safe(client_list_element, client_tmp,
899 &client_list->list, node) {
900 if (client_list_element->client->socket != client->socket) {
901 continue;
902 }
903 cds_list_del(&client_list_element->node);
904 free(client_list_element);
905 break;
906 }
907end_unlock:
908 rcu_read_unlock();
909end:
910 lttng_condition_destroy(condition);
911 if (_status) {
912 *_status = status;
913 }
914 return 0;
915}
916
917static
918void notification_client_destroy(struct notification_client *client,
919 struct notification_thread_state *state)
920{
921 struct lttng_condition_list_element *condition_list_element, *tmp;
922
923 if (!client) {
924 return;
925 }
926
927 /* Release all conditions to which the client was subscribed. */
928 cds_list_for_each_entry_safe(condition_list_element, tmp,
929 &client->condition_list, node) {
930 (void) notification_thread_client_unsubscribe(client,
931 condition_list_element->condition, state, NULL);
932 }
933
934 if (client->socket >= 0) {
935 (void) lttcomm_close_unix_sock(client->socket);
936 }
937 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
938 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
939 free(client);
940}
941
942/*
943 * Call with rcu_read_lock held (and hold for the lifetime of the returned
944 * client pointer).
945 */
946static
947struct notification_client *get_client_from_socket(int socket,
948 struct notification_thread_state *state)
949{
950 struct cds_lfht_iter iter;
951 struct cds_lfht_node *node;
952 struct notification_client *client = NULL;
953
954 cds_lfht_lookup(state->client_socket_ht,
955 hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed),
956 match_client,
957 (void *) (unsigned long) socket,
958 &iter);
959 node = cds_lfht_iter_get_node(&iter);
960 if (!node) {
961 goto end;
962 }
963
964 client = caa_container_of(node, struct notification_client,
965 client_socket_ht_node);
966end:
967 return client;
968}
969
970static
e8360425
JD
971bool buffer_usage_condition_applies_to_channel(
972 struct lttng_condition *condition,
8abe313a 973 struct channel_info *channel_info)
ab0ee2ca
JG
974{
975 enum lttng_condition_status status;
e8360425
JD
976 enum lttng_domain_type condition_domain;
977 const char *condition_session_name = NULL;
978 const char *condition_channel_name = NULL;
ab0ee2ca 979
e8360425
JD
980 status = lttng_condition_buffer_usage_get_domain_type(condition,
981 &condition_domain);
982 assert(status == LTTNG_CONDITION_STATUS_OK);
983 if (channel_info->key.domain != condition_domain) {
ab0ee2ca
JG
984 goto fail;
985 }
986
e8360425
JD
987 status = lttng_condition_buffer_usage_get_session_name(
988 condition, &condition_session_name);
989 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
990
991 status = lttng_condition_buffer_usage_get_channel_name(
992 condition, &condition_channel_name);
993 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
994
995 if (strcmp(channel_info->session_info->name, condition_session_name)) {
996 goto fail;
997 }
998 if (strcmp(channel_info->name, condition_channel_name)) {
ab0ee2ca
JG
999 goto fail;
1000 }
1001
e8360425
JD
1002 return true;
1003fail:
1004 return false;
1005}
1006
1007static
1008bool session_consumed_size_condition_applies_to_channel(
1009 struct lttng_condition *condition,
1010 struct channel_info *channel_info)
1011{
1012 enum lttng_condition_status status;
1013 const char *condition_session_name = NULL;
1014
1015 status = lttng_condition_session_consumed_size_get_session_name(
1016 condition, &condition_session_name);
1017 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1018
1019 if (strcmp(channel_info->session_info->name, condition_session_name)) {
ab0ee2ca
JG
1020 goto fail;
1021 }
1022
e8360425
JD
1023 return true;
1024fail:
1025 return false;
1026}
ab0ee2ca 1027
e8360425
JD
1028static
1029bool trigger_applies_to_channel(struct lttng_trigger *trigger,
1030 struct channel_info *channel_info)
1031{
1032 struct lttng_condition *condition;
1033 bool trigger_applies;
ab0ee2ca 1034
e8360425
JD
1035 condition = lttng_trigger_get_condition(trigger);
1036 if (!condition) {
ab0ee2ca
JG
1037 goto fail;
1038 }
e8360425
JD
1039
1040 switch (lttng_condition_get_type(condition)) {
1041 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1042 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1043 trigger_applies = buffer_usage_condition_applies_to_channel(
1044 condition, channel_info);
1045 break;
1046 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1047 trigger_applies = session_consumed_size_condition_applies_to_channel(
1048 condition, channel_info);
1049 break;
1050 default:
ab0ee2ca
JG
1051 goto fail;
1052 }
1053
e8360425 1054 return trigger_applies;
ab0ee2ca
JG
1055fail:
1056 return false;
1057}
1058
1059static
1060bool trigger_applies_to_client(struct lttng_trigger *trigger,
1061 struct notification_client *client)
1062{
1063 bool applies = false;
1064 struct lttng_condition_list_element *condition_list_element;
1065
1066 cds_list_for_each_entry(condition_list_element, &client->condition_list,
1067 node) {
1068 applies = lttng_condition_is_equal(
1069 condition_list_element->condition,
1070 lttng_trigger_get_condition(trigger));
1071 if (applies) {
1072 break;
1073 }
1074 }
1075 return applies;
1076}
1077
8abe313a
JG
1078static
1079int match_session(struct cds_lfht_node *node, const void *key)
1080{
1081 const char *name = key;
1082 struct session_info *session_info = caa_container_of(
1083 node, struct session_info, sessions_ht_node);
1084
1085 return !strcmp(session_info->name, name);
1086}
1087
ea9a44f0
JG
1088/*
1089 * Allocate an empty lttng_session_trigger_list for the session named
1090 * 'session_name'.
1091 *
1092 * No ownership of 'session_name' is assumed by the session trigger list.
1093 * It is the caller's responsability to ensure the session name is alive
1094 * for as long as this list is.
1095 */
1096static
1097struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1098 const char *session_name,
1099 struct cds_lfht *session_triggers_ht)
1100{
1101 struct lttng_session_trigger_list *list;
1102
1103 list = zmalloc(sizeof(*list));
1104 if (!list) {
1105 goto end;
1106 }
1107 list->session_name = session_name;
1108 CDS_INIT_LIST_HEAD(&list->list);
1109 cds_lfht_node_init(&list->session_triggers_ht_node);
1110 list->session_triggers_ht = session_triggers_ht;
1111
1112 rcu_read_lock();
1113 /* Publish the list through the session_triggers_ht. */
1114 cds_lfht_add(session_triggers_ht,
1115 hash_key_str(session_name, lttng_ht_seed),
1116 &list->session_triggers_ht_node);
1117 rcu_read_unlock();
1118end:
1119 return list;
1120}
1121
1122static
1123void free_session_trigger_list_rcu(struct rcu_head *node)
1124{
1125 free(caa_container_of(node, struct lttng_session_trigger_list,
1126 rcu_node));
1127}
1128
1129static
1130void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1131{
1132 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1133
1134 /* Empty the list element by element, and then free the list itself. */
1135 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1136 &list->list, node) {
1137 cds_list_del(&trigger_list_element->node);
1138 free(trigger_list_element);
1139 }
1140 rcu_read_lock();
1141 /* Unpublish the list from the session_triggers_ht. */
1142 cds_lfht_del(list->session_triggers_ht,
1143 &list->session_triggers_ht_node);
1144 rcu_read_unlock();
1145 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1146}
1147
1148static
1149int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
1150 const struct lttng_trigger *trigger)
1151{
1152 int ret = 0;
1153 struct lttng_trigger_list_element *new_element =
1154 zmalloc(sizeof(*new_element));
1155
1156 if (!new_element) {
1157 ret = -1;
1158 goto end;
1159 }
1160 CDS_INIT_LIST_HEAD(&new_element->node);
1161 new_element->trigger = trigger;
1162 cds_list_add(&new_element->node, &list->list);
1163end:
1164 return ret;
1165}
1166
1167static
1168bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1169 const char *session_name)
1170{
1171 bool applies = false;
1172 const struct lttng_condition *condition;
1173
1174 condition = lttng_trigger_get_const_condition(trigger);
1175 switch (lttng_condition_get_type(condition)) {
1176 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1177 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1178 {
1179 enum lttng_condition_status condition_status;
1180 const char *condition_session_name;
1181
1182 condition_status = lttng_condition_session_rotation_get_session_name(
1183 condition, &condition_session_name);
1184 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1185 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1186 goto end;
1187 }
1188
1189 assert(condition_session_name);
1190 applies = !strcmp(condition_session_name, session_name);
1191 break;
1192 }
1193 default:
1194 goto end;
1195 }
1196end:
1197 return applies;
1198}
1199
1200/*
1201 * Allocate and initialize an lttng_session_trigger_list which contains
1202 * all triggers that apply to the session named 'session_name'.
1203 *
1204 * No ownership of 'session_name' is assumed by the session trigger list.
1205 * It is the caller's responsability to ensure the session name is alive
1206 * for as long as this list is.
1207 */
1208static
1209struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1210 const struct notification_thread_state *state,
1211 const char *session_name)
1212{
1213 int trigger_count = 0;
1214 struct lttng_session_trigger_list *session_trigger_list = NULL;
1215 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1216 struct cds_lfht_iter iter;
1217
1218 session_trigger_list = lttng_session_trigger_list_create(session_name,
1219 state->session_triggers_ht);
1220
1221 /* Add all triggers applying to the session named 'session_name'. */
1222 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1223 node) {
1224 int ret;
1225
1226 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1227 session_name)) {
1228 continue;
1229 }
1230
1231 ret = lttng_session_trigger_list_add(session_trigger_list,
1232 trigger_ht_element->trigger);
1233 if (ret) {
1234 goto error;
1235 }
1236
1237 trigger_count++;
1238 }
1239
1240 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1241 trigger_count);
1242 return session_trigger_list;
1243error:
1244 lttng_session_trigger_list_destroy(session_trigger_list);
1245 return NULL;
1246}
1247
8abe313a
JG
1248static
1249struct session_info *find_or_create_session_info(
1250 struct notification_thread_state *state,
1251 const char *name, uid_t uid, gid_t gid)
1252{
1253 struct session_info *session = NULL;
1254 struct cds_lfht_node *node;
1255 struct cds_lfht_iter iter;
ea9a44f0 1256 struct lttng_session_trigger_list *trigger_list;
8abe313a
JG
1257
1258 rcu_read_lock();
1259 cds_lfht_lookup(state->sessions_ht,
1260 hash_key_str(name, lttng_ht_seed),
1261 match_session,
1262 name,
1263 &iter);
1264 node = cds_lfht_iter_get_node(&iter);
1265 if (node) {
1266 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1267 name, uid, gid);
1268 session = caa_container_of(node, struct session_info,
1269 sessions_ht_node);
1270 assert(session->uid == uid);
1271 assert(session->gid == gid);
ea9a44f0
JG
1272 goto error;
1273 }
1274
1275 trigger_list = lttng_session_trigger_list_build(state, name);
1276 if (!trigger_list) {
1277 goto error;
8abe313a
JG
1278 }
1279
ea9a44f0 1280 session = session_info_create(name, uid, gid, trigger_list);
8abe313a
JG
1281 if (!session) {
1282 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1283 name, uid, gid);
ea9a44f0 1284 goto error;
8abe313a 1285 }
ea9a44f0 1286 trigger_list = NULL;
577983cb
JG
1287
1288 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
9b63a4aa 1289 &session->sessions_ht_node);
8abe313a
JG
1290 rcu_read_unlock();
1291 return session;
ea9a44f0
JG
1292error:
1293 rcu_read_unlock();
1294 session_info_put(session);
1295 return NULL;
8abe313a
JG
1296}
1297
ab0ee2ca
JG
1298static
1299int handle_notification_thread_command_add_channel(
8abe313a
JG
1300 struct notification_thread_state *state,
1301 const char *session_name, uid_t session_uid, gid_t session_gid,
1302 const char *channel_name, enum lttng_domain_type channel_domain,
1303 uint64_t channel_key_int, uint64_t channel_capacity,
1304 enum lttng_error_code *cmd_result)
ab0ee2ca
JG
1305{
1306 struct cds_list_head trigger_list;
8abe313a
JG
1307 struct channel_info *new_channel_info = NULL;
1308 struct channel_key channel_key = {
1309 .key = channel_key_int,
1310 .domain = channel_domain,
1311 };
ab0ee2ca
JG
1312 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1313 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1314 int trigger_count = 0;
1315 struct cds_lfht_iter iter;
8abe313a 1316 struct session_info *session_info = NULL;
ab0ee2ca
JG
1317
1318 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
8abe313a
JG
1319 channel_name, session_name, channel_key_int,
1320 channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
ab0ee2ca
JG
1321
1322 CDS_INIT_LIST_HEAD(&trigger_list);
1323
8abe313a
JG
1324 session_info = find_or_create_session_info(state, session_name,
1325 session_uid, session_gid);
1326 if (!session_info) {
1327 /* Allocation error or an internal error occured. */
ab0ee2ca
JG
1328 goto error;
1329 }
1330
8abe313a
JG
1331 new_channel_info = channel_info_create(channel_name, &channel_key,
1332 channel_capacity, session_info);
1333 if (!new_channel_info) {
1334 goto error;
1335 }
ab0ee2ca
JG
1336
1337 /* Build a list of all triggers applying to the new channel. */
1338 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1339 node) {
1340 struct lttng_trigger_list_element *new_element;
1341
1342 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
8abe313a 1343 new_channel_info)) {
ab0ee2ca
JG
1344 continue;
1345 }
1346
1347 new_element = zmalloc(sizeof(*new_element));
1348 if (!new_element) {
1349 goto error;
1350 }
1351 CDS_INIT_LIST_HEAD(&new_element->node);
1352 new_element->trigger = trigger_ht_element->trigger;
1353 cds_list_add(&new_element->node, &trigger_list);
1354 trigger_count++;
1355 }
1356
1357 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1358 trigger_count);
1359 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1360 if (!channel_trigger_list) {
1361 goto error;
1362 }
8abe313a 1363 channel_trigger_list->channel_key = new_channel_info->key;
ab0ee2ca
JG
1364 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1365 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1366 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1367
1368 rcu_read_lock();
1369 /* Add channel to the channel_ht which owns the channel_infos. */
1370 cds_lfht_add(state->channels_ht,
8abe313a 1371 hash_channel_key(&new_channel_info->key),
ab0ee2ca
JG
1372 &new_channel_info->channels_ht_node);
1373 /*
1374 * Add the list of triggers associated with this channel to the
1375 * channel_triggers_ht.
1376 */
1377 cds_lfht_add(state->channel_triggers_ht,
8abe313a 1378 hash_channel_key(&new_channel_info->key),
ab0ee2ca
JG
1379 &channel_trigger_list->channel_triggers_ht_node);
1380 rcu_read_unlock();
1381 *cmd_result = LTTNG_OK;
1382 return 0;
1383error:
ab0ee2ca 1384 channel_info_destroy(new_channel_info);
8abe313a 1385 session_info_put(session_info);
ab0ee2ca
JG
1386 return 1;
1387}
1388
1389static
1390int handle_notification_thread_command_remove_channel(
1391 struct notification_thread_state *state,
1392 uint64_t channel_key, enum lttng_domain_type domain,
1393 enum lttng_error_code *cmd_result)
1394{
1395 struct cds_lfht_node *node;
1396 struct cds_lfht_iter iter;
1397 struct lttng_channel_trigger_list *trigger_list;
1398 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1399 struct channel_key key = { .key = channel_key, .domain = domain };
1400 struct channel_info *channel_info;
1401
1402 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
1403 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1404
1405 rcu_read_lock();
1406
1407 cds_lfht_lookup(state->channel_triggers_ht,
1408 hash_channel_key(&key),
1409 match_channel_trigger_list,
1410 &key,
1411 &iter);
1412 node = cds_lfht_iter_get_node(&iter);
1413 /*
1414 * There is a severe internal error if we are being asked to remove a
1415 * channel that doesn't exist.
1416 */
1417 if (!node) {
1418 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1419 goto end;
1420 }
1421
1422 /* Free the list of triggers associated with this channel. */
1423 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1424 channel_triggers_ht_node);
1425 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1426 &trigger_list->list, node) {
1427 cds_list_del(&trigger_list_element->node);
1428 free(trigger_list_element);
1429 }
1430 cds_lfht_del(state->channel_triggers_ht, node);
1431 free(trigger_list);
1432
1433 /* Free sampled channel state. */
1434 cds_lfht_lookup(state->channel_state_ht,
1435 hash_channel_key(&key),
1436 match_channel_state_sample,
1437 &key,
1438 &iter);
1439 node = cds_lfht_iter_get_node(&iter);
1440 /*
1441 * This is expected to be NULL if the channel is destroyed before we
1442 * received a sample.
1443 */
1444 if (node) {
1445 struct channel_state_sample *sample = caa_container_of(node,
1446 struct channel_state_sample,
1447 channel_state_ht_node);
1448
1449 cds_lfht_del(state->channel_state_ht, node);
1450 free(sample);
1451 }
1452
1453 /* Remove the channel from the channels_ht and free it. */
1454 cds_lfht_lookup(state->channels_ht,
1455 hash_channel_key(&key),
1456 match_channel_info,
1457 &key,
1458 &iter);
1459 node = cds_lfht_iter_get_node(&iter);
1460 assert(node);
1461 channel_info = caa_container_of(node, struct channel_info,
1462 channels_ht_node);
1463 cds_lfht_del(state->channels_ht, node);
1464 channel_info_destroy(channel_info);
1465end:
1466 rcu_read_unlock();
1467 *cmd_result = LTTNG_OK;
1468 return 0;
1469}
1470
1da26331
JG
1471static
1472int condition_is_supported(struct lttng_condition *condition)
1473{
1474 int ret;
1475
1476 switch (lttng_condition_get_type(condition)) {
1477 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1478 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1479 {
1480 enum lttng_domain_type domain;
1481
1482 ret = lttng_condition_buffer_usage_get_domain_type(condition,
1483 &domain);
1484 if (ret) {
1485 ret = -1;
1486 goto end;
1487 }
1488
1489 if (domain != LTTNG_DOMAIN_KERNEL) {
1490 ret = 1;
1491 goto end;
1492 }
1493
1494 /*
1495 * Older kernel tracers don't expose the API to monitor their
1496 * buffers. Therefore, we reject triggers that require that
1497 * mechanism to be available to be evaluated.
1498 */
1499 ret = kernel_supports_ring_buffer_snapshot_sample_positions(
1500 kernel_tracer_fd);
1501 break;
1502 }
1503 default:
1504 ret = 1;
1505 }
1506end:
1507 return ret;
1508}
1509
ab0ee2ca
JG
1510/*
1511 * FIXME A client's credentials are not checked when registering a trigger, nor
1512 * are they stored alongside with the trigger.
1513 *
1da26331 1514 * The effects of this are benign since:
ab0ee2ca
JG
1515 * - The client will succeed in registering the trigger, as it is valid,
1516 * - The trigger will, internally, be bound to the channel,
1517 * - The notifications will not be sent since the client's credentials
1518 * are checked against the channel at that moment.
1da26331
JG
1519 *
1520 * If this function returns a non-zero value, it means something is
50ca7858 1521 * fundamentally broken and the whole subsystem/thread will be torn down.
1da26331
JG
1522 *
1523 * If a non-fatal error occurs, just set the cmd_result to the appropriate
1524 * error code.
ab0ee2ca
JG
1525 */
1526static
1527int handle_notification_thread_command_register_trigger(
8abe313a
JG
1528 struct notification_thread_state *state,
1529 struct lttng_trigger *trigger,
1530 enum lttng_error_code *cmd_result)
ab0ee2ca
JG
1531{
1532 int ret = 0;
1533 struct lttng_condition *condition;
1534 struct notification_client *client;
1535 struct notification_client_list *client_list = NULL;
1536 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1537 struct notification_client_list_element *client_list_element, *tmp;
1538 struct cds_lfht_node *node;
1539 struct cds_lfht_iter iter;
1540 struct channel_info *channel;
1541 bool free_trigger = true;
1542
1543 rcu_read_lock();
1544
1545 condition = lttng_trigger_get_condition(trigger);
1da26331
JG
1546 assert(condition);
1547
1548 ret = condition_is_supported(condition);
1549 if (ret < 0) {
1550 goto error;
1551 } else if (ret == 0) {
1552 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
1553 goto error;
1554 } else {
1555 /* Feature is supported, continue. */
1556 ret = 0;
1557 }
1558
ab0ee2ca
JG
1559 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
1560 if (!trigger_ht_element) {
1561 ret = -1;
1562 goto error;
1563 }
1564
1565 /* Add trigger to the trigger_ht. */
1566 cds_lfht_node_init(&trigger_ht_element->node);
1567 trigger_ht_element->trigger = trigger;
1568
1569 node = cds_lfht_add_unique(state->triggers_ht,
1570 lttng_condition_hash(condition),
1571 match_condition,
1572 condition,
1573 &trigger_ht_element->node);
1574 if (node != &trigger_ht_element->node) {
1575 /* Not a fatal error, simply report it to the client. */
1576 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
1577 goto error_free_ht_element;
1578 }
1579
1580 /*
1581 * Ownership of the trigger and of its wrapper was transfered to
1582 * the triggers_ht.
1583 */
1584 trigger_ht_element = NULL;
1585 free_trigger = false;
1586
1587 /*
1588 * The rest only applies to triggers that have a "notify" action.
1589 * It is not skipped as this is the only action type currently
1590 * supported.
1591 */
1592 client_list = zmalloc(sizeof(*client_list));
1593 if (!client_list) {
1594 ret = -1;
1595 goto error_free_ht_element;
1596 }
1597 cds_lfht_node_init(&client_list->notification_trigger_ht_node);
1598 CDS_INIT_LIST_HEAD(&client_list->list);
1599 client_list->trigger = trigger;
1600
1601 /* Build a list of clients to which this new trigger applies. */
1602 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1603 client_socket_ht_node) {
1604 if (!trigger_applies_to_client(trigger, client)) {
1605 continue;
1606 }
1607
1608 client_list_element = zmalloc(sizeof(*client_list_element));
1609 if (!client_list_element) {
1610 ret = -1;
1611 goto error_free_client_list;
1612 }
1613 CDS_INIT_LIST_HEAD(&client_list_element->node);
1614 client_list_element->client = client;
1615 cds_list_add(&client_list_element->node, &client_list->list);
1616 }
1617
1618 cds_lfht_add(state->notification_trigger_clients_ht,
1619 lttng_condition_hash(condition),
1620 &client_list->notification_trigger_ht_node);
ab0ee2ca
JG
1621
1622 /*
1623 * Add the trigger to list of triggers bound to the channels currently
1624 * known.
1625 */
1626 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
1627 channels_ht_node) {
1628 struct lttng_trigger_list_element *trigger_list_element;
1629 struct lttng_channel_trigger_list *trigger_list;
1630
1631 if (!trigger_applies_to_channel(trigger, channel)) {
1632 continue;
1633 }
1634
1635 cds_lfht_lookup(state->channel_triggers_ht,
1636 hash_channel_key(&channel->key),
1637 match_channel_trigger_list,
1638 &channel->key,
1639 &iter);
1640 node = cds_lfht_iter_get_node(&iter);
1641 assert(node);
ab0ee2ca
JG
1642 trigger_list = caa_container_of(node,
1643 struct lttng_channel_trigger_list,
1644 channel_triggers_ht_node);
1645
1646 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
1647 if (!trigger_list_element) {
1648 ret = -1;
1649 goto error_free_client_list;
1650 }
1651 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
1652 trigger_list_element->trigger = trigger;
1653 cds_list_add(&trigger_list_element->node, &trigger_list->list);
ab0ee2ca
JG
1654 }
1655
2ae99f0b
JG
1656 /*
1657 * Since there is nothing preventing clients from subscribing to a
1658 * condition before the corresponding trigger is registered, we have
1659 * to evaluate this new condition right away.
1660 *
1661 * At some point, we were waiting for the next "evaluation" (e.g. on
1662 * reception of a channel sample) to evaluate this new condition, but
1663 * that was broken.
1664 *
1665 * The reason it was broken is that waiting for the next sample
1666 * does not allow us to properly handle transitions for edge-triggered
1667 * conditions.
1668 *
1669 * Consider this example: when we handle a new channel sample, we
1670 * evaluate each conditions twice: once with the previous state, and
1671 * again with the newest state. We then use those two results to
1672 * determine whether a state change happened: a condition was false and
1673 * became true. If a state change happened, we have to notify clients.
1674 *
1675 * Now, if a client subscribes to a given notification and registers
1676 * a trigger *after* that subscription, we have to make sure the
1677 * condition is evaluated at this point while considering only the
1678 * current state. Otherwise, the next evaluation cycle may only see
1679 * that the evaluations remain the same (true for samples n-1 and n) and
1680 * the client will never know that the condition has been met.
1681 */
1682 cds_list_for_each_entry_safe(client_list_element, tmp,
1683 &client_list->list, node) {
1684 ret = evaluate_condition_for_client(trigger, condition,
1685 client_list_element->client, state);
1686 if (ret) {
1687 goto error_free_client_list;
1688 }
1689 }
1690
1691 /*
1692 * Client list ownership transferred to the
1693 * notification_trigger_clients_ht.
1694 */
1695 client_list = NULL;
1696
ab0ee2ca
JG
1697 *cmd_result = LTTNG_OK;
1698error_free_client_list:
1699 if (client_list) {
1700 cds_list_for_each_entry_safe(client_list_element, tmp,
1701 &client_list->list, node) {
1702 free(client_list_element);
1703 }
1704 free(client_list);
1705 }
1706error_free_ht_element:
1707 free(trigger_ht_element);
1708error:
1709 if (free_trigger) {
1710 struct lttng_action *action = lttng_trigger_get_action(trigger);
1711
1712 lttng_condition_destroy(condition);
1713 lttng_action_destroy(action);
1714 lttng_trigger_destroy(trigger);
1715 }
1716 rcu_read_unlock();
1717 return ret;
1718}
1719
cc2295b5 1720static
ab0ee2ca
JG
1721int handle_notification_thread_command_unregister_trigger(
1722 struct notification_thread_state *state,
1723 struct lttng_trigger *trigger,
1724 enum lttng_error_code *_cmd_reply)
1725{
1726 struct cds_lfht_iter iter;
1727 struct cds_lfht_node *node, *triggers_ht_node;
1728 struct lttng_channel_trigger_list *trigger_list;
1729 struct notification_client_list *client_list;
1730 struct notification_client_list_element *client_list_element, *tmp;
1731 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1732 struct lttng_condition *condition = lttng_trigger_get_condition(
1733 trigger);
1734 struct lttng_action *action;
1735 enum lttng_error_code cmd_reply;
1736
1737 rcu_read_lock();
1738
1739 cds_lfht_lookup(state->triggers_ht,
1740 lttng_condition_hash(condition),
1741 match_condition,
1742 condition,
1743 &iter);
1744 triggers_ht_node = cds_lfht_iter_get_node(&iter);
1745 if (!triggers_ht_node) {
1746 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
1747 goto end;
1748 } else {
1749 cmd_reply = LTTNG_OK;
1750 }
1751
1752 /* Remove trigger from channel_triggers_ht. */
1753 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
1754 channel_triggers_ht_node) {
1755 struct lttng_trigger_list_element *trigger_element, *tmp;
1756
1757 cds_list_for_each_entry_safe(trigger_element, tmp,
1758 &trigger_list->list, node) {
9b63a4aa
JG
1759 const struct lttng_condition *current_condition =
1760 lttng_trigger_get_const_condition(
ab0ee2ca
JG
1761 trigger_element->trigger);
1762
1763 assert(current_condition);
1764 if (!lttng_condition_is_equal(condition,
1765 current_condition)) {
1766 continue;
1767 }
1768
1769 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
1770 cds_list_del(&trigger_element->node);
e4db5ace
JR
1771 /* A trigger can only appear once per channel */
1772 break;
ab0ee2ca
JG
1773 }
1774 }
1775
1776 /*
1777 * Remove and release the client list from
1778 * notification_trigger_clients_ht.
1779 */
1780 cds_lfht_lookup(state->notification_trigger_clients_ht,
1781 lttng_condition_hash(condition),
1782 match_client_list,
1783 trigger,
1784 &iter);
1785 node = cds_lfht_iter_get_node(&iter);
1786 assert(node);
1787 client_list = caa_container_of(node, struct notification_client_list,
1788 notification_trigger_ht_node);
1789 cds_list_for_each_entry_safe(client_list_element, tmp,
1790 &client_list->list, node) {
1791 free(client_list_element);
1792 }
1793 cds_lfht_del(state->notification_trigger_clients_ht, node);
1794 free(client_list);
1795
1796 /* Remove trigger from triggers_ht. */
1797 trigger_ht_element = caa_container_of(triggers_ht_node,
1798 struct lttng_trigger_ht_element, node);
1799 cds_lfht_del(state->triggers_ht, triggers_ht_node);
1800
1801 condition = lttng_trigger_get_condition(trigger_ht_element->trigger);
1802 lttng_condition_destroy(condition);
1803 action = lttng_trigger_get_action(trigger_ht_element->trigger);
1804 lttng_action_destroy(action);
1805 lttng_trigger_destroy(trigger_ht_element->trigger);
1806 free(trigger_ht_element);
1807end:
1808 rcu_read_unlock();
1809 if (_cmd_reply) {
1810 *_cmd_reply = cmd_reply;
1811 }
1812 return 0;
1813}
1814
1815/* Returns 0 on success, 1 on exit requested, negative value on error. */
1816int handle_notification_thread_command(
1817 struct notification_thread_handle *handle,
1818 struct notification_thread_state *state)
1819{
1820 int ret;
1821 uint64_t counter;
1822 struct notification_thread_command *cmd;
1823
8abe313a
JG
1824 /* Read the event pipe to put it back into a quiescent state. */
1825 ret = read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
1826 sizeof(counter));
ab0ee2ca
JG
1827 if (ret == -1) {
1828 goto error;
1829 }
1830
1831 pthread_mutex_lock(&handle->cmd_queue.lock);
1832 cmd = cds_list_first_entry(&handle->cmd_queue.list,
1833 struct notification_thread_command, cmd_list_node);
1834 switch (cmd->type) {
1835 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
1836 DBG("[notification-thread] Received register trigger command");
1837 ret = handle_notification_thread_command_register_trigger(
1838 state, cmd->parameters.trigger,
1839 &cmd->reply_code);
1840 break;
1841 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
1842 DBG("[notification-thread] Received unregister trigger command");
1843 ret = handle_notification_thread_command_unregister_trigger(
1844 state, cmd->parameters.trigger,
1845 &cmd->reply_code);
1846 break;
1847 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
1848 DBG("[notification-thread] Received add channel command");
1849 ret = handle_notification_thread_command_add_channel(
8abe313a
JG
1850 state,
1851 cmd->parameters.add_channel.session.name,
1852 cmd->parameters.add_channel.session.uid,
1853 cmd->parameters.add_channel.session.gid,
1854 cmd->parameters.add_channel.channel.name,
1855 cmd->parameters.add_channel.channel.domain,
1856 cmd->parameters.add_channel.channel.key,
1857 cmd->parameters.add_channel.channel.capacity,
ab0ee2ca
JG
1858 &cmd->reply_code);
1859 break;
1860 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
1861 DBG("[notification-thread] Received remove channel command");
1862 ret = handle_notification_thread_command_remove_channel(
1863 state, cmd->parameters.remove_channel.key,
1864 cmd->parameters.remove_channel.domain,
1865 &cmd->reply_code);
1866 break;
1867 case NOTIFICATION_COMMAND_TYPE_QUIT:
1868 DBG("[notification-thread] Received quit command");
1869 cmd->reply_code = LTTNG_OK;
1870 ret = 1;
1871 goto end;
1872 default:
1873 ERR("[notification-thread] Unknown internal command received");
1874 goto error_unlock;
1875 }
1876
1877 if (ret) {
1878 goto error_unlock;
1879 }
1880end:
1881 cds_list_del(&cmd->cmd_list_node);
8ada111f 1882 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
1883 pthread_mutex_unlock(&handle->cmd_queue.lock);
1884 return ret;
1885error_unlock:
1886 /* Wake-up and return a fatal error to the calling thread. */
8ada111f 1887 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
1888 pthread_mutex_unlock(&handle->cmd_queue.lock);
1889 cmd->reply_code = LTTNG_ERR_FATAL;
1890error:
1891 /* Indicate a fatal error to the caller. */
1892 return -1;
1893}
1894
1895static
1896unsigned long hash_client_socket(int socket)
1897{
1898 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
1899}
1900
/*
 * Add O_NONBLOCK to the file status flags of 'socket', preserving the flags
 * that are already set.
 *
 * Returns the result of the F_SETFL fcntl() call on success, -1 if either
 * fcntl() call fails.
 */
static
int socket_set_non_blocking(int socket)
{
	int ret;
	int current_flags;

	/* Fetch the current flags so that they are preserved. */
	current_flags = fcntl(socket, F_GETFL, 0);
	if (current_flags == -1) {
		PERROR("fcntl get socket flags");
		return -1;
	}

	ret = fcntl(socket, F_SETFL, current_flags | O_NONBLOCK);
	if (ret == -1) {
		PERROR("fcntl set O_NONBLOCK socket flag");
		return ret;
	}

	DBG("Client socket (fd = %i) set as non-blocking", socket);
	return ret;
}
1923
1924static
14fa22f8 1925int client_reset_inbound_state(struct notification_client *client)
ab0ee2ca
JG
1926{
1927 int ret;
1928
1929 ret = lttng_dynamic_buffer_set_size(
1930 &client->communication.inbound.buffer, 0);
1931 assert(!ret);
1932
1933 client->communication.inbound.bytes_to_receive =
1934 sizeof(struct lttng_notification_channel_message);
1935 client->communication.inbound.msg_type =
1936 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
ab0ee2ca
JG
1937 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
1938 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
14fa22f8
JG
1939 ret = lttng_dynamic_buffer_set_size(
1940 &client->communication.inbound.buffer,
1941 client->communication.inbound.bytes_to_receive);
1942 return ret;
ab0ee2ca
JG
1943}
1944
1945int handle_notification_thread_client_connect(
1946 struct notification_thread_state *state)
1947{
1948 int ret;
1949 struct notification_client *client;
1950
1951 DBG("[notification-thread] Handling new notification channel client connection");
1952
1953 client = zmalloc(sizeof(*client));
1954 if (!client) {
1955 /* Fatal error. */
1956 ret = -1;
1957 goto error;
1958 }
1959 CDS_INIT_LIST_HEAD(&client->condition_list);
1960 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
1961 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
01ea340e 1962 client->communication.inbound.expect_creds = true;
14fa22f8
JG
1963 ret = client_reset_inbound_state(client);
1964 if (ret) {
1965 ERR("[notification-thread] Failed to reset client communication's inbound state");
1966 ret = 0;
1967 goto error;
1968 }
ab0ee2ca
JG
1969
1970 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
1971 if (ret < 0) {
1972 ERR("[notification-thread] Failed to accept new notification channel client connection");
1973 ret = 0;
1974 goto error;
1975 }
1976
1977 client->socket = ret;
1978
1979 ret = socket_set_non_blocking(client->socket);
1980 if (ret) {
1981 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
1982 goto error;
1983 }
1984
1985 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
1986 if (ret < 0) {
1987 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
1988 ret = 0;
1989 goto error;
1990 }
1991
1992 ret = lttng_poll_add(&state->events, client->socket,
1993 LPOLLIN | LPOLLERR |
1994 LPOLLHUP | LPOLLRDHUP);
1995 if (ret < 0) {
1996 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
1997 ret = 0;
1998 goto error;
1999 }
2000 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
2001 client->socket);
2002
ab0ee2ca
JG
2003 rcu_read_lock();
2004 cds_lfht_add(state->client_socket_ht,
2005 hash_client_socket(client->socket),
2006 &client->client_socket_ht_node);
2007 rcu_read_unlock();
2008
2009 return ret;
2010error:
2011 notification_client_destroy(client, state);
2012 return ret;
2013}
2014
2015int handle_notification_thread_client_disconnect(
2016 int client_socket,
2017 struct notification_thread_state *state)
2018{
2019 int ret = 0;
2020 struct notification_client *client;
2021
2022 rcu_read_lock();
2023 DBG("[notification-thread] Closing client connection (socket fd = %i)",
2024 client_socket);
2025 client = get_client_from_socket(client_socket, state);
2026 if (!client) {
2027 /* Internal state corruption, fatal error. */
2028 ERR("[notification-thread] Unable to find client (socket fd = %i)",
2029 client_socket);
2030 ret = -1;
2031 goto end;
2032 }
2033
2034 ret = lttng_poll_del(&state->events, client_socket);
2035 if (ret) {
2036 ERR("[notification-thread] Failed to remove client socket from poll set");
2037 }
2038 cds_lfht_del(state->client_socket_ht,
2039 &client->client_socket_ht_node);
2040 notification_client_destroy(client, state);
2041end:
2042 rcu_read_unlock();
2043 return ret;
2044}
2045
2046int handle_notification_thread_client_disconnect_all(
2047 struct notification_thread_state *state)
2048{
2049 struct cds_lfht_iter iter;
2050 struct notification_client *client;
2051 bool error_encoutered = false;
2052
2053 rcu_read_lock();
2054 DBG("[notification-thread] Closing all client connections");
2055 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2056 client_socket_ht_node) {
2057 int ret;
2058
2059 ret = handle_notification_thread_client_disconnect(
2060 client->socket, state);
2061 if (ret) {
2062 error_encoutered = true;
2063 }
2064 }
2065 rcu_read_unlock();
2066 return error_encoutered ? 1 : 0;
2067}
2068
2069int handle_notification_thread_trigger_unregister_all(
2070 struct notification_thread_state *state)
2071{
4149ace8 2072 bool error_occurred = false;
ab0ee2ca
JG
2073 struct cds_lfht_iter iter;
2074 struct lttng_trigger_ht_element *trigger_ht_element;
2075
2076 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
2077 node) {
2078 int ret = handle_notification_thread_command_unregister_trigger(
2079 state, trigger_ht_element->trigger, NULL);
2080 if (ret) {
4149ace8 2081 error_occurred = true;
ab0ee2ca
JG
2082 }
2083 }
4149ace8 2084 return error_occurred ? -1 : 0;
ab0ee2ca
JG
2085}
2086
2087static
2088int client_flush_outgoing_queue(struct notification_client *client,
2089 struct notification_thread_state *state)
2090{
2091 ssize_t ret;
2092 size_t to_send_count;
2093
2094 assert(client->communication.outbound.buffer.size != 0);
2095 to_send_count = client->communication.outbound.buffer.size;
2096 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
2097 client->socket);
2098
2099 ret = lttcomm_send_unix_sock_non_block(client->socket,
2100 client->communication.outbound.buffer.data,
2101 to_send_count);
2102 if ((ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) ||
2103 (ret > 0 && ret < to_send_count)) {
2104 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
2105 client->socket);
2106 to_send_count -= max(ret, 0);
2107
2108 memcpy(client->communication.outbound.buffer.data,
2109 client->communication.outbound.buffer.data +
2110 client->communication.outbound.buffer.size - to_send_count,
2111 to_send_count);
2112 ret = lttng_dynamic_buffer_set_size(
2113 &client->communication.outbound.buffer,
2114 to_send_count);
2115 if (ret) {
2116 goto error;
2117 }
2118
2119 /*
2120 * We want to be notified whenever there is buffer space
2121 * available to send the rest of the payload.
2122 */
2123 ret = lttng_poll_mod(&state->events, client->socket,
2124 CLIENT_POLL_MASK_IN_OUT);
2125 if (ret) {
2126 goto error;
2127 }
2128 } else if (ret < 0) {
2129 /* Generic error, disconnect the client. */
2130 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
2131 client->socket);
2132 ret = handle_notification_thread_client_disconnect(
2133 client->socket, state);
2134 if (ret) {
2135 goto error;
2136 }
2137 } else {
2138 /* No error and flushed the queue completely. */
2139 ret = lttng_dynamic_buffer_set_size(
2140 &client->communication.outbound.buffer, 0);
2141 if (ret) {
2142 goto error;
2143 }
2144 ret = lttng_poll_mod(&state->events, client->socket,
2145 CLIENT_POLL_MASK_IN);
2146 if (ret) {
2147 goto error;
2148 }
2149
2150 client->communication.outbound.queued_command_reply = false;
2151 client->communication.outbound.dropped_notification = false;
2152 }
2153
2154 return 0;
2155error:
2156 return -1;
2157}
2158
2159static
2160int client_send_command_reply(struct notification_client *client,
2161 struct notification_thread_state *state,
2162 enum lttng_notification_channel_status status)
2163{
2164 int ret;
2165 struct lttng_notification_channel_command_reply reply = {
2166 .status = (int8_t) status,
2167 };
2168 struct lttng_notification_channel_message msg = {
2169 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
2170 .size = sizeof(reply),
2171 };
2172 char buffer[sizeof(msg) + sizeof(reply)];
2173
2174 if (client->communication.outbound.queued_command_reply) {
2175 /* Protocol error. */
2176 goto error;
2177 }
2178
2179 memcpy(buffer, &msg, sizeof(msg));
2180 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
2181 DBG("[notification-thread] Send command reply (%i)", (int) status);
2182
2183 /* Enqueue buffer to outgoing queue and flush it. */
2184 ret = lttng_dynamic_buffer_append(
2185 &client->communication.outbound.buffer,
2186 buffer, sizeof(buffer));
2187 if (ret) {
2188 goto error;
2189 }
2190
2191 ret = client_flush_outgoing_queue(client, state);
2192 if (ret) {
2193 goto error;
2194 }
2195
2196 if (client->communication.outbound.buffer.size != 0) {
2197 /* Queue could not be emptied. */
2198 client->communication.outbound.queued_command_reply = true;
2199 }
2200
2201 return 0;
2202error:
2203 return -1;
2204}
2205
2206static
2207int client_dispatch_message(struct notification_client *client,
2208 struct notification_thread_state *state)
2209{
2210 int ret = 0;
2211
2212 if (client->communication.inbound.msg_type !=
2213 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
2214 client->communication.inbound.msg_type !=
2215 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
2216 !client->validated) {
2217 WARN("[notification-thread] client attempted a command before handshake");
2218 ret = -1;
2219 goto end;
2220 }
2221
2222 switch (client->communication.inbound.msg_type) {
2223 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
2224 {
2225 /*
2226 * Receiving message header. The function will be called again
2227 * once the rest of the message as been received and can be
2228 * interpreted.
2229 */
2230 const struct lttng_notification_channel_message *msg;
2231
2232 assert(sizeof(*msg) ==
2233 client->communication.inbound.buffer.size);
2234 msg = (const struct lttng_notification_channel_message *)
2235 client->communication.inbound.buffer.data;
2236
2237 if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
2238 ERR("[notification-thread] Invalid notification channel message: length = %u", msg->size);
2239 ret = -1;
2240 goto end;
2241 }
2242
2243 switch (msg->type) {
2244 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2245 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2246 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2247 break;
2248 default:
2249 ret = -1;
2250 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
2251 goto end;
2252 }
2253
2254 client->communication.inbound.bytes_to_receive = msg->size;
2255 client->communication.inbound.msg_type =
2256 (enum lttng_notification_channel_message_type) msg->type;
ab0ee2ca 2257 ret = lttng_dynamic_buffer_set_size(
14fa22f8 2258 &client->communication.inbound.buffer, msg->size);
ab0ee2ca
JG
2259 if (ret) {
2260 goto end;
2261 }
2262 break;
2263 }
2264 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2265 {
2266 struct lttng_notification_channel_command_handshake *handshake_client;
2267 struct lttng_notification_channel_command_handshake handshake_reply = {
2268 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
2269 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
2270 };
2271 struct lttng_notification_channel_message msg_header = {
2272 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
2273 .size = sizeof(handshake_reply),
2274 };
2275 enum lttng_notification_channel_status status =
2276 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2277 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
2278
2279 memcpy(send_buffer, &msg_header, sizeof(msg_header));
2280 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
2281 sizeof(handshake_reply));
2282
2283 handshake_client =
2284 (struct lttng_notification_channel_command_handshake *)
2285 client->communication.inbound.buffer.data;
47a32869 2286 client->major = handshake_client->major;
ab0ee2ca
JG
2287 client->minor = handshake_client->minor;
2288 if (!client->communication.inbound.creds_received) {
2289 ERR("[notification-thread] No credentials received from client");
2290 ret = -1;
2291 goto end;
2292 }
2293
2294 client->uid = LTTNG_SOCK_GET_UID_CRED(
2295 &client->communication.inbound.creds);
2296 client->gid = LTTNG_SOCK_GET_GID_CRED(
2297 &client->communication.inbound.creds);
2298 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
2299 client->uid, client->gid, (int) client->major,
2300 (int) client->minor);
2301
2302 if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
2303 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
2304 }
2305
2306 ret = lttng_dynamic_buffer_append(&client->communication.outbound.buffer,
2307 send_buffer, sizeof(send_buffer));
2308 if (ret) {
2309 ERR("[notification-thread] Failed to send protocol version to notification channel client");
2310 goto end;
2311 }
2312
2313 ret = client_flush_outgoing_queue(client, state);
2314 if (ret) {
2315 goto end;
2316 }
2317
2318 ret = client_send_command_reply(client, state, status);
2319 if (ret) {
2320 ERR("[notification-thread] Failed to send reply to notification channel client");
2321 goto end;
2322 }
2323
2324 /* Set reception state to receive the next message header. */
14fa22f8
JG
2325 ret = client_reset_inbound_state(client);
2326 if (ret) {
2327 ERR("[notification-thread] Failed to reset client communication's inbound state");
2328 goto end;
2329 }
ab0ee2ca
JG
2330 client->validated = true;
2331 break;
2332 }
2333 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2334 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2335 {
2336 struct lttng_condition *condition;
2337 enum lttng_notification_channel_status status =
2338 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2339 const struct lttng_buffer_view condition_view =
2340 lttng_buffer_view_from_dynamic_buffer(
2341 &client->communication.inbound.buffer,
2342 0, -1);
2343 size_t expected_condition_size =
2344 client->communication.inbound.buffer.size;
2345
2346 ret = lttng_condition_create_from_buffer(&condition_view,
2347 &condition);
2348 if (ret != expected_condition_size) {
2349 ERR("[notification-thread] Malformed condition received from client");
2350 goto end;
2351 }
2352
2353 if (client->communication.inbound.msg_type ==
2354 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
ab0ee2ca
JG
2355 ret = notification_thread_client_subscribe(client,
2356 condition, state, &status);
2357 } else {
2358 ret = notification_thread_client_unsubscribe(client,
2359 condition, state, &status);
2360 }
2361 if (ret) {
2362 goto end;
2363 }
2364
2365 ret = client_send_command_reply(client, state, status);
2366 if (ret) {
2367 ERR("[notification-thread] Failed to send reply to notification channel client");
2368 goto end;
2369 }
2370
2371 /* Set reception state to receive the next message header. */
14fa22f8
JG
2372 ret = client_reset_inbound_state(client);
2373 if (ret) {
2374 ERR("[notification-thread] Failed to reset client communication's inbound state");
2375 goto end;
2376 }
ab0ee2ca
JG
2377 break;
2378 }
2379 default:
2380 abort();
2381 }
2382end:
2383 return ret;
2384}
2385
2386/* Incoming data from client. */
2387int handle_notification_thread_client_in(
2388 struct notification_thread_state *state, int socket)
2389{
14fa22f8 2390 int ret = 0;
ab0ee2ca
JG
2391 struct notification_client *client;
2392 ssize_t recv_ret;
2393 size_t offset;
2394
2395 client = get_client_from_socket(socket, state);
2396 if (!client) {
2397 /* Internal error, abort. */
2398 ret = -1;
2399 goto end;
2400 }
2401
14fa22f8
JG
2402 offset = client->communication.inbound.buffer.size -
2403 client->communication.inbound.bytes_to_receive;
01ea340e 2404 if (client->communication.inbound.expect_creds) {
ab0ee2ca
JG
2405 recv_ret = lttcomm_recv_creds_unix_sock(socket,
2406 client->communication.inbound.buffer.data + offset,
2407 client->communication.inbound.bytes_to_receive,
2408 &client->communication.inbound.creds);
2409 if (recv_ret > 0) {
01ea340e 2410 client->communication.inbound.expect_creds = false;
ab0ee2ca
JG
2411 client->communication.inbound.creds_received = true;
2412 }
2413 } else {
2414 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
2415 client->communication.inbound.buffer.data + offset,
2416 client->communication.inbound.bytes_to_receive);
2417 }
2418 if (recv_ret < 0) {
2419 goto error_disconnect_client;
2420 }
2421
2422 client->communication.inbound.bytes_to_receive -= recv_ret;
ab0ee2ca
JG
2423 if (client->communication.inbound.bytes_to_receive == 0) {
2424 ret = client_dispatch_message(client, state);
2425 if (ret) {
2426 /*
2427 * Only returns an error if this client must be
2428 * disconnected.
2429 */
2430 goto error_disconnect_client;
2431 }
2432 } else {
2433 goto end;
2434 }
2435end:
2436 return ret;
2437error_disconnect_client:
2438 ret = handle_notification_thread_client_disconnect(socket, state);
2439 return ret;
2440}
2441
/*
 * Client ready to receive outgoing data.
 *
 * Look up the client associated with `socket` and flush its outgoing
 * queue.
 *
 * Returns -1 if no client is known for `socket`, otherwise the result of
 * client_flush_outgoing_queue().
 */
int handle_notification_thread_client_out(
		struct notification_thread_state *state, int socket)
{
	struct notification_client *client =
			get_client_from_socket(socket, state);

	if (!client) {
		/* Internal error, abort. */
		return -1;
	}

	return client_flush_outgoing_queue(client, state);
}
2463
2464static
e8360425
JD
2465bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
2466 const struct channel_state_sample *sample,
2467 uint64_t buffer_capacity)
ab0ee2ca
JG
2468{
2469 bool result = false;
2470 uint64_t threshold;
2471 enum lttng_condition_type condition_type;
e8360425 2472 const struct lttng_condition_buffer_usage *use_condition = container_of(
ab0ee2ca
JG
2473 condition, struct lttng_condition_buffer_usage,
2474 parent);
2475
ab0ee2ca
JG
2476 if (use_condition->threshold_bytes.set) {
2477 threshold = use_condition->threshold_bytes.value;
2478 } else {
2479 /*
2480 * Threshold was expressed as a ratio.
2481 *
2482 * TODO the threshold (in bytes) of conditions expressed
2483 * as a ratio of total buffer size could be cached to
2484 * forego this double-multiplication or it could be performed
2485 * as fixed-point math.
2486 *
2487 * Note that caching should accomodate the case where the
2488 * condition applies to multiple channels (i.e. don't assume
2489 * that all channels matching my_chann* have the same size...)
2490 */
2491 threshold = (uint64_t) (use_condition->threshold_ratio.value *
2492 (double) buffer_capacity);
2493 }
2494
2495 condition_type = lttng_condition_get_type(condition);
2496 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
2497 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2498 threshold, sample->highest_usage);
2499
2500 /*
2501 * The low condition should only be triggered once _all_ of the
2502 * streams in a channel have gone below the "low" threshold.
2503 */
2504 if (sample->highest_usage <= threshold) {
2505 result = true;
2506 }
2507 } else {
2508 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2509 threshold, sample->highest_usage);
2510
2511 /*
2512 * For high buffer usage scenarios, we want to trigger whenever
2513 * _any_ of the streams has reached the "high" threshold.
2514 */
2515 if (sample->highest_usage >= threshold) {
2516 result = true;
2517 }
2518 }
e8360425 2519
ab0ee2ca
JG
2520 return result;
2521}
2522
2523static
e8360425
JD
2524bool evaluate_session_consumed_size_condition(
2525 const struct lttng_condition *condition,
2526 uint64_t session_consumed_size)
2527{
2528 uint64_t threshold;
2529 const struct lttng_condition_session_consumed_size *size_condition =
2530 container_of(condition,
2531 struct lttng_condition_session_consumed_size,
2532 parent);
2533
2534 threshold = size_condition->consumed_threshold_bytes.value;
2535 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
2536 threshold, session_consumed_size);
2537 return session_consumed_size >= threshold;
2538}
2539
2540static
2541int evaluate_condition(const struct lttng_condition *condition,
ab0ee2ca 2542 struct lttng_evaluation **evaluation,
e8360425
JD
2543 const struct notification_thread_state *state,
2544 const struct channel_state_sample *previous_sample,
2545 const struct channel_state_sample *latest_sample,
2546 uint64_t previous_session_consumed_total,
2547 uint64_t latest_session_consumed_total,
2548 struct channel_info *channel_info)
ab0ee2ca
JG
2549{
2550 int ret = 0;
2551 enum lttng_condition_type condition_type;
e8360425
JD
2552 const bool previous_sample_available = !!previous_sample;
2553 bool previous_sample_result = false;
ab0ee2ca
JG
2554 bool latest_sample_result;
2555
2556 condition_type = lttng_condition_get_type(condition);
ab0ee2ca 2557
e8360425
JD
2558 switch (condition_type) {
2559 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
2560 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
2561 if (caa_likely(previous_sample_available)) {
2562 previous_sample_result =
2563 evaluate_buffer_usage_condition(condition,
2564 previous_sample, channel_info->capacity);
2565 }
2566 latest_sample_result = evaluate_buffer_usage_condition(
2567 condition, latest_sample,
2568 channel_info->capacity);
2569 break;
2570 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
2571 if (caa_likely(previous_sample_available)) {
2572 previous_sample_result =
2573 evaluate_session_consumed_size_condition(
2574 condition,
2575 previous_session_consumed_total);
2576 }
2577 latest_sample_result =
2578 evaluate_session_consumed_size_condition(
2579 condition,
2580 latest_session_consumed_total);
2581 break;
2582 default:
2583 /* Unknown condition type; internal error. */
2584 abort();
2585 }
ab0ee2ca
JG
2586
2587 if (!latest_sample_result ||
2588 (previous_sample_result == latest_sample_result)) {
2589 /*
2590 * Only trigger on a condition evaluation transition.
2591 *
2592 * NOTE: This edge-triggered logic may not be appropriate for
2593 * future condition types.
2594 */
2595 goto end;
2596 }
2597
e8360425
JD
2598 if (!evaluation || !latest_sample_result) {
2599 goto end;
2600 }
2601
2602 switch (condition_type) {
2603 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
2604 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
ab0ee2ca
JG
2605 *evaluation = lttng_evaluation_buffer_usage_create(
2606 condition_type,
2607 latest_sample->highest_usage,
e8360425
JD
2608 channel_info->capacity);
2609 break;
2610 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
2611 *evaluation = lttng_evaluation_session_consumed_size_create(
e8360425
JD
2612 latest_session_consumed_total);
2613 break;
2614 default:
2615 abort();
2616 }
2617
2618 if (!*evaluation) {
2619 ret = -1;
2620 goto end;
ab0ee2ca
JG
2621 }
2622end:
2623 return ret;
2624}
2625
2626static
2627int client_enqueue_dropped_notification(struct notification_client *client,
2628 struct notification_thread_state *state)
2629{
2630 int ret;
2631 struct lttng_notification_channel_message msg = {
2632 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
2633 .size = 0,
2634 };
2635
2636 ret = lttng_dynamic_buffer_append(
2637 &client->communication.outbound.buffer, &msg,
2638 sizeof(msg));
2639 return ret;
2640}
2641
2642static
9b63a4aa
JG
2643int send_evaluation_to_clients(const struct lttng_trigger *trigger,
2644 const struct lttng_evaluation *evaluation,
ab0ee2ca
JG
2645 struct notification_client_list* client_list,
2646 struct notification_thread_state *state,
2647 uid_t channel_uid, gid_t channel_gid)
2648{
2649 int ret = 0;
2650 struct lttng_dynamic_buffer msg_buffer;
2651 struct notification_client_list_element *client_list_element, *tmp;
9b63a4aa
JG
2652 const struct lttng_notification notification = {
2653 .condition = (struct lttng_condition *) lttng_trigger_get_const_condition(trigger),
2654 .evaluation = (struct lttng_evaluation *) evaluation,
2655 };
3647288f
JG
2656 struct lttng_notification_channel_message msg_header = {
2657 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
2658 };
ab0ee2ca
JG
2659
2660 lttng_dynamic_buffer_init(&msg_buffer);
2661
3647288f
JG
2662 ret = lttng_dynamic_buffer_append(&msg_buffer, &msg_header,
2663 sizeof(msg_header));
ab0ee2ca
JG
2664 if (ret) {
2665 goto end;
2666 }
2667
9b63a4aa 2668 ret = lttng_notification_serialize(&notification, &msg_buffer);
ab0ee2ca 2669 if (ret) {
ab0ee2ca
JG
2670 ERR("[notification-thread] Failed to serialize notification");
2671 ret = -1;
2672 goto end;
2673 }
2674
3647288f
JG
2675 /* Update payload size. */
2676 ((struct lttng_notification_channel_message * ) msg_buffer.data)->size =
2677 (uint32_t) (msg_buffer.size - sizeof(msg_header));
2678
ab0ee2ca
JG
2679 cds_list_for_each_entry_safe(client_list_element, tmp,
2680 &client_list->list, node) {
2681 struct notification_client *client =
2682 client_list_element->client;
2683
2684 if (client->uid != channel_uid && client->gid != channel_gid &&
2685 client->uid != 0) {
2686 /* Client is not allowed to monitor this channel. */
2687 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this channel");
2688 continue;
2689 }
2690
2691 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
2692 client->socket, msg_buffer.size);
2693 if (client->communication.outbound.buffer.size) {
2694 /*
2695 * Outgoing data is already buffered for this client;
2696 * drop the notification and enqueue a "dropped
2697 * notification" message if this is the first dropped
2698 * notification since the socket spilled-over to the
2699 * queue.
2700 */
2701 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
2702 client->socket);
2703 if (!client->communication.outbound.dropped_notification) {
2704 client->communication.outbound.dropped_notification = true;
2705 ret = client_enqueue_dropped_notification(
2706 client, state);
2707 if (ret) {
2708 goto end;
2709 }
2710 }
2711 continue;
2712 }
2713
2714 ret = lttng_dynamic_buffer_append_buffer(
2715 &client->communication.outbound.buffer,
2716 &msg_buffer);
2717 if (ret) {
2718 goto end;
2719 }
2720
2721 ret = client_flush_outgoing_queue(client, state);
2722 if (ret) {
2723 goto end;
2724 }
2725 }
2726 ret = 0;
2727end:
ab0ee2ca
JG
2728 lttng_dynamic_buffer_reset(&msg_buffer);
2729 return ret;
2730}
2731
2732int handle_notification_thread_channel_sample(
2733 struct notification_thread_state *state, int pipe,
2734 enum lttng_domain_type domain)
2735{
2736 int ret = 0;
2737 struct lttcomm_consumer_channel_monitor_msg sample_msg;
ab0ee2ca
JG
2738 struct channel_info *channel_info;
2739 struct cds_lfht_node *node;
2740 struct cds_lfht_iter iter;
2741 struct lttng_channel_trigger_list *trigger_list;
2742 struct lttng_trigger_list_element *trigger_list_element;
2743 bool previous_sample_available = false;
e8360425
JD
2744 struct channel_state_sample previous_sample, latest_sample;
2745 uint64_t previous_session_consumed_total, latest_session_consumed_total;
ab0ee2ca
JG
2746
2747 /*
2748 * The monitoring pipe only holds messages smaller than PIPE_BUF,
2749 * ensuring that read/write of sampling messages are atomic.
2750 */
7c2551ef 2751 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
ab0ee2ca
JG
2752 if (ret != sizeof(sample_msg)) {
2753 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
2754 pipe);
2755 ret = -1;
2756 goto end;
2757 }
2758
2759 ret = 0;
2760 latest_sample.key.key = sample_msg.key;
2761 latest_sample.key.domain = domain;
2762 latest_sample.highest_usage = sample_msg.highest;
2763 latest_sample.lowest_usage = sample_msg.lowest;
e8360425 2764 latest_sample.channel_total_consumed = sample_msg.total_consumed;
ab0ee2ca
JG
2765
2766 rcu_read_lock();
2767
2768 /* Retrieve the channel's informations */
2769 cds_lfht_lookup(state->channels_ht,
2770 hash_channel_key(&latest_sample.key),
2771 match_channel_info,
2772 &latest_sample.key,
2773 &iter);
2774 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 2775 if (caa_unlikely(!node)) {
ab0ee2ca
JG
2776 /*
2777 * Not an error since the consumer can push a sample to the pipe
2778 * and the rest of the session daemon could notify us of the
2779 * channel's destruction before we get a chance to process that
2780 * sample.
2781 */
2782 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
2783 latest_sample.key.key,
2784 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
2785 "user space");
2786 goto end_unlock;
2787 }
2788 channel_info = caa_container_of(node, struct channel_info,
2789 channels_ht_node);
e8360425 2790 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
8abe313a 2791 channel_info->name,
ab0ee2ca 2792 latest_sample.key.key,
8abe313a 2793 channel_info->session_info->name,
ab0ee2ca 2794 latest_sample.highest_usage,
e8360425
JD
2795 latest_sample.lowest_usage,
2796 latest_sample.channel_total_consumed);
2797
2798 previous_session_consumed_total =
2799 channel_info->session_info->consumed_data_size;
ab0ee2ca
JG
2800
2801 /* Retrieve the channel's last sample, if it exists, and update it. */
2802 cds_lfht_lookup(state->channel_state_ht,
2803 hash_channel_key(&latest_sample.key),
2804 match_channel_state_sample,
2805 &latest_sample.key,
2806 &iter);
2807 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 2808 if (caa_likely(node)) {
ab0ee2ca
JG
2809 struct channel_state_sample *stored_sample;
2810
2811 /* Update the sample stored. */
2812 stored_sample = caa_container_of(node,
2813 struct channel_state_sample,
2814 channel_state_ht_node);
e8360425 2815
ab0ee2ca
JG
2816 memcpy(&previous_sample, stored_sample,
2817 sizeof(previous_sample));
2818 stored_sample->highest_usage = latest_sample.highest_usage;
2819 stored_sample->lowest_usage = latest_sample.lowest_usage;
0f0479d7 2820 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
ab0ee2ca 2821 previous_sample_available = true;
e8360425
JD
2822
2823 latest_session_consumed_total =
2824 previous_session_consumed_total +
2825 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
ab0ee2ca
JG
2826 } else {
2827 /*
2828 * This is the channel's first sample, allocate space for and
2829 * store the new sample.
2830 */
2831 struct channel_state_sample *stored_sample;
2832
2833 stored_sample = zmalloc(sizeof(*stored_sample));
2834 if (!stored_sample) {
2835 ret = -1;
2836 goto end_unlock;
2837 }
2838
2839 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
2840 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
2841 cds_lfht_add(state->channel_state_ht,
2842 hash_channel_key(&stored_sample->key),
2843 &stored_sample->channel_state_ht_node);
e8360425
JD
2844
2845 latest_session_consumed_total =
2846 previous_session_consumed_total +
2847 latest_sample.channel_total_consumed;
ab0ee2ca
JG
2848 }
2849
e8360425
JD
2850 channel_info->session_info->consumed_data_size =
2851 latest_session_consumed_total;
2852
ab0ee2ca
JG
2853 /* Find triggers associated with this channel. */
2854 cds_lfht_lookup(state->channel_triggers_ht,
2855 hash_channel_key(&latest_sample.key),
2856 match_channel_trigger_list,
2857 &latest_sample.key,
2858 &iter);
2859 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 2860 if (caa_likely(!node)) {
ab0ee2ca
JG
2861 goto end_unlock;
2862 }
2863
2864 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
2865 channel_triggers_ht_node);
2866 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
2867 node) {
9b63a4aa
JG
2868 const struct lttng_condition *condition;
2869 const struct lttng_action *action;
2870 const struct lttng_trigger *trigger;
ab0ee2ca
JG
2871 struct notification_client_list *client_list;
2872 struct lttng_evaluation *evaluation = NULL;
2873
2874 trigger = trigger_list_element->trigger;
9b63a4aa 2875 condition = lttng_trigger_get_const_condition(trigger);
ab0ee2ca 2876 assert(condition);
9b63a4aa 2877 action = lttng_trigger_get_const_action(trigger);
ab0ee2ca
JG
2878
2879 /* Notify actions are the only type currently supported. */
9b63a4aa 2880 assert(lttng_action_get_type_const(action) ==
ab0ee2ca
JG
2881 LTTNG_ACTION_TYPE_NOTIFY);
2882
2883 /*
2884 * Check if any client is subscribed to the result of this
2885 * evaluation.
2886 */
2887 cds_lfht_lookup(state->notification_trigger_clients_ht,
2888 lttng_condition_hash(condition),
2889 match_client_list,
2890 trigger,
2891 &iter);
2892 node = cds_lfht_iter_get_node(&iter);
2893 assert(node);
2894
2895 client_list = caa_container_of(node,
2896 struct notification_client_list,
2897 notification_trigger_ht_node);
2898 if (cds_list_empty(&client_list->list)) {
2899 /*
2900 * No clients interested in the evaluation's result,
2901 * skip it.
2902 */
2903 continue;
2904 }
2905
2906 ret = evaluate_condition(condition, &evaluation, state,
2907 previous_sample_available ? &previous_sample : NULL,
e8360425
JD
2908 &latest_sample,
2909 previous_session_consumed_total,
2910 latest_session_consumed_total,
2911 channel_info);
2912 if (caa_unlikely(ret)) {
ab0ee2ca
JG
2913 goto end_unlock;
2914 }
2915
e8360425 2916 if (caa_likely(!evaluation)) {
ab0ee2ca
JG
2917 continue;
2918 }
2919
2920 /* Dispatch evaluation result to all clients. */
2921 ret = send_evaluation_to_clients(trigger_list_element->trigger,
2922 evaluation, client_list, state,
8abe313a
JG
2923 channel_info->session_info->uid,
2924 channel_info->session_info->gid);
2925 lttng_evaluation_destroy(evaluation);
e0b5f87b 2926 if (caa_unlikely(ret)) {
ab0ee2ca
JG
2927 goto end_unlock;
2928 }
2929 }
2930end_unlock:
2931 rcu_read_unlock();
2932end:
2933 return ret;
2934}
This page took 0.262662 seconds and 4 git commands to generate.