Build a list of triggers applying to a given session on creation
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
CommitLineData
ab0ee2ca
JG
1/*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _LGPL_SOURCE
19#include <urcu.h>
20#include <urcu/rculfhash.h>
21
ab0ee2ca
JG
22#include <common/defaults.h>
23#include <common/error.h>
24#include <common/futex.h>
25#include <common/unix.h>
26#include <common/dynamic-buffer.h>
27#include <common/hashtable/utils.h>
28#include <common/sessiond-comm/sessiond-comm.h>
29#include <common/macros.h>
30#include <lttng/condition/condition.h>
9b63a4aa 31#include <lttng/action/action-internal.h>
ab0ee2ca
JG
32#include <lttng/notification/notification-internal.h>
33#include <lttng/condition/condition-internal.h>
34#include <lttng/condition/buffer-usage-internal.h>
e8360425 35#include <lttng/condition/session-consumed-size-internal.h>
ea9a44f0 36#include <lttng/condition/session-rotation-internal.h>
ab0ee2ca 37#include <lttng/notification/channel-internal.h>
1da26331 38
ab0ee2ca
JG
39#include <time.h>
40#include <unistd.h>
41#include <assert.h>
42#include <inttypes.h>
d53ea4e4 43#include <fcntl.h>
ab0ee2ca 44
1da26331
JG
45#include "notification-thread.h"
46#include "notification-thread-events.h"
47#include "notification-thread-commands.h"
48#include "lttng-sessiond.h"
49#include "kernel.h"
50
ab0ee2ca
JG
/*
 * Poll event masks, named for use on client sockets: input, error and
 * hang-up events, optionally combined with output readiness.
 */
#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
53
54struct lttng_trigger_list_element {
9e0bb80e
JG
55 /* No ownership of the trigger object is assumed. */
56 const struct lttng_trigger *trigger;
ab0ee2ca
JG
57 struct cds_list_head node;
58};
59
60struct lttng_channel_trigger_list {
61 struct channel_key channel_key;
ea9a44f0 62 /* List of struct lttng_trigger_list_element. */
ab0ee2ca 63 struct cds_list_head list;
ea9a44f0 64 /* Node in the channel_triggers_ht */
ab0ee2ca
JG
65 struct cds_lfht_node channel_triggers_ht_node;
66};
67
ea9a44f0
JG
68/*
69 * List of triggers applying to a given session.
70 *
71 * See:
72 * - lttng_session_trigger_list_create()
73 * - lttng_session_trigger_list_build()
74 * - lttng_session_trigger_list_destroy()
75 * - lttng_session_trigger_list_add()
76 */
77struct lttng_session_trigger_list {
78 /*
79 * Not owned by this; points to the session_info structure's
80 * session name.
81 */
82 const char *session_name;
83 /* List of struct lttng_trigger_list_element. */
84 struct cds_list_head list;
85 /* Node in the session_triggers_ht */
86 struct cds_lfht_node session_triggers_ht_node;
87 /*
88 * Weak reference to the notification system's session triggers
89 * hashtable.
90 *
91 * The session trigger list structure structure is owned by
92 * the session's session_info.
93 *
94 * The session_info is kept alive the the channel_infos holding a
95 * reference to it (reference counting). When those channels are
96 * destroyed (at runtime or on teardown), the reference they hold
97 * to the session_info are released. On destruction of session_info,
98 * session_info_destroy() will remove the list of triggers applying
99 * to this session from the notification system's state.
100 *
101 * This implies that the session_triggers_ht must be destroyed
102 * after the channels.
103 */
104 struct cds_lfht *session_triggers_ht;
105 /* Used for delayed RCU reclaim. */
106 struct rcu_head rcu_node;
107};
108
ab0ee2ca
JG
109struct lttng_trigger_ht_element {
110 struct lttng_trigger *trigger;
111 struct cds_lfht_node node;
112};
113
114struct lttng_condition_list_element {
115 struct lttng_condition *condition;
116 struct cds_list_head node;
117};
118
119struct notification_client_list_element {
120 struct notification_client *client;
121 struct cds_list_head node;
122};
123
124struct notification_client_list {
125 struct lttng_trigger *trigger;
126 struct cds_list_head list;
127 struct cds_lfht_node notification_trigger_ht_node;
128};
129
130struct notification_client {
131 int socket;
132 /* Client protocol version. */
133 uint8_t major, minor;
134 uid_t uid;
135 gid_t gid;
136 /*
5332364f 137 * Indicates if the credentials and versions of the client have been
ab0ee2ca
JG
138 * checked.
139 */
140 bool validated;
141 /*
142 * Conditions to which the client's notification channel is subscribed.
143 * List of struct lttng_condition_list_node. The condition member is
144 * owned by the client.
145 */
146 struct cds_list_head condition_list;
147 struct cds_lfht_node client_socket_ht_node;
148 struct {
149 struct {
14fa22f8
JG
150 /*
151 * During the reception of a message, the reception
152 * buffers' "size" is set to contain the current
153 * message's complete payload.
154 */
ab0ee2ca
JG
155 struct lttng_dynamic_buffer buffer;
156 /* Bytes left to receive for the current message. */
157 size_t bytes_to_receive;
158 /* Type of the message being received. */
159 enum lttng_notification_channel_message_type msg_type;
160 /*
161 * Indicates whether or not credentials are expected
162 * from the client.
163 */
01ea340e 164 bool expect_creds;
ab0ee2ca
JG
165 /*
166 * Indicates whether or not credentials were received
167 * from the client.
168 */
169 bool creds_received;
14fa22f8 170 /* Only used during credentials reception. */
ab0ee2ca
JG
171 lttng_sock_cred creds;
172 } inbound;
173 struct {
174 /*
175 * Indicates whether or not a notification addressed to
176 * this client was dropped because a command reply was
177 * already buffered.
178 *
179 * A notification is dropped whenever the buffer is not
180 * empty.
181 */
182 bool dropped_notification;
183 /*
184 * Indicates whether or not a command reply is already
185 * buffered. In this case, it means that the client is
186 * not consuming command replies before emitting a new
187 * one. This could be caused by a protocol error or a
188 * misbehaving/malicious client.
189 */
190 bool queued_command_reply;
191 struct lttng_dynamic_buffer buffer;
192 } outbound;
193 } communication;
194};
195
196struct channel_state_sample {
197 struct channel_key key;
198 struct cds_lfht_node channel_state_ht_node;
199 uint64_t highest_usage;
200 uint64_t lowest_usage;
e8360425 201 uint64_t channel_total_consumed;
ab0ee2ca
JG
202};
203
/* Forward declarations of helpers defined later in this file. */
static
unsigned long hash_channel_key(struct channel_key *key);
static
int evaluate_condition(const struct lttng_condition *condition,
		struct lttng_evaluation **evaluation,
		const struct notification_thread_state *state,
		const struct channel_state_sample *previous_sample,
		const struct channel_state_sample *latest_sample,
		uint64_t previous_session_consumed_total,
		uint64_t latest_session_consumed_total,
		struct channel_info *channel_info);
static
int send_evaluation_to_clients(const struct lttng_trigger *trigger,
		const struct lttng_evaluation *evaluation,
		struct notification_client_list *client_list,
		struct notification_thread_state *state,
		uid_t channel_uid, gid_t channel_gid);
219
8abe313a 220
/*
 * session_info API
 *
 * session_info objects are reference-counted: use session_info_get() and
 * session_info_put() rather than calling session_info_destroy() directly.
 */
static
void session_info_destroy(void *_data);
static
void session_info_get(struct session_info *session_info);
static
void session_info_put(struct session_info *session_info);
static
struct session_info *session_info_create(const char *name,
		uid_t uid, gid_t gid,
		struct lttng_session_trigger_list *trigger_list);
static
void session_info_add_channel(struct session_info *session_info,
		struct channel_info *channel_info);
static
void session_info_remove_channel(struct session_info *session_info,
		struct channel_info *channel_info);
238
ea9a44f0
JG
/* lttng_session_trigger_list API (definitions appear later in this file). */
static
struct lttng_session_trigger_list *lttng_session_trigger_list_create(
		const char *session_name,
		struct cds_lfht *session_triggers_ht);
static
struct lttng_session_trigger_list *lttng_session_trigger_list_build(
		const struct notification_thread_state *state,
		const char *session_name);
static
void lttng_session_trigger_list_destroy(
		struct lttng_session_trigger_list *list);
static
int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
		const struct lttng_trigger *trigger);
254
255
ab0ee2ca
JG
256static
257int match_client(struct cds_lfht_node *node, const void *key)
258{
259 /* This double-cast is intended to supress pointer-to-cast warning. */
260 int socket = (int) (intptr_t) key;
261 struct notification_client *client;
262
263 client = caa_container_of(node, struct notification_client,
264 client_socket_ht_node);
265
266 return !!(client->socket == socket);
267}
268
269static
270int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
271{
272 struct channel_key *channel_key = (struct channel_key *) key;
273 struct lttng_channel_trigger_list *trigger_list;
274
275 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
276 channel_triggers_ht_node);
277
278 return !!((channel_key->key == trigger_list->channel_key.key) &&
279 (channel_key->domain == trigger_list->channel_key.domain));
280}
281
282static
283int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
284{
285 struct channel_key *channel_key = (struct channel_key *) key;
286 struct channel_state_sample *sample;
287
288 sample = caa_container_of(node, struct channel_state_sample,
289 channel_state_ht_node);
290
291 return !!((channel_key->key == sample->key.key) &&
292 (channel_key->domain == sample->key.domain));
293}
294
295static
296int match_channel_info(struct cds_lfht_node *node, const void *key)
297{
298 struct channel_key *channel_key = (struct channel_key *) key;
299 struct channel_info *channel_info;
300
301 channel_info = caa_container_of(node, struct channel_info,
302 channels_ht_node);
303
304 return !!((channel_key->key == channel_info->key.key) &&
305 (channel_key->domain == channel_info->key.domain));
306}
307
308static
309int match_condition(struct cds_lfht_node *node, const void *key)
310{
311 struct lttng_condition *condition_key = (struct lttng_condition *) key;
312 struct lttng_trigger_ht_element *trigger;
313 struct lttng_condition *condition;
314
315 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
316 node);
317 condition = lttng_trigger_get_condition(trigger->trigger);
318 assert(condition);
319
320 return !!lttng_condition_is_equal(condition_key, condition);
321}
322
323static
324int match_client_list(struct cds_lfht_node *node, const void *key)
325{
326 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
327 struct notification_client_list *client_list;
328 struct lttng_condition *condition;
329 struct lttng_condition *condition_key = lttng_trigger_get_condition(
330 trigger_key);
331
332 assert(condition_key);
333
334 client_list = caa_container_of(node, struct notification_client_list,
335 notification_trigger_ht_node);
336 condition = lttng_trigger_get_condition(client_list->trigger);
337
338 return !!lttng_condition_is_equal(condition_key, condition);
339}
340
341static
342int match_client_list_condition(struct cds_lfht_node *node, const void *key)
343{
344 struct lttng_condition *condition_key = (struct lttng_condition *) key;
345 struct notification_client_list *client_list;
346 struct lttng_condition *condition;
347
348 assert(condition_key);
349
350 client_list = caa_container_of(node, struct notification_client_list,
351 notification_trigger_ht_node);
352 condition = lttng_trigger_get_condition(client_list->trigger);
353
354 return !!lttng_condition_is_equal(condition_key, condition);
355}
356
357static
358unsigned long lttng_condition_buffer_usage_hash(
9b63a4aa 359 const struct lttng_condition *_condition)
ab0ee2ca 360{
9a2746aa
JG
361 unsigned long hash;
362 unsigned long condition_type;
ab0ee2ca
JG
363 struct lttng_condition_buffer_usage *condition;
364
365 condition = container_of(_condition,
366 struct lttng_condition_buffer_usage, parent);
367
9a2746aa
JG
368 condition_type = (unsigned long) condition->parent.type;
369 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
ab0ee2ca
JG
370 if (condition->session_name) {
371 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
372 }
373 if (condition->channel_name) {
8f56701f 374 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
ab0ee2ca
JG
375 }
376 if (condition->domain.set) {
377 hash ^= hash_key_ulong(
378 (void *) condition->domain.type,
379 lttng_ht_seed);
380 }
381 if (condition->threshold_ratio.set) {
382 uint64_t val;
383
384 val = condition->threshold_ratio.value * (double) UINT32_MAX;
385 hash ^= hash_key_u64(&val, lttng_ht_seed);
6633b0dd 386 } else if (condition->threshold_bytes.set) {
ab0ee2ca
JG
387 uint64_t val;
388
389 val = condition->threshold_bytes.value;
390 hash ^= hash_key_u64(&val, lttng_ht_seed);
391 }
392 return hash;
393}
394
e8360425
JD
395static
396unsigned long lttng_condition_session_consumed_size_hash(
9b63a4aa 397 const struct lttng_condition *_condition)
e8360425 398{
9a2746aa
JG
399 unsigned long hash;
400 unsigned long condition_type =
401 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE;
e8360425
JD
402 struct lttng_condition_session_consumed_size *condition;
403 uint64_t val;
404
405 condition = container_of(_condition,
406 struct lttng_condition_session_consumed_size, parent);
407
9a2746aa 408 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
e8360425
JD
409 if (condition->session_name) {
410 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
411 }
412 val = condition->consumed_threshold_bytes.value;
413 hash ^= hash_key_u64(&val, lttng_ht_seed);
414 return hash;
415}
416
9a2746aa 417
ab0ee2ca
JG
418/*
419 * The lttng_condition hashing code is kept in this file (rather than
420 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
421 * don't want to link in liblttng-ctl.
422 */
423static
9b63a4aa 424unsigned long lttng_condition_hash(const struct lttng_condition *condition)
ab0ee2ca
JG
425{
426 switch (condition->type) {
427 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
428 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
429 return lttng_condition_buffer_usage_hash(condition);
e8360425
JD
430 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
431 return lttng_condition_session_consumed_size_hash(condition);
ab0ee2ca
JG
432 default:
433 ERR("[notification-thread] Unexpected condition type caught");
434 abort();
435 }
436}
437
e4db5ace
JR
438static
439unsigned long hash_channel_key(struct channel_key *key)
440{
441 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
442 unsigned long domain_hash = hash_key_ulong(
443 (void *) (unsigned long) key->domain, lttng_ht_seed);
444
445 return key_hash ^ domain_hash;
446}
447
ab0ee2ca
JG
448static
449void channel_info_destroy(struct channel_info *channel_info)
450{
451 if (!channel_info) {
452 return;
453 }
454
8abe313a
JG
455 if (channel_info->session_info) {
456 session_info_remove_channel(channel_info->session_info,
457 channel_info);
458 session_info_put(channel_info->session_info);
ab0ee2ca 459 }
8abe313a
JG
460 if (channel_info->name) {
461 free(channel_info->name);
ab0ee2ca
JG
462 }
463 free(channel_info);
464}
465
8abe313a 466/* Don't call directly, use the ref-counting mechanism. */
ab0ee2ca 467static
8abe313a 468void session_info_destroy(void *_data)
ab0ee2ca 469{
8abe313a 470 struct session_info *session_info = _data;
3b68d0a3 471 int ret;
ab0ee2ca 472
8abe313a
JG
473 assert(session_info);
474 if (session_info->channel_infos_ht) {
3b68d0a3
JR
475 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
476 if (ret) {
4c3f302b 477 ERR("[notification-thread] Failed to destroy channel information hash table");
3b68d0a3 478 }
8abe313a 479 }
ea9a44f0 480 lttng_session_trigger_list_destroy(session_info->trigger_list);
8abe313a
JG
481 free(session_info->name);
482 free(session_info);
483}
ab0ee2ca 484
8abe313a
JG
485static
486void session_info_get(struct session_info *session_info)
487{
488 if (!session_info) {
489 return;
490 }
491 lttng_ref_get(&session_info->ref);
492}
493
494static
495void session_info_put(struct session_info *session_info)
496{
497 if (!session_info) {
498 return;
ab0ee2ca 499 }
8abe313a
JG
500 lttng_ref_put(&session_info->ref);
501}
502
503static
ea9a44f0
JG
504struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
505 struct lttng_session_trigger_list *trigger_list)
8abe313a
JG
506{
507 struct session_info *session_info;
ab0ee2ca 508
8abe313a 509 assert(name);
ab0ee2ca 510
8abe313a
JG
511 session_info = zmalloc(sizeof(*session_info));
512 if (!session_info) {
513 goto end;
514 }
515 lttng_ref_init(&session_info->ref, session_info_destroy);
516
517 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
518 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
519 if (!session_info->channel_infos_ht) {
ab0ee2ca
JG
520 goto error;
521 }
8abe313a
JG
522
523 cds_lfht_node_init(&session_info->sessions_ht_node);
524 session_info->name = strdup(name);
525 if (!session_info->name) {
ab0ee2ca
JG
526 goto error;
527 }
8abe313a
JG
528 session_info->uid = uid;
529 session_info->gid = gid;
ea9a44f0 530 session_info->trigger_list = trigger_list;
8abe313a
JG
531end:
532 return session_info;
533error:
534 session_info_put(session_info);
535 return NULL;
536}
537
538static
539void session_info_add_channel(struct session_info *session_info,
540 struct channel_info *channel_info)
541{
542 rcu_read_lock();
543 cds_lfht_add(session_info->channel_infos_ht,
544 hash_channel_key(&channel_info->key),
545 &channel_info->session_info_channels_ht_node);
546 rcu_read_unlock();
547}
548
549static
550void session_info_remove_channel(struct session_info *session_info,
551 struct channel_info *channel_info)
552{
553 rcu_read_lock();
554 cds_lfht_del(session_info->channel_infos_ht,
555 &channel_info->session_info_channels_ht_node);
556 rcu_read_unlock();
557}
558
559static
560struct channel_info *channel_info_create(const char *channel_name,
561 struct channel_key *channel_key, uint64_t channel_capacity,
562 struct session_info *session_info)
563{
564 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
565
566 if (!channel_info) {
567 goto end;
568 }
569
ab0ee2ca 570 cds_lfht_node_init(&channel_info->channels_ht_node);
8abe313a
JG
571 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
572 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
573 channel_info->capacity = channel_capacity;
574
575 channel_info->name = strdup(channel_name);
576 if (!channel_info->name) {
577 goto error;
578 }
579
580 /*
581 * Set the references between session and channel infos:
582 * - channel_info holds a strong reference to session_info
583 * - session_info holds a weak reference to channel_info
584 */
585 session_info_get(session_info);
586 session_info_add_channel(session_info, channel_info);
587 channel_info->session_info = session_info;
ab0ee2ca 588end:
8abe313a 589 return channel_info;
ab0ee2ca 590error:
8abe313a 591 channel_info_destroy(channel_info);
ab0ee2ca
JG
592 return NULL;
593}
594
e4db5ace
JR
595/* This function must be called with the RCU read lock held. */
596static
597int evaluate_condition_for_client(struct lttng_trigger *trigger,
598 struct lttng_condition *condition,
599 struct notification_client *client,
600 struct notification_thread_state *state)
601{
602 int ret;
603 struct cds_lfht_iter iter;
604 struct cds_lfht_node *node;
605 struct channel_info *channel_info = NULL;
606 struct channel_key *channel_key = NULL;
607 struct channel_state_sample *last_sample = NULL;
608 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
609 struct lttng_evaluation *evaluation = NULL;
610 struct notification_client_list client_list = { 0 };
611 struct notification_client_list_element client_list_element = { 0 };
612
613 assert(trigger);
614 assert(condition);
615 assert(client);
616 assert(state);
617
618 /* Find the channel associated with the trigger. */
619 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
620 channel_trigger_list , channel_triggers_ht_node) {
621 struct lttng_trigger_list_element *element;
622
623 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
9b63a4aa
JG
624 const struct lttng_condition *current_condition =
625 lttng_trigger_get_const_condition(
e4db5ace
JR
626 element->trigger);
627
628 assert(current_condition);
629 if (!lttng_condition_is_equal(condition,
2ae99f0b 630 current_condition)) {
e4db5ace
JR
631 continue;
632 }
633
634 /* Found the trigger, save the channel key. */
635 channel_key = &channel_trigger_list->channel_key;
636 break;
637 }
638 if (channel_key) {
639 /* The channel key was found stop iteration. */
640 break;
641 }
642 }
643
644 if (!channel_key){
645 /* No channel found; normal exit. */
646 DBG("[notification-thread] No channel associated with newly subscribed-to condition");
647 ret = 0;
648 goto end;
649 }
650
651 /* Fetch channel info for the matching channel. */
652 cds_lfht_lookup(state->channels_ht,
653 hash_channel_key(channel_key),
654 match_channel_info,
655 channel_key,
656 &iter);
657 node = cds_lfht_iter_get_node(&iter);
658 assert(node);
659 channel_info = caa_container_of(node, struct channel_info,
660 channels_ht_node);
661
662 /* Retrieve the channel's last sample, if it exists. */
663 cds_lfht_lookup(state->channel_state_ht,
664 hash_channel_key(channel_key),
665 match_channel_state_sample,
666 channel_key,
667 &iter);
668 node = cds_lfht_iter_get_node(&iter);
669 if (node) {
670 last_sample = caa_container_of(node,
671 struct channel_state_sample,
672 channel_state_ht_node);
673 } else {
674 /* Nothing to evaluate, no sample was ever taken. Normal exit */
675 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
676 ret = 0;
677 goto end;
678 }
679
e8360425
JD
680 ret = evaluate_condition(condition, &evaluation, state,
681 NULL, last_sample,
682 0, channel_info->session_info->consumed_data_size,
683 channel_info);
e4db5ace 684 if (ret) {
066a3c82 685 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
e4db5ace
JR
686 goto end;
687 }
688
689 if (!evaluation) {
690 /* Evaluation yielded nothing. Normal exit. */
691 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
692 ret = 0;
693 goto end;
694 }
695
696 /*
697 * Create a temporary client list with the client currently
698 * subscribing.
699 */
700 cds_lfht_node_init(&client_list.notification_trigger_ht_node);
701 CDS_INIT_LIST_HEAD(&client_list.list);
702 client_list.trigger = trigger;
703
704 CDS_INIT_LIST_HEAD(&client_list_element.node);
705 client_list_element.client = client;
706 cds_list_add(&client_list_element.node, &client_list.list);
707
708 /* Send evaluation result to the newly-subscribed client. */
709 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
710 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
8abe313a
JG
711 state, channel_info->session_info->uid,
712 channel_info->session_info->gid);
e4db5ace
JR
713
714end:
715 return ret;
716}
717
ab0ee2ca
JG
718static
719int notification_thread_client_subscribe(struct notification_client *client,
720 struct lttng_condition *condition,
721 struct notification_thread_state *state,
722 enum lttng_notification_channel_status *_status)
723{
724 int ret = 0;
725 struct cds_lfht_iter iter;
726 struct cds_lfht_node *node;
727 struct notification_client_list *client_list;
728 struct lttng_condition_list_element *condition_list_element = NULL;
729 struct notification_client_list_element *client_list_element = NULL;
730 enum lttng_notification_channel_status status =
731 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
732
733 /*
734 * Ensure that the client has not already subscribed to this condition
735 * before.
736 */
737 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
738 if (lttng_condition_is_equal(condition_list_element->condition,
739 condition)) {
740 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
741 goto end;
742 }
743 }
744
745 condition_list_element = zmalloc(sizeof(*condition_list_element));
746 if (!condition_list_element) {
747 ret = -1;
748 goto error;
749 }
750 client_list_element = zmalloc(sizeof(*client_list_element));
751 if (!client_list_element) {
752 ret = -1;
753 goto error;
754 }
755
756 rcu_read_lock();
757
758 /*
759 * Add the newly-subscribed condition to the client's subscription list.
760 */
761 CDS_INIT_LIST_HEAD(&condition_list_element->node);
762 condition_list_element->condition = condition;
763 cds_list_add(&condition_list_element->node, &client->condition_list);
764
ab0ee2ca
JG
765 cds_lfht_lookup(state->notification_trigger_clients_ht,
766 lttng_condition_hash(condition),
767 match_client_list_condition,
768 condition,
769 &iter);
770 node = cds_lfht_iter_get_node(&iter);
771 if (!node) {
2ae99f0b
JG
772 /*
773 * No notification-emiting trigger registered with this
774 * condition. We don't evaluate the condition right away
775 * since this trigger is not registered yet.
776 */
4fb43b68 777 free(client_list_element);
ab0ee2ca
JG
778 goto end_unlock;
779 }
780
781 client_list = caa_container_of(node, struct notification_client_list,
782 notification_trigger_ht_node);
2ae99f0b
JG
783 /*
784 * The condition to which the client just subscribed is evaluated
785 * at this point so that conditions that are already TRUE result
786 * in a notification being sent out.
787 */
e4db5ace
JR
788 if (evaluate_condition_for_client(client_list->trigger, condition,
789 client, state)) {
790 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
791 ret = -1;
4bd5b4af 792 free(client_list_element);
e4db5ace
JR
793 goto end_unlock;
794 }
795
796 /*
797 * Add the client to the list of clients interested in a given trigger
798 * if a "notification" trigger with a corresponding condition was
799 * added prior.
800 */
ab0ee2ca
JG
801 client_list_element->client = client;
802 CDS_INIT_LIST_HEAD(&client_list_element->node);
803 cds_list_add(&client_list_element->node, &client_list->list);
804end_unlock:
805 rcu_read_unlock();
806end:
807 if (_status) {
808 *_status = status;
809 }
810 return ret;
811error:
812 free(condition_list_element);
813 free(client_list_element);
814 return ret;
815}
816
817static
818int notification_thread_client_unsubscribe(
819 struct notification_client *client,
820 struct lttng_condition *condition,
821 struct notification_thread_state *state,
822 enum lttng_notification_channel_status *_status)
823{
824 struct cds_lfht_iter iter;
825 struct cds_lfht_node *node;
826 struct notification_client_list *client_list;
827 struct lttng_condition_list_element *condition_list_element,
828 *condition_tmp;
829 struct notification_client_list_element *client_list_element,
830 *client_tmp;
831 bool condition_found = false;
832 enum lttng_notification_channel_status status =
833 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
834
835 /* Remove the condition from the client's condition list. */
836 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
837 &client->condition_list, node) {
838 if (!lttng_condition_is_equal(condition_list_element->condition,
839 condition)) {
840 continue;
841 }
842
843 cds_list_del(&condition_list_element->node);
844 /*
845 * The caller may be iterating on the client's conditions to
846 * tear down a client's connection. In this case, the condition
847 * will be destroyed at the end.
848 */
849 if (condition != condition_list_element->condition) {
850 lttng_condition_destroy(
851 condition_list_element->condition);
852 }
853 free(condition_list_element);
854 condition_found = true;
855 break;
856 }
857
858 if (!condition_found) {
859 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
860 goto end;
861 }
862
863 /*
864 * Remove the client from the list of clients interested the trigger
865 * matching the condition.
866 */
867 rcu_read_lock();
868 cds_lfht_lookup(state->notification_trigger_clients_ht,
869 lttng_condition_hash(condition),
870 match_client_list_condition,
871 condition,
872 &iter);
873 node = cds_lfht_iter_get_node(&iter);
874 if (!node) {
875 goto end_unlock;
876 }
877
878 client_list = caa_container_of(node, struct notification_client_list,
879 notification_trigger_ht_node);
880 cds_list_for_each_entry_safe(client_list_element, client_tmp,
881 &client_list->list, node) {
882 if (client_list_element->client->socket != client->socket) {
883 continue;
884 }
885 cds_list_del(&client_list_element->node);
886 free(client_list_element);
887 break;
888 }
889end_unlock:
890 rcu_read_unlock();
891end:
892 lttng_condition_destroy(condition);
893 if (_status) {
894 *_status = status;
895 }
896 return 0;
897}
898
899static
900void notification_client_destroy(struct notification_client *client,
901 struct notification_thread_state *state)
902{
903 struct lttng_condition_list_element *condition_list_element, *tmp;
904
905 if (!client) {
906 return;
907 }
908
909 /* Release all conditions to which the client was subscribed. */
910 cds_list_for_each_entry_safe(condition_list_element, tmp,
911 &client->condition_list, node) {
912 (void) notification_thread_client_unsubscribe(client,
913 condition_list_element->condition, state, NULL);
914 }
915
916 if (client->socket >= 0) {
917 (void) lttcomm_close_unix_sock(client->socket);
918 }
919 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
920 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
921 free(client);
922}
923
924/*
925 * Call with rcu_read_lock held (and hold for the lifetime of the returned
926 * client pointer).
927 */
928static
929struct notification_client *get_client_from_socket(int socket,
930 struct notification_thread_state *state)
931{
932 struct cds_lfht_iter iter;
933 struct cds_lfht_node *node;
934 struct notification_client *client = NULL;
935
936 cds_lfht_lookup(state->client_socket_ht,
937 hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed),
938 match_client,
939 (void *) (unsigned long) socket,
940 &iter);
941 node = cds_lfht_iter_get_node(&iter);
942 if (!node) {
943 goto end;
944 }
945
946 client = caa_container_of(node, struct notification_client,
947 client_socket_ht_node);
948end:
949 return client;
950}
951
952static
e8360425
JD
953bool buffer_usage_condition_applies_to_channel(
954 struct lttng_condition *condition,
8abe313a 955 struct channel_info *channel_info)
ab0ee2ca
JG
956{
957 enum lttng_condition_status status;
e8360425
JD
958 enum lttng_domain_type condition_domain;
959 const char *condition_session_name = NULL;
960 const char *condition_channel_name = NULL;
ab0ee2ca 961
e8360425
JD
962 status = lttng_condition_buffer_usage_get_domain_type(condition,
963 &condition_domain);
964 assert(status == LTTNG_CONDITION_STATUS_OK);
965 if (channel_info->key.domain != condition_domain) {
ab0ee2ca
JG
966 goto fail;
967 }
968
e8360425
JD
969 status = lttng_condition_buffer_usage_get_session_name(
970 condition, &condition_session_name);
971 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
972
973 status = lttng_condition_buffer_usage_get_channel_name(
974 condition, &condition_channel_name);
975 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
976
977 if (strcmp(channel_info->session_info->name, condition_session_name)) {
978 goto fail;
979 }
980 if (strcmp(channel_info->name, condition_channel_name)) {
ab0ee2ca
JG
981 goto fail;
982 }
983
e8360425
JD
984 return true;
985fail:
986 return false;
987}
988
989static
990bool session_consumed_size_condition_applies_to_channel(
991 struct lttng_condition *condition,
992 struct channel_info *channel_info)
993{
994 enum lttng_condition_status status;
995 const char *condition_session_name = NULL;
996
997 status = lttng_condition_session_consumed_size_get_session_name(
998 condition, &condition_session_name);
999 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1000
1001 if (strcmp(channel_info->session_info->name, condition_session_name)) {
ab0ee2ca
JG
1002 goto fail;
1003 }
1004
e8360425
JD
1005 return true;
1006fail:
1007 return false;
1008}
ab0ee2ca 1009
e8360425
JD
1010static
1011bool trigger_applies_to_channel(struct lttng_trigger *trigger,
1012 struct channel_info *channel_info)
1013{
1014 struct lttng_condition *condition;
1015 bool trigger_applies;
ab0ee2ca 1016
e8360425
JD
1017 condition = lttng_trigger_get_condition(trigger);
1018 if (!condition) {
ab0ee2ca
JG
1019 goto fail;
1020 }
e8360425
JD
1021
1022 switch (lttng_condition_get_type(condition)) {
1023 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1024 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1025 trigger_applies = buffer_usage_condition_applies_to_channel(
1026 condition, channel_info);
1027 break;
1028 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1029 trigger_applies = session_consumed_size_condition_applies_to_channel(
1030 condition, channel_info);
1031 break;
1032 default:
ab0ee2ca
JG
1033 goto fail;
1034 }
1035
e8360425 1036 return trigger_applies;
ab0ee2ca
JG
1037fail:
1038 return false;
1039}
1040
1041static
1042bool trigger_applies_to_client(struct lttng_trigger *trigger,
1043 struct notification_client *client)
1044{
1045 bool applies = false;
1046 struct lttng_condition_list_element *condition_list_element;
1047
1048 cds_list_for_each_entry(condition_list_element, &client->condition_list,
1049 node) {
1050 applies = lttng_condition_is_equal(
1051 condition_list_element->condition,
1052 lttng_trigger_get_condition(trigger));
1053 if (applies) {
1054 break;
1055 }
1056 }
1057 return applies;
1058}
1059
8abe313a
JG
1060static
1061int match_session(struct cds_lfht_node *node, const void *key)
1062{
1063 const char *name = key;
1064 struct session_info *session_info = caa_container_of(
1065 node, struct session_info, sessions_ht_node);
1066
1067 return !strcmp(session_info->name, name);
1068}
1069
ea9a44f0
JG
1070/*
1071 * Allocate an empty lttng_session_trigger_list for the session named
1072 * 'session_name'.
1073 *
1074 * No ownership of 'session_name' is assumed by the session trigger list.
1075 * It is the caller's responsability to ensure the session name is alive
1076 * for as long as this list is.
1077 */
1078static
1079struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1080 const char *session_name,
1081 struct cds_lfht *session_triggers_ht)
1082{
1083 struct lttng_session_trigger_list *list;
1084
1085 list = zmalloc(sizeof(*list));
1086 if (!list) {
1087 goto end;
1088 }
1089 list->session_name = session_name;
1090 CDS_INIT_LIST_HEAD(&list->list);
1091 cds_lfht_node_init(&list->session_triggers_ht_node);
1092 list->session_triggers_ht = session_triggers_ht;
1093
1094 rcu_read_lock();
1095 /* Publish the list through the session_triggers_ht. */
1096 cds_lfht_add(session_triggers_ht,
1097 hash_key_str(session_name, lttng_ht_seed),
1098 &list->session_triggers_ht_node);
1099 rcu_read_unlock();
1100end:
1101 return list;
1102}
1103
1104static
1105void free_session_trigger_list_rcu(struct rcu_head *node)
1106{
1107 free(caa_container_of(node, struct lttng_session_trigger_list,
1108 rcu_node));
1109}
1110
1111static
1112void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1113{
1114 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1115
1116 /* Empty the list element by element, and then free the list itself. */
1117 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1118 &list->list, node) {
1119 cds_list_del(&trigger_list_element->node);
1120 free(trigger_list_element);
1121 }
1122 rcu_read_lock();
1123 /* Unpublish the list from the session_triggers_ht. */
1124 cds_lfht_del(list->session_triggers_ht,
1125 &list->session_triggers_ht_node);
1126 rcu_read_unlock();
1127 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1128}
1129
1130static
1131int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
1132 const struct lttng_trigger *trigger)
1133{
1134 int ret = 0;
1135 struct lttng_trigger_list_element *new_element =
1136 zmalloc(sizeof(*new_element));
1137
1138 if (!new_element) {
1139 ret = -1;
1140 goto end;
1141 }
1142 CDS_INIT_LIST_HEAD(&new_element->node);
1143 new_element->trigger = trigger;
1144 cds_list_add(&new_element->node, &list->list);
1145end:
1146 return ret;
1147}
1148
1149static
1150bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1151 const char *session_name)
1152{
1153 bool applies = false;
1154 const struct lttng_condition *condition;
1155
1156 condition = lttng_trigger_get_const_condition(trigger);
1157 switch (lttng_condition_get_type(condition)) {
1158 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1159 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1160 {
1161 enum lttng_condition_status condition_status;
1162 const char *condition_session_name;
1163
1164 condition_status = lttng_condition_session_rotation_get_session_name(
1165 condition, &condition_session_name);
1166 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1167 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1168 goto end;
1169 }
1170
1171 assert(condition_session_name);
1172 applies = !strcmp(condition_session_name, session_name);
1173 break;
1174 }
1175 default:
1176 goto end;
1177 }
1178end:
1179 return applies;
1180}
1181
1182/*
1183 * Allocate and initialize an lttng_session_trigger_list which contains
1184 * all triggers that apply to the session named 'session_name'.
1185 *
1186 * No ownership of 'session_name' is assumed by the session trigger list.
1187 * It is the caller's responsability to ensure the session name is alive
1188 * for as long as this list is.
1189 */
1190static
1191struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1192 const struct notification_thread_state *state,
1193 const char *session_name)
1194{
1195 int trigger_count = 0;
1196 struct lttng_session_trigger_list *session_trigger_list = NULL;
1197 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1198 struct cds_lfht_iter iter;
1199
1200 session_trigger_list = lttng_session_trigger_list_create(session_name,
1201 state->session_triggers_ht);
1202
1203 /* Add all triggers applying to the session named 'session_name'. */
1204 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1205 node) {
1206 int ret;
1207
1208 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1209 session_name)) {
1210 continue;
1211 }
1212
1213 ret = lttng_session_trigger_list_add(session_trigger_list,
1214 trigger_ht_element->trigger);
1215 if (ret) {
1216 goto error;
1217 }
1218
1219 trigger_count++;
1220 }
1221
1222 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1223 trigger_count);
1224 return session_trigger_list;
1225error:
1226 lttng_session_trigger_list_destroy(session_trigger_list);
1227 return NULL;
1228}
1229
8abe313a
JG
1230static
1231struct session_info *find_or_create_session_info(
1232 struct notification_thread_state *state,
1233 const char *name, uid_t uid, gid_t gid)
1234{
1235 struct session_info *session = NULL;
1236 struct cds_lfht_node *node;
1237 struct cds_lfht_iter iter;
ea9a44f0 1238 struct lttng_session_trigger_list *trigger_list;
8abe313a
JG
1239
1240 rcu_read_lock();
1241 cds_lfht_lookup(state->sessions_ht,
1242 hash_key_str(name, lttng_ht_seed),
1243 match_session,
1244 name,
1245 &iter);
1246 node = cds_lfht_iter_get_node(&iter);
1247 if (node) {
1248 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1249 name, uid, gid);
1250 session = caa_container_of(node, struct session_info,
1251 sessions_ht_node);
1252 assert(session->uid == uid);
1253 assert(session->gid == gid);
ea9a44f0
JG
1254 goto error;
1255 }
1256
1257 trigger_list = lttng_session_trigger_list_build(state, name);
1258 if (!trigger_list) {
1259 goto error;
8abe313a
JG
1260 }
1261
ea9a44f0 1262 session = session_info_create(name, uid, gid, trigger_list);
8abe313a
JG
1263 if (!session) {
1264 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1265 name, uid, gid);
ea9a44f0 1266 goto error;
8abe313a 1267 }
ea9a44f0 1268 trigger_list = NULL;
577983cb
JG
1269
1270 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
9b63a4aa 1271 &session->sessions_ht_node);
8abe313a
JG
1272 rcu_read_unlock();
1273 return session;
ea9a44f0
JG
1274error:
1275 rcu_read_unlock();
1276 session_info_put(session);
1277 return NULL;
8abe313a
JG
1278}
1279
ab0ee2ca
JG
1280static
1281int handle_notification_thread_command_add_channel(
8abe313a
JG
1282 struct notification_thread_state *state,
1283 const char *session_name, uid_t session_uid, gid_t session_gid,
1284 const char *channel_name, enum lttng_domain_type channel_domain,
1285 uint64_t channel_key_int, uint64_t channel_capacity,
1286 enum lttng_error_code *cmd_result)
ab0ee2ca
JG
1287{
1288 struct cds_list_head trigger_list;
8abe313a
JG
1289 struct channel_info *new_channel_info = NULL;
1290 struct channel_key channel_key = {
1291 .key = channel_key_int,
1292 .domain = channel_domain,
1293 };
ab0ee2ca
JG
1294 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1295 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1296 int trigger_count = 0;
1297 struct cds_lfht_iter iter;
8abe313a 1298 struct session_info *session_info = NULL;
ab0ee2ca
JG
1299
1300 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
8abe313a
JG
1301 channel_name, session_name, channel_key_int,
1302 channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
ab0ee2ca
JG
1303
1304 CDS_INIT_LIST_HEAD(&trigger_list);
1305
8abe313a
JG
1306 session_info = find_or_create_session_info(state, session_name,
1307 session_uid, session_gid);
1308 if (!session_info) {
1309 /* Allocation error or an internal error occured. */
ab0ee2ca
JG
1310 goto error;
1311 }
1312
8abe313a
JG
1313 new_channel_info = channel_info_create(channel_name, &channel_key,
1314 channel_capacity, session_info);
1315 if (!new_channel_info) {
1316 goto error;
1317 }
ab0ee2ca
JG
1318
1319 /* Build a list of all triggers applying to the new channel. */
1320 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1321 node) {
1322 struct lttng_trigger_list_element *new_element;
1323
1324 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
8abe313a 1325 new_channel_info)) {
ab0ee2ca
JG
1326 continue;
1327 }
1328
1329 new_element = zmalloc(sizeof(*new_element));
1330 if (!new_element) {
1331 goto error;
1332 }
1333 CDS_INIT_LIST_HEAD(&new_element->node);
1334 new_element->trigger = trigger_ht_element->trigger;
1335 cds_list_add(&new_element->node, &trigger_list);
1336 trigger_count++;
1337 }
1338
1339 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1340 trigger_count);
1341 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1342 if (!channel_trigger_list) {
1343 goto error;
1344 }
8abe313a 1345 channel_trigger_list->channel_key = new_channel_info->key;
ab0ee2ca
JG
1346 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1347 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1348 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1349
1350 rcu_read_lock();
1351 /* Add channel to the channel_ht which owns the channel_infos. */
1352 cds_lfht_add(state->channels_ht,
8abe313a 1353 hash_channel_key(&new_channel_info->key),
ab0ee2ca
JG
1354 &new_channel_info->channels_ht_node);
1355 /*
1356 * Add the list of triggers associated with this channel to the
1357 * channel_triggers_ht.
1358 */
1359 cds_lfht_add(state->channel_triggers_ht,
8abe313a 1360 hash_channel_key(&new_channel_info->key),
ab0ee2ca
JG
1361 &channel_trigger_list->channel_triggers_ht_node);
1362 rcu_read_unlock();
1363 *cmd_result = LTTNG_OK;
1364 return 0;
1365error:
ab0ee2ca 1366 channel_info_destroy(new_channel_info);
8abe313a 1367 session_info_put(session_info);
ab0ee2ca
JG
1368 return 1;
1369}
1370
1371static
1372int handle_notification_thread_command_remove_channel(
1373 struct notification_thread_state *state,
1374 uint64_t channel_key, enum lttng_domain_type domain,
1375 enum lttng_error_code *cmd_result)
1376{
1377 struct cds_lfht_node *node;
1378 struct cds_lfht_iter iter;
1379 struct lttng_channel_trigger_list *trigger_list;
1380 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1381 struct channel_key key = { .key = channel_key, .domain = domain };
1382 struct channel_info *channel_info;
1383
1384 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
1385 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1386
1387 rcu_read_lock();
1388
1389 cds_lfht_lookup(state->channel_triggers_ht,
1390 hash_channel_key(&key),
1391 match_channel_trigger_list,
1392 &key,
1393 &iter);
1394 node = cds_lfht_iter_get_node(&iter);
1395 /*
1396 * There is a severe internal error if we are being asked to remove a
1397 * channel that doesn't exist.
1398 */
1399 if (!node) {
1400 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1401 goto end;
1402 }
1403
1404 /* Free the list of triggers associated with this channel. */
1405 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1406 channel_triggers_ht_node);
1407 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1408 &trigger_list->list, node) {
1409 cds_list_del(&trigger_list_element->node);
1410 free(trigger_list_element);
1411 }
1412 cds_lfht_del(state->channel_triggers_ht, node);
1413 free(trigger_list);
1414
1415 /* Free sampled channel state. */
1416 cds_lfht_lookup(state->channel_state_ht,
1417 hash_channel_key(&key),
1418 match_channel_state_sample,
1419 &key,
1420 &iter);
1421 node = cds_lfht_iter_get_node(&iter);
1422 /*
1423 * This is expected to be NULL if the channel is destroyed before we
1424 * received a sample.
1425 */
1426 if (node) {
1427 struct channel_state_sample *sample = caa_container_of(node,
1428 struct channel_state_sample,
1429 channel_state_ht_node);
1430
1431 cds_lfht_del(state->channel_state_ht, node);
1432 free(sample);
1433 }
1434
1435 /* Remove the channel from the channels_ht and free it. */
1436 cds_lfht_lookup(state->channels_ht,
1437 hash_channel_key(&key),
1438 match_channel_info,
1439 &key,
1440 &iter);
1441 node = cds_lfht_iter_get_node(&iter);
1442 assert(node);
1443 channel_info = caa_container_of(node, struct channel_info,
1444 channels_ht_node);
1445 cds_lfht_del(state->channels_ht, node);
1446 channel_info_destroy(channel_info);
1447end:
1448 rcu_read_unlock();
1449 *cmd_result = LTTNG_OK;
1450 return 0;
1451}
1452
1da26331
JG
1453static
1454int condition_is_supported(struct lttng_condition *condition)
1455{
1456 int ret;
1457
1458 switch (lttng_condition_get_type(condition)) {
1459 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1460 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1461 {
1462 enum lttng_domain_type domain;
1463
1464 ret = lttng_condition_buffer_usage_get_domain_type(condition,
1465 &domain);
1466 if (ret) {
1467 ret = -1;
1468 goto end;
1469 }
1470
1471 if (domain != LTTNG_DOMAIN_KERNEL) {
1472 ret = 1;
1473 goto end;
1474 }
1475
1476 /*
1477 * Older kernel tracers don't expose the API to monitor their
1478 * buffers. Therefore, we reject triggers that require that
1479 * mechanism to be available to be evaluated.
1480 */
1481 ret = kernel_supports_ring_buffer_snapshot_sample_positions(
1482 kernel_tracer_fd);
1483 break;
1484 }
1485 default:
1486 ret = 1;
1487 }
1488end:
1489 return ret;
1490}
1491
ab0ee2ca
JG
1492/*
1493 * FIXME A client's credentials are not checked when registering a trigger, nor
1494 * are they stored alongside with the trigger.
1495 *
1da26331 1496 * The effects of this are benign since:
ab0ee2ca
JG
1497 * - The client will succeed in registering the trigger, as it is valid,
1498 * - The trigger will, internally, be bound to the channel,
1499 * - The notifications will not be sent since the client's credentials
1500 * are checked against the channel at that moment.
1da26331
JG
1501 *
1502 * If this function returns a non-zero value, it means something is
50ca7858 1503 * fundamentally broken and the whole subsystem/thread will be torn down.
1da26331
JG
1504 *
1505 * If a non-fatal error occurs, just set the cmd_result to the appropriate
1506 * error code.
ab0ee2ca
JG
1507 */
1508static
1509int handle_notification_thread_command_register_trigger(
8abe313a
JG
1510 struct notification_thread_state *state,
1511 struct lttng_trigger *trigger,
1512 enum lttng_error_code *cmd_result)
ab0ee2ca
JG
1513{
1514 int ret = 0;
1515 struct lttng_condition *condition;
1516 struct notification_client *client;
1517 struct notification_client_list *client_list = NULL;
1518 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1519 struct notification_client_list_element *client_list_element, *tmp;
1520 struct cds_lfht_node *node;
1521 struct cds_lfht_iter iter;
1522 struct channel_info *channel;
1523 bool free_trigger = true;
1524
1525 rcu_read_lock();
1526
1527 condition = lttng_trigger_get_condition(trigger);
1da26331
JG
1528 assert(condition);
1529
1530 ret = condition_is_supported(condition);
1531 if (ret < 0) {
1532 goto error;
1533 } else if (ret == 0) {
1534 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
1535 goto error;
1536 } else {
1537 /* Feature is supported, continue. */
1538 ret = 0;
1539 }
1540
ab0ee2ca
JG
1541 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
1542 if (!trigger_ht_element) {
1543 ret = -1;
1544 goto error;
1545 }
1546
1547 /* Add trigger to the trigger_ht. */
1548 cds_lfht_node_init(&trigger_ht_element->node);
1549 trigger_ht_element->trigger = trigger;
1550
1551 node = cds_lfht_add_unique(state->triggers_ht,
1552 lttng_condition_hash(condition),
1553 match_condition,
1554 condition,
1555 &trigger_ht_element->node);
1556 if (node != &trigger_ht_element->node) {
1557 /* Not a fatal error, simply report it to the client. */
1558 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
1559 goto error_free_ht_element;
1560 }
1561
1562 /*
1563 * Ownership of the trigger and of its wrapper was transfered to
1564 * the triggers_ht.
1565 */
1566 trigger_ht_element = NULL;
1567 free_trigger = false;
1568
1569 /*
1570 * The rest only applies to triggers that have a "notify" action.
1571 * It is not skipped as this is the only action type currently
1572 * supported.
1573 */
1574 client_list = zmalloc(sizeof(*client_list));
1575 if (!client_list) {
1576 ret = -1;
1577 goto error_free_ht_element;
1578 }
1579 cds_lfht_node_init(&client_list->notification_trigger_ht_node);
1580 CDS_INIT_LIST_HEAD(&client_list->list);
1581 client_list->trigger = trigger;
1582
1583 /* Build a list of clients to which this new trigger applies. */
1584 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1585 client_socket_ht_node) {
1586 if (!trigger_applies_to_client(trigger, client)) {
1587 continue;
1588 }
1589
1590 client_list_element = zmalloc(sizeof(*client_list_element));
1591 if (!client_list_element) {
1592 ret = -1;
1593 goto error_free_client_list;
1594 }
1595 CDS_INIT_LIST_HEAD(&client_list_element->node);
1596 client_list_element->client = client;
1597 cds_list_add(&client_list_element->node, &client_list->list);
1598 }
1599
1600 cds_lfht_add(state->notification_trigger_clients_ht,
1601 lttng_condition_hash(condition),
1602 &client_list->notification_trigger_ht_node);
ab0ee2ca
JG
1603
1604 /*
1605 * Add the trigger to list of triggers bound to the channels currently
1606 * known.
1607 */
1608 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
1609 channels_ht_node) {
1610 struct lttng_trigger_list_element *trigger_list_element;
1611 struct lttng_channel_trigger_list *trigger_list;
1612
1613 if (!trigger_applies_to_channel(trigger, channel)) {
1614 continue;
1615 }
1616
1617 cds_lfht_lookup(state->channel_triggers_ht,
1618 hash_channel_key(&channel->key),
1619 match_channel_trigger_list,
1620 &channel->key,
1621 &iter);
1622 node = cds_lfht_iter_get_node(&iter);
1623 assert(node);
ab0ee2ca
JG
1624 trigger_list = caa_container_of(node,
1625 struct lttng_channel_trigger_list,
1626 channel_triggers_ht_node);
1627
1628 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
1629 if (!trigger_list_element) {
1630 ret = -1;
1631 goto error_free_client_list;
1632 }
1633 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
1634 trigger_list_element->trigger = trigger;
1635 cds_list_add(&trigger_list_element->node, &trigger_list->list);
ab0ee2ca
JG
1636 }
1637
2ae99f0b
JG
1638 /*
1639 * Since there is nothing preventing clients from subscribing to a
1640 * condition before the corresponding trigger is registered, we have
1641 * to evaluate this new condition right away.
1642 *
1643 * At some point, we were waiting for the next "evaluation" (e.g. on
1644 * reception of a channel sample) to evaluate this new condition, but
1645 * that was broken.
1646 *
1647 * The reason it was broken is that waiting for the next sample
1648 * does not allow us to properly handle transitions for edge-triggered
1649 * conditions.
1650 *
1651 * Consider this example: when we handle a new channel sample, we
1652 * evaluate each conditions twice: once with the previous state, and
1653 * again with the newest state. We then use those two results to
1654 * determine whether a state change happened: a condition was false and
1655 * became true. If a state change happened, we have to notify clients.
1656 *
1657 * Now, if a client subscribes to a given notification and registers
1658 * a trigger *after* that subscription, we have to make sure the
1659 * condition is evaluated at this point while considering only the
1660 * current state. Otherwise, the next evaluation cycle may only see
1661 * that the evaluations remain the same (true for samples n-1 and n) and
1662 * the client will never know that the condition has been met.
1663 */
1664 cds_list_for_each_entry_safe(client_list_element, tmp,
1665 &client_list->list, node) {
1666 ret = evaluate_condition_for_client(trigger, condition,
1667 client_list_element->client, state);
1668 if (ret) {
1669 goto error_free_client_list;
1670 }
1671 }
1672
1673 /*
1674 * Client list ownership transferred to the
1675 * notification_trigger_clients_ht.
1676 */
1677 client_list = NULL;
1678
ab0ee2ca
JG
1679 *cmd_result = LTTNG_OK;
1680error_free_client_list:
1681 if (client_list) {
1682 cds_list_for_each_entry_safe(client_list_element, tmp,
1683 &client_list->list, node) {
1684 free(client_list_element);
1685 }
1686 free(client_list);
1687 }
1688error_free_ht_element:
1689 free(trigger_ht_element);
1690error:
1691 if (free_trigger) {
1692 struct lttng_action *action = lttng_trigger_get_action(trigger);
1693
1694 lttng_condition_destroy(condition);
1695 lttng_action_destroy(action);
1696 lttng_trigger_destroy(trigger);
1697 }
1698 rcu_read_unlock();
1699 return ret;
1700}
1701
cc2295b5 1702static
ab0ee2ca
JG
1703int handle_notification_thread_command_unregister_trigger(
1704 struct notification_thread_state *state,
1705 struct lttng_trigger *trigger,
1706 enum lttng_error_code *_cmd_reply)
1707{
1708 struct cds_lfht_iter iter;
1709 struct cds_lfht_node *node, *triggers_ht_node;
1710 struct lttng_channel_trigger_list *trigger_list;
1711 struct notification_client_list *client_list;
1712 struct notification_client_list_element *client_list_element, *tmp;
1713 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1714 struct lttng_condition *condition = lttng_trigger_get_condition(
1715 trigger);
1716 struct lttng_action *action;
1717 enum lttng_error_code cmd_reply;
1718
1719 rcu_read_lock();
1720
1721 cds_lfht_lookup(state->triggers_ht,
1722 lttng_condition_hash(condition),
1723 match_condition,
1724 condition,
1725 &iter);
1726 triggers_ht_node = cds_lfht_iter_get_node(&iter);
1727 if (!triggers_ht_node) {
1728 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
1729 goto end;
1730 } else {
1731 cmd_reply = LTTNG_OK;
1732 }
1733
1734 /* Remove trigger from channel_triggers_ht. */
1735 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
1736 channel_triggers_ht_node) {
1737 struct lttng_trigger_list_element *trigger_element, *tmp;
1738
1739 cds_list_for_each_entry_safe(trigger_element, tmp,
1740 &trigger_list->list, node) {
9b63a4aa
JG
1741 const struct lttng_condition *current_condition =
1742 lttng_trigger_get_const_condition(
ab0ee2ca
JG
1743 trigger_element->trigger);
1744
1745 assert(current_condition);
1746 if (!lttng_condition_is_equal(condition,
1747 current_condition)) {
1748 continue;
1749 }
1750
1751 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
1752 cds_list_del(&trigger_element->node);
e4db5ace
JR
1753 /* A trigger can only appear once per channel */
1754 break;
ab0ee2ca
JG
1755 }
1756 }
1757
1758 /*
1759 * Remove and release the client list from
1760 * notification_trigger_clients_ht.
1761 */
1762 cds_lfht_lookup(state->notification_trigger_clients_ht,
1763 lttng_condition_hash(condition),
1764 match_client_list,
1765 trigger,
1766 &iter);
1767 node = cds_lfht_iter_get_node(&iter);
1768 assert(node);
1769 client_list = caa_container_of(node, struct notification_client_list,
1770 notification_trigger_ht_node);
1771 cds_list_for_each_entry_safe(client_list_element, tmp,
1772 &client_list->list, node) {
1773 free(client_list_element);
1774 }
1775 cds_lfht_del(state->notification_trigger_clients_ht, node);
1776 free(client_list);
1777
1778 /* Remove trigger from triggers_ht. */
1779 trigger_ht_element = caa_container_of(triggers_ht_node,
1780 struct lttng_trigger_ht_element, node);
1781 cds_lfht_del(state->triggers_ht, triggers_ht_node);
1782
1783 condition = lttng_trigger_get_condition(trigger_ht_element->trigger);
1784 lttng_condition_destroy(condition);
1785 action = lttng_trigger_get_action(trigger_ht_element->trigger);
1786 lttng_action_destroy(action);
1787 lttng_trigger_destroy(trigger_ht_element->trigger);
1788 free(trigger_ht_element);
1789end:
1790 rcu_read_unlock();
1791 if (_cmd_reply) {
1792 *_cmd_reply = cmd_reply;
1793 }
1794 return 0;
1795}
1796
1797/* Returns 0 on success, 1 on exit requested, negative value on error. */
1798int handle_notification_thread_command(
1799 struct notification_thread_handle *handle,
1800 struct notification_thread_state *state)
1801{
1802 int ret;
1803 uint64_t counter;
1804 struct notification_thread_command *cmd;
1805
8abe313a
JG
1806 /* Read the event pipe to put it back into a quiescent state. */
1807 ret = read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
1808 sizeof(counter));
ab0ee2ca
JG
1809 if (ret == -1) {
1810 goto error;
1811 }
1812
1813 pthread_mutex_lock(&handle->cmd_queue.lock);
1814 cmd = cds_list_first_entry(&handle->cmd_queue.list,
1815 struct notification_thread_command, cmd_list_node);
1816 switch (cmd->type) {
1817 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
1818 DBG("[notification-thread] Received register trigger command");
1819 ret = handle_notification_thread_command_register_trigger(
1820 state, cmd->parameters.trigger,
1821 &cmd->reply_code);
1822 break;
1823 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
1824 DBG("[notification-thread] Received unregister trigger command");
1825 ret = handle_notification_thread_command_unregister_trigger(
1826 state, cmd->parameters.trigger,
1827 &cmd->reply_code);
1828 break;
1829 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
1830 DBG("[notification-thread] Received add channel command");
1831 ret = handle_notification_thread_command_add_channel(
8abe313a
JG
1832 state,
1833 cmd->parameters.add_channel.session.name,
1834 cmd->parameters.add_channel.session.uid,
1835 cmd->parameters.add_channel.session.gid,
1836 cmd->parameters.add_channel.channel.name,
1837 cmd->parameters.add_channel.channel.domain,
1838 cmd->parameters.add_channel.channel.key,
1839 cmd->parameters.add_channel.channel.capacity,
ab0ee2ca
JG
1840 &cmd->reply_code);
1841 break;
1842 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
1843 DBG("[notification-thread] Received remove channel command");
1844 ret = handle_notification_thread_command_remove_channel(
1845 state, cmd->parameters.remove_channel.key,
1846 cmd->parameters.remove_channel.domain,
1847 &cmd->reply_code);
1848 break;
1849 case NOTIFICATION_COMMAND_TYPE_QUIT:
1850 DBG("[notification-thread] Received quit command");
1851 cmd->reply_code = LTTNG_OK;
1852 ret = 1;
1853 goto end;
1854 default:
1855 ERR("[notification-thread] Unknown internal command received");
1856 goto error_unlock;
1857 }
1858
1859 if (ret) {
1860 goto error_unlock;
1861 }
1862end:
1863 cds_list_del(&cmd->cmd_list_node);
8ada111f 1864 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
1865 pthread_mutex_unlock(&handle->cmd_queue.lock);
1866 return ret;
1867error_unlock:
1868 /* Wake-up and return a fatal error to the calling thread. */
8ada111f 1869 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
1870 pthread_mutex_unlock(&handle->cmd_queue.lock);
1871 cmd->reply_code = LTTNG_ERR_FATAL;
1872error:
1873 /* Indicate a fatal error to the caller. */
1874 return -1;
1875}
1876
1877static
1878unsigned long hash_client_socket(int socket)
1879{
1880 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
1881}
1882
/*
 * Add O_NONBLOCK to the file status flags of 'socket'.
 *
 * Returns -1 if either fcntl() call fails; otherwise returns the result of
 * the F_SETFL call.
 */
static
int socket_set_non_blocking(int socket)
{
	int status, current_flags;

	/* Fetch the current flags so that only O_NONBLOCK is added. */
	current_flags = fcntl(socket, F_GETFL, 0);
	if (current_flags == -1) {
		PERROR("fcntl get socket flags");
		return -1;
	}

	status = fcntl(socket, F_SETFL, current_flags | O_NONBLOCK);
	if (status == -1) {
		PERROR("fcntl set O_NONBLOCK socket flag");
		return status;
	}

	DBG("Client socket (fd = %i) set as non-blocking", socket);
	return status;
}
1905
1906static
14fa22f8 1907int client_reset_inbound_state(struct notification_client *client)
ab0ee2ca
JG
1908{
1909 int ret;
1910
1911 ret = lttng_dynamic_buffer_set_size(
1912 &client->communication.inbound.buffer, 0);
1913 assert(!ret);
1914
1915 client->communication.inbound.bytes_to_receive =
1916 sizeof(struct lttng_notification_channel_message);
1917 client->communication.inbound.msg_type =
1918 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
ab0ee2ca
JG
1919 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
1920 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
14fa22f8
JG
1921 ret = lttng_dynamic_buffer_set_size(
1922 &client->communication.inbound.buffer,
1923 client->communication.inbound.bytes_to_receive);
1924 return ret;
ab0ee2ca
JG
1925}
1926
1927int handle_notification_thread_client_connect(
1928 struct notification_thread_state *state)
1929{
1930 int ret;
1931 struct notification_client *client;
1932
1933 DBG("[notification-thread] Handling new notification channel client connection");
1934
1935 client = zmalloc(sizeof(*client));
1936 if (!client) {
1937 /* Fatal error. */
1938 ret = -1;
1939 goto error;
1940 }
1941 CDS_INIT_LIST_HEAD(&client->condition_list);
1942 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
1943 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
01ea340e 1944 client->communication.inbound.expect_creds = true;
14fa22f8
JG
1945 ret = client_reset_inbound_state(client);
1946 if (ret) {
1947 ERR("[notification-thread] Failed to reset client communication's inbound state");
1948 ret = 0;
1949 goto error;
1950 }
ab0ee2ca
JG
1951
1952 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
1953 if (ret < 0) {
1954 ERR("[notification-thread] Failed to accept new notification channel client connection");
1955 ret = 0;
1956 goto error;
1957 }
1958
1959 client->socket = ret;
1960
1961 ret = socket_set_non_blocking(client->socket);
1962 if (ret) {
1963 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
1964 goto error;
1965 }
1966
1967 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
1968 if (ret < 0) {
1969 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
1970 ret = 0;
1971 goto error;
1972 }
1973
1974 ret = lttng_poll_add(&state->events, client->socket,
1975 LPOLLIN | LPOLLERR |
1976 LPOLLHUP | LPOLLRDHUP);
1977 if (ret < 0) {
1978 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
1979 ret = 0;
1980 goto error;
1981 }
1982 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
1983 client->socket);
1984
ab0ee2ca
JG
1985 rcu_read_lock();
1986 cds_lfht_add(state->client_socket_ht,
1987 hash_client_socket(client->socket),
1988 &client->client_socket_ht_node);
1989 rcu_read_unlock();
1990
1991 return ret;
1992error:
1993 notification_client_destroy(client, state);
1994 return ret;
1995}
1996
1997int handle_notification_thread_client_disconnect(
1998 int client_socket,
1999 struct notification_thread_state *state)
2000{
2001 int ret = 0;
2002 struct notification_client *client;
2003
2004 rcu_read_lock();
2005 DBG("[notification-thread] Closing client connection (socket fd = %i)",
2006 client_socket);
2007 client = get_client_from_socket(client_socket, state);
2008 if (!client) {
2009 /* Internal state corruption, fatal error. */
2010 ERR("[notification-thread] Unable to find client (socket fd = %i)",
2011 client_socket);
2012 ret = -1;
2013 goto end;
2014 }
2015
2016 ret = lttng_poll_del(&state->events, client_socket);
2017 if (ret) {
2018 ERR("[notification-thread] Failed to remove client socket from poll set");
2019 }
2020 cds_lfht_del(state->client_socket_ht,
2021 &client->client_socket_ht_node);
2022 notification_client_destroy(client, state);
2023end:
2024 rcu_read_unlock();
2025 return ret;
2026}
2027
2028int handle_notification_thread_client_disconnect_all(
2029 struct notification_thread_state *state)
2030{
2031 struct cds_lfht_iter iter;
2032 struct notification_client *client;
2033 bool error_encoutered = false;
2034
2035 rcu_read_lock();
2036 DBG("[notification-thread] Closing all client connections");
2037 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2038 client_socket_ht_node) {
2039 int ret;
2040
2041 ret = handle_notification_thread_client_disconnect(
2042 client->socket, state);
2043 if (ret) {
2044 error_encoutered = true;
2045 }
2046 }
2047 rcu_read_unlock();
2048 return error_encoutered ? 1 : 0;
2049}
2050
2051int handle_notification_thread_trigger_unregister_all(
2052 struct notification_thread_state *state)
2053{
4149ace8 2054 bool error_occurred = false;
ab0ee2ca
JG
2055 struct cds_lfht_iter iter;
2056 struct lttng_trigger_ht_element *trigger_ht_element;
2057
2058 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
2059 node) {
2060 int ret = handle_notification_thread_command_unregister_trigger(
2061 state, trigger_ht_element->trigger, NULL);
2062 if (ret) {
4149ace8 2063 error_occurred = true;
ab0ee2ca
JG
2064 }
2065 }
4149ace8 2066 return error_occurred ? -1 : 0;
ab0ee2ca
JG
2067}
2068
2069static
2070int client_flush_outgoing_queue(struct notification_client *client,
2071 struct notification_thread_state *state)
2072{
2073 ssize_t ret;
2074 size_t to_send_count;
2075
2076 assert(client->communication.outbound.buffer.size != 0);
2077 to_send_count = client->communication.outbound.buffer.size;
2078 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
2079 client->socket);
2080
2081 ret = lttcomm_send_unix_sock_non_block(client->socket,
2082 client->communication.outbound.buffer.data,
2083 to_send_count);
2084 if ((ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) ||
2085 (ret > 0 && ret < to_send_count)) {
2086 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
2087 client->socket);
2088 to_send_count -= max(ret, 0);
2089
2090 memcpy(client->communication.outbound.buffer.data,
2091 client->communication.outbound.buffer.data +
2092 client->communication.outbound.buffer.size - to_send_count,
2093 to_send_count);
2094 ret = lttng_dynamic_buffer_set_size(
2095 &client->communication.outbound.buffer,
2096 to_send_count);
2097 if (ret) {
2098 goto error;
2099 }
2100
2101 /*
2102 * We want to be notified whenever there is buffer space
2103 * available to send the rest of the payload.
2104 */
2105 ret = lttng_poll_mod(&state->events, client->socket,
2106 CLIENT_POLL_MASK_IN_OUT);
2107 if (ret) {
2108 goto error;
2109 }
2110 } else if (ret < 0) {
2111 /* Generic error, disconnect the client. */
2112 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
2113 client->socket);
2114 ret = handle_notification_thread_client_disconnect(
2115 client->socket, state);
2116 if (ret) {
2117 goto error;
2118 }
2119 } else {
2120 /* No error and flushed the queue completely. */
2121 ret = lttng_dynamic_buffer_set_size(
2122 &client->communication.outbound.buffer, 0);
2123 if (ret) {
2124 goto error;
2125 }
2126 ret = lttng_poll_mod(&state->events, client->socket,
2127 CLIENT_POLL_MASK_IN);
2128 if (ret) {
2129 goto error;
2130 }
2131
2132 client->communication.outbound.queued_command_reply = false;
2133 client->communication.outbound.dropped_notification = false;
2134 }
2135
2136 return 0;
2137error:
2138 return -1;
2139}
2140
2141static
2142int client_send_command_reply(struct notification_client *client,
2143 struct notification_thread_state *state,
2144 enum lttng_notification_channel_status status)
2145{
2146 int ret;
2147 struct lttng_notification_channel_command_reply reply = {
2148 .status = (int8_t) status,
2149 };
2150 struct lttng_notification_channel_message msg = {
2151 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
2152 .size = sizeof(reply),
2153 };
2154 char buffer[sizeof(msg) + sizeof(reply)];
2155
2156 if (client->communication.outbound.queued_command_reply) {
2157 /* Protocol error. */
2158 goto error;
2159 }
2160
2161 memcpy(buffer, &msg, sizeof(msg));
2162 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
2163 DBG("[notification-thread] Send command reply (%i)", (int) status);
2164
2165 /* Enqueue buffer to outgoing queue and flush it. */
2166 ret = lttng_dynamic_buffer_append(
2167 &client->communication.outbound.buffer,
2168 buffer, sizeof(buffer));
2169 if (ret) {
2170 goto error;
2171 }
2172
2173 ret = client_flush_outgoing_queue(client, state);
2174 if (ret) {
2175 goto error;
2176 }
2177
2178 if (client->communication.outbound.buffer.size != 0) {
2179 /* Queue could not be emptied. */
2180 client->communication.outbound.queued_command_reply = true;
2181 }
2182
2183 return 0;
2184error:
2185 return -1;
2186}
2187
2188static
2189int client_dispatch_message(struct notification_client *client,
2190 struct notification_thread_state *state)
2191{
2192 int ret = 0;
2193
2194 if (client->communication.inbound.msg_type !=
2195 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
2196 client->communication.inbound.msg_type !=
2197 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
2198 !client->validated) {
2199 WARN("[notification-thread] client attempted a command before handshake");
2200 ret = -1;
2201 goto end;
2202 }
2203
2204 switch (client->communication.inbound.msg_type) {
2205 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
2206 {
2207 /*
2208 * Receiving message header. The function will be called again
2209 * once the rest of the message as been received and can be
2210 * interpreted.
2211 */
2212 const struct lttng_notification_channel_message *msg;
2213
2214 assert(sizeof(*msg) ==
2215 client->communication.inbound.buffer.size);
2216 msg = (const struct lttng_notification_channel_message *)
2217 client->communication.inbound.buffer.data;
2218
2219 if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
2220 ERR("[notification-thread] Invalid notification channel message: length = %u", msg->size);
2221 ret = -1;
2222 goto end;
2223 }
2224
2225 switch (msg->type) {
2226 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2227 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2228 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2229 break;
2230 default:
2231 ret = -1;
2232 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
2233 goto end;
2234 }
2235
2236 client->communication.inbound.bytes_to_receive = msg->size;
2237 client->communication.inbound.msg_type =
2238 (enum lttng_notification_channel_message_type) msg->type;
ab0ee2ca 2239 ret = lttng_dynamic_buffer_set_size(
14fa22f8 2240 &client->communication.inbound.buffer, msg->size);
ab0ee2ca
JG
2241 if (ret) {
2242 goto end;
2243 }
2244 break;
2245 }
2246 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2247 {
2248 struct lttng_notification_channel_command_handshake *handshake_client;
2249 struct lttng_notification_channel_command_handshake handshake_reply = {
2250 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
2251 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
2252 };
2253 struct lttng_notification_channel_message msg_header = {
2254 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
2255 .size = sizeof(handshake_reply),
2256 };
2257 enum lttng_notification_channel_status status =
2258 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2259 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
2260
2261 memcpy(send_buffer, &msg_header, sizeof(msg_header));
2262 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
2263 sizeof(handshake_reply));
2264
2265 handshake_client =
2266 (struct lttng_notification_channel_command_handshake *)
2267 client->communication.inbound.buffer.data;
47a32869 2268 client->major = handshake_client->major;
ab0ee2ca
JG
2269 client->minor = handshake_client->minor;
2270 if (!client->communication.inbound.creds_received) {
2271 ERR("[notification-thread] No credentials received from client");
2272 ret = -1;
2273 goto end;
2274 }
2275
2276 client->uid = LTTNG_SOCK_GET_UID_CRED(
2277 &client->communication.inbound.creds);
2278 client->gid = LTTNG_SOCK_GET_GID_CRED(
2279 &client->communication.inbound.creds);
2280 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
2281 client->uid, client->gid, (int) client->major,
2282 (int) client->minor);
2283
2284 if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
2285 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
2286 }
2287
2288 ret = lttng_dynamic_buffer_append(&client->communication.outbound.buffer,
2289 send_buffer, sizeof(send_buffer));
2290 if (ret) {
2291 ERR("[notification-thread] Failed to send protocol version to notification channel client");
2292 goto end;
2293 }
2294
2295 ret = client_flush_outgoing_queue(client, state);
2296 if (ret) {
2297 goto end;
2298 }
2299
2300 ret = client_send_command_reply(client, state, status);
2301 if (ret) {
2302 ERR("[notification-thread] Failed to send reply to notification channel client");
2303 goto end;
2304 }
2305
2306 /* Set reception state to receive the next message header. */
14fa22f8
JG
2307 ret = client_reset_inbound_state(client);
2308 if (ret) {
2309 ERR("[notification-thread] Failed to reset client communication's inbound state");
2310 goto end;
2311 }
ab0ee2ca
JG
2312 client->validated = true;
2313 break;
2314 }
2315 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2316 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2317 {
2318 struct lttng_condition *condition;
2319 enum lttng_notification_channel_status status =
2320 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2321 const struct lttng_buffer_view condition_view =
2322 lttng_buffer_view_from_dynamic_buffer(
2323 &client->communication.inbound.buffer,
2324 0, -1);
2325 size_t expected_condition_size =
2326 client->communication.inbound.buffer.size;
2327
2328 ret = lttng_condition_create_from_buffer(&condition_view,
2329 &condition);
2330 if (ret != expected_condition_size) {
2331 ERR("[notification-thread] Malformed condition received from client");
2332 goto end;
2333 }
2334
2335 if (client->communication.inbound.msg_type ==
2336 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
ab0ee2ca
JG
2337 ret = notification_thread_client_subscribe(client,
2338 condition, state, &status);
2339 } else {
2340 ret = notification_thread_client_unsubscribe(client,
2341 condition, state, &status);
2342 }
2343 if (ret) {
2344 goto end;
2345 }
2346
2347 ret = client_send_command_reply(client, state, status);
2348 if (ret) {
2349 ERR("[notification-thread] Failed to send reply to notification channel client");
2350 goto end;
2351 }
2352
2353 /* Set reception state to receive the next message header. */
14fa22f8
JG
2354 ret = client_reset_inbound_state(client);
2355 if (ret) {
2356 ERR("[notification-thread] Failed to reset client communication's inbound state");
2357 goto end;
2358 }
ab0ee2ca
JG
2359 break;
2360 }
2361 default:
2362 abort();
2363 }
2364end:
2365 return ret;
2366}
2367
2368/* Incoming data from client. */
2369int handle_notification_thread_client_in(
2370 struct notification_thread_state *state, int socket)
2371{
14fa22f8 2372 int ret = 0;
ab0ee2ca
JG
2373 struct notification_client *client;
2374 ssize_t recv_ret;
2375 size_t offset;
2376
2377 client = get_client_from_socket(socket, state);
2378 if (!client) {
2379 /* Internal error, abort. */
2380 ret = -1;
2381 goto end;
2382 }
2383
14fa22f8
JG
2384 offset = client->communication.inbound.buffer.size -
2385 client->communication.inbound.bytes_to_receive;
01ea340e 2386 if (client->communication.inbound.expect_creds) {
ab0ee2ca
JG
2387 recv_ret = lttcomm_recv_creds_unix_sock(socket,
2388 client->communication.inbound.buffer.data + offset,
2389 client->communication.inbound.bytes_to_receive,
2390 &client->communication.inbound.creds);
2391 if (recv_ret > 0) {
01ea340e 2392 client->communication.inbound.expect_creds = false;
ab0ee2ca
JG
2393 client->communication.inbound.creds_received = true;
2394 }
2395 } else {
2396 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
2397 client->communication.inbound.buffer.data + offset,
2398 client->communication.inbound.bytes_to_receive);
2399 }
2400 if (recv_ret < 0) {
2401 goto error_disconnect_client;
2402 }
2403
2404 client->communication.inbound.bytes_to_receive -= recv_ret;
ab0ee2ca
JG
2405 if (client->communication.inbound.bytes_to_receive == 0) {
2406 ret = client_dispatch_message(client, state);
2407 if (ret) {
2408 /*
2409 * Only returns an error if this client must be
2410 * disconnected.
2411 */
2412 goto error_disconnect_client;
2413 }
2414 } else {
2415 goto end;
2416 }
2417end:
2418 return ret;
2419error_disconnect_client:
2420 ret = handle_notification_thread_client_disconnect(socket, state);
2421 return ret;
2422}
2423
/*
 * Client ready to receive outgoing data.
 *
 * Looks up the client associated with `socket` and flushes its outgoing
 * queue.
 *
 * Returns -1 if no client is associated with `socket`; otherwise, returns
 * the result of client_flush_outgoing_queue().
 */
int handle_notification_thread_client_out(
		struct notification_thread_state *state, int socket)
{
	struct notification_client *client =
			get_client_from_socket(socket, state);

	if (!client) {
		/* Internal error, abort. */
		return -1;
	}

	return client_flush_outgoing_queue(client, state);
}
2445
2446static
e8360425
JD
2447bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
2448 const struct channel_state_sample *sample,
2449 uint64_t buffer_capacity)
ab0ee2ca
JG
2450{
2451 bool result = false;
2452 uint64_t threshold;
2453 enum lttng_condition_type condition_type;
e8360425 2454 const struct lttng_condition_buffer_usage *use_condition = container_of(
ab0ee2ca
JG
2455 condition, struct lttng_condition_buffer_usage,
2456 parent);
2457
ab0ee2ca
JG
2458 if (use_condition->threshold_bytes.set) {
2459 threshold = use_condition->threshold_bytes.value;
2460 } else {
2461 /*
2462 * Threshold was expressed as a ratio.
2463 *
2464 * TODO the threshold (in bytes) of conditions expressed
2465 * as a ratio of total buffer size could be cached to
2466 * forego this double-multiplication or it could be performed
2467 * as fixed-point math.
2468 *
2469 * Note that caching should accomodate the case where the
2470 * condition applies to multiple channels (i.e. don't assume
2471 * that all channels matching my_chann* have the same size...)
2472 */
2473 threshold = (uint64_t) (use_condition->threshold_ratio.value *
2474 (double) buffer_capacity);
2475 }
2476
2477 condition_type = lttng_condition_get_type(condition);
2478 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
2479 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2480 threshold, sample->highest_usage);
2481
2482 /*
2483 * The low condition should only be triggered once _all_ of the
2484 * streams in a channel have gone below the "low" threshold.
2485 */
2486 if (sample->highest_usage <= threshold) {
2487 result = true;
2488 }
2489 } else {
2490 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2491 threshold, sample->highest_usage);
2492
2493 /*
2494 * For high buffer usage scenarios, we want to trigger whenever
2495 * _any_ of the streams has reached the "high" threshold.
2496 */
2497 if (sample->highest_usage >= threshold) {
2498 result = true;
2499 }
2500 }
e8360425 2501
ab0ee2ca
JG
2502 return result;
2503}
2504
2505static
e8360425
JD
2506bool evaluate_session_consumed_size_condition(
2507 const struct lttng_condition *condition,
2508 uint64_t session_consumed_size)
2509{
2510 uint64_t threshold;
2511 const struct lttng_condition_session_consumed_size *size_condition =
2512 container_of(condition,
2513 struct lttng_condition_session_consumed_size,
2514 parent);
2515
2516 threshold = size_condition->consumed_threshold_bytes.value;
2517 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
2518 threshold, session_consumed_size);
2519 return session_consumed_size >= threshold;
2520}
2521
2522static
2523int evaluate_condition(const struct lttng_condition *condition,
ab0ee2ca 2524 struct lttng_evaluation **evaluation,
e8360425
JD
2525 const struct notification_thread_state *state,
2526 const struct channel_state_sample *previous_sample,
2527 const struct channel_state_sample *latest_sample,
2528 uint64_t previous_session_consumed_total,
2529 uint64_t latest_session_consumed_total,
2530 struct channel_info *channel_info)
ab0ee2ca
JG
2531{
2532 int ret = 0;
2533 enum lttng_condition_type condition_type;
e8360425
JD
2534 const bool previous_sample_available = !!previous_sample;
2535 bool previous_sample_result = false;
ab0ee2ca
JG
2536 bool latest_sample_result;
2537
2538 condition_type = lttng_condition_get_type(condition);
ab0ee2ca 2539
e8360425
JD
2540 switch (condition_type) {
2541 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
2542 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
2543 if (caa_likely(previous_sample_available)) {
2544 previous_sample_result =
2545 evaluate_buffer_usage_condition(condition,
2546 previous_sample, channel_info->capacity);
2547 }
2548 latest_sample_result = evaluate_buffer_usage_condition(
2549 condition, latest_sample,
2550 channel_info->capacity);
2551 break;
2552 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
2553 if (caa_likely(previous_sample_available)) {
2554 previous_sample_result =
2555 evaluate_session_consumed_size_condition(
2556 condition,
2557 previous_session_consumed_total);
2558 }
2559 latest_sample_result =
2560 evaluate_session_consumed_size_condition(
2561 condition,
2562 latest_session_consumed_total);
2563 break;
2564 default:
2565 /* Unknown condition type; internal error. */
2566 abort();
2567 }
ab0ee2ca
JG
2568
2569 if (!latest_sample_result ||
2570 (previous_sample_result == latest_sample_result)) {
2571 /*
2572 * Only trigger on a condition evaluation transition.
2573 *
2574 * NOTE: This edge-triggered logic may not be appropriate for
2575 * future condition types.
2576 */
2577 goto end;
2578 }
2579
e8360425
JD
2580 if (!evaluation || !latest_sample_result) {
2581 goto end;
2582 }
2583
2584 switch (condition_type) {
2585 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
2586 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
ab0ee2ca
JG
2587 *evaluation = lttng_evaluation_buffer_usage_create(
2588 condition_type,
2589 latest_sample->highest_usage,
e8360425
JD
2590 channel_info->capacity);
2591 break;
2592 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
2593 *evaluation = lttng_evaluation_session_consumed_size_create(
e8360425
JD
2594 latest_session_consumed_total);
2595 break;
2596 default:
2597 abort();
2598 }
2599
2600 if (!*evaluation) {
2601 ret = -1;
2602 goto end;
ab0ee2ca
JG
2603 }
2604end:
2605 return ret;
2606}
2607
2608static
2609int client_enqueue_dropped_notification(struct notification_client *client,
2610 struct notification_thread_state *state)
2611{
2612 int ret;
2613 struct lttng_notification_channel_message msg = {
2614 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
2615 .size = 0,
2616 };
2617
2618 ret = lttng_dynamic_buffer_append(
2619 &client->communication.outbound.buffer, &msg,
2620 sizeof(msg));
2621 return ret;
2622}
2623
2624static
9b63a4aa
JG
2625int send_evaluation_to_clients(const struct lttng_trigger *trigger,
2626 const struct lttng_evaluation *evaluation,
ab0ee2ca
JG
2627 struct notification_client_list* client_list,
2628 struct notification_thread_state *state,
2629 uid_t channel_uid, gid_t channel_gid)
2630{
2631 int ret = 0;
2632 struct lttng_dynamic_buffer msg_buffer;
2633 struct notification_client_list_element *client_list_element, *tmp;
9b63a4aa
JG
2634 const struct lttng_notification notification = {
2635 .condition = (struct lttng_condition *) lttng_trigger_get_const_condition(trigger),
2636 .evaluation = (struct lttng_evaluation *) evaluation,
2637 };
3647288f
JG
2638 struct lttng_notification_channel_message msg_header = {
2639 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
2640 };
ab0ee2ca
JG
2641
2642 lttng_dynamic_buffer_init(&msg_buffer);
2643
3647288f
JG
2644 ret = lttng_dynamic_buffer_append(&msg_buffer, &msg_header,
2645 sizeof(msg_header));
ab0ee2ca
JG
2646 if (ret) {
2647 goto end;
2648 }
2649
9b63a4aa 2650 ret = lttng_notification_serialize(&notification, &msg_buffer);
ab0ee2ca 2651 if (ret) {
ab0ee2ca
JG
2652 ERR("[notification-thread] Failed to serialize notification");
2653 ret = -1;
2654 goto end;
2655 }
2656
3647288f
JG
2657 /* Update payload size. */
2658 ((struct lttng_notification_channel_message * ) msg_buffer.data)->size =
2659 (uint32_t) (msg_buffer.size - sizeof(msg_header));
2660
ab0ee2ca
JG
2661 cds_list_for_each_entry_safe(client_list_element, tmp,
2662 &client_list->list, node) {
2663 struct notification_client *client =
2664 client_list_element->client;
2665
2666 if (client->uid != channel_uid && client->gid != channel_gid &&
2667 client->uid != 0) {
2668 /* Client is not allowed to monitor this channel. */
2669 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this channel");
2670 continue;
2671 }
2672
2673 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
2674 client->socket, msg_buffer.size);
2675 if (client->communication.outbound.buffer.size) {
2676 /*
2677 * Outgoing data is already buffered for this client;
2678 * drop the notification and enqueue a "dropped
2679 * notification" message if this is the first dropped
2680 * notification since the socket spilled-over to the
2681 * queue.
2682 */
2683 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
2684 client->socket);
2685 if (!client->communication.outbound.dropped_notification) {
2686 client->communication.outbound.dropped_notification = true;
2687 ret = client_enqueue_dropped_notification(
2688 client, state);
2689 if (ret) {
2690 goto end;
2691 }
2692 }
2693 continue;
2694 }
2695
2696 ret = lttng_dynamic_buffer_append_buffer(
2697 &client->communication.outbound.buffer,
2698 &msg_buffer);
2699 if (ret) {
2700 goto end;
2701 }
2702
2703 ret = client_flush_outgoing_queue(client, state);
2704 if (ret) {
2705 goto end;
2706 }
2707 }
2708 ret = 0;
2709end:
ab0ee2ca
JG
2710 lttng_dynamic_buffer_reset(&msg_buffer);
2711 return ret;
2712}
2713
2714int handle_notification_thread_channel_sample(
2715 struct notification_thread_state *state, int pipe,
2716 enum lttng_domain_type domain)
2717{
2718 int ret = 0;
2719 struct lttcomm_consumer_channel_monitor_msg sample_msg;
ab0ee2ca
JG
2720 struct channel_info *channel_info;
2721 struct cds_lfht_node *node;
2722 struct cds_lfht_iter iter;
2723 struct lttng_channel_trigger_list *trigger_list;
2724 struct lttng_trigger_list_element *trigger_list_element;
2725 bool previous_sample_available = false;
e8360425
JD
2726 struct channel_state_sample previous_sample, latest_sample;
2727 uint64_t previous_session_consumed_total, latest_session_consumed_total;
ab0ee2ca
JG
2728
2729 /*
2730 * The monitoring pipe only holds messages smaller than PIPE_BUF,
2731 * ensuring that read/write of sampling messages are atomic.
2732 */
7c2551ef 2733 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
ab0ee2ca
JG
2734 if (ret != sizeof(sample_msg)) {
2735 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
2736 pipe);
2737 ret = -1;
2738 goto end;
2739 }
2740
2741 ret = 0;
2742 latest_sample.key.key = sample_msg.key;
2743 latest_sample.key.domain = domain;
2744 latest_sample.highest_usage = sample_msg.highest;
2745 latest_sample.lowest_usage = sample_msg.lowest;
e8360425 2746 latest_sample.channel_total_consumed = sample_msg.total_consumed;
ab0ee2ca
JG
2747
2748 rcu_read_lock();
2749
2750 /* Retrieve the channel's informations */
2751 cds_lfht_lookup(state->channels_ht,
2752 hash_channel_key(&latest_sample.key),
2753 match_channel_info,
2754 &latest_sample.key,
2755 &iter);
2756 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 2757 if (caa_unlikely(!node)) {
ab0ee2ca
JG
2758 /*
2759 * Not an error since the consumer can push a sample to the pipe
2760 * and the rest of the session daemon could notify us of the
2761 * channel's destruction before we get a chance to process that
2762 * sample.
2763 */
2764 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
2765 latest_sample.key.key,
2766 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
2767 "user space");
2768 goto end_unlock;
2769 }
2770 channel_info = caa_container_of(node, struct channel_info,
2771 channels_ht_node);
e8360425 2772 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
8abe313a 2773 channel_info->name,
ab0ee2ca 2774 latest_sample.key.key,
8abe313a 2775 channel_info->session_info->name,
ab0ee2ca 2776 latest_sample.highest_usage,
e8360425
JD
2777 latest_sample.lowest_usage,
2778 latest_sample.channel_total_consumed);
2779
2780 previous_session_consumed_total =
2781 channel_info->session_info->consumed_data_size;
ab0ee2ca
JG
2782
2783 /* Retrieve the channel's last sample, if it exists, and update it. */
2784 cds_lfht_lookup(state->channel_state_ht,
2785 hash_channel_key(&latest_sample.key),
2786 match_channel_state_sample,
2787 &latest_sample.key,
2788 &iter);
2789 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 2790 if (caa_likely(node)) {
ab0ee2ca
JG
2791 struct channel_state_sample *stored_sample;
2792
2793 /* Update the sample stored. */
2794 stored_sample = caa_container_of(node,
2795 struct channel_state_sample,
2796 channel_state_ht_node);
e8360425 2797
ab0ee2ca
JG
2798 memcpy(&previous_sample, stored_sample,
2799 sizeof(previous_sample));
2800 stored_sample->highest_usage = latest_sample.highest_usage;
2801 stored_sample->lowest_usage = latest_sample.lowest_usage;
0f0479d7 2802 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
ab0ee2ca 2803 previous_sample_available = true;
e8360425
JD
2804
2805 latest_session_consumed_total =
2806 previous_session_consumed_total +
2807 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
ab0ee2ca
JG
2808 } else {
2809 /*
2810 * This is the channel's first sample, allocate space for and
2811 * store the new sample.
2812 */
2813 struct channel_state_sample *stored_sample;
2814
2815 stored_sample = zmalloc(sizeof(*stored_sample));
2816 if (!stored_sample) {
2817 ret = -1;
2818 goto end_unlock;
2819 }
2820
2821 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
2822 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
2823 cds_lfht_add(state->channel_state_ht,
2824 hash_channel_key(&stored_sample->key),
2825 &stored_sample->channel_state_ht_node);
e8360425
JD
2826
2827 latest_session_consumed_total =
2828 previous_session_consumed_total +
2829 latest_sample.channel_total_consumed;
ab0ee2ca
JG
2830 }
2831
e8360425
JD
2832 channel_info->session_info->consumed_data_size =
2833 latest_session_consumed_total;
2834
ab0ee2ca
JG
2835 /* Find triggers associated with this channel. */
2836 cds_lfht_lookup(state->channel_triggers_ht,
2837 hash_channel_key(&latest_sample.key),
2838 match_channel_trigger_list,
2839 &latest_sample.key,
2840 &iter);
2841 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 2842 if (caa_likely(!node)) {
ab0ee2ca
JG
2843 goto end_unlock;
2844 }
2845
2846 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
2847 channel_triggers_ht_node);
2848 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
2849 node) {
9b63a4aa
JG
2850 const struct lttng_condition *condition;
2851 const struct lttng_action *action;
2852 const struct lttng_trigger *trigger;
ab0ee2ca
JG
2853 struct notification_client_list *client_list;
2854 struct lttng_evaluation *evaluation = NULL;
2855
2856 trigger = trigger_list_element->trigger;
9b63a4aa 2857 condition = lttng_trigger_get_const_condition(trigger);
ab0ee2ca 2858 assert(condition);
9b63a4aa 2859 action = lttng_trigger_get_const_action(trigger);
ab0ee2ca
JG
2860
2861 /* Notify actions are the only type currently supported. */
9b63a4aa 2862 assert(lttng_action_get_type_const(action) ==
ab0ee2ca
JG
2863 LTTNG_ACTION_TYPE_NOTIFY);
2864
2865 /*
2866 * Check if any client is subscribed to the result of this
2867 * evaluation.
2868 */
2869 cds_lfht_lookup(state->notification_trigger_clients_ht,
2870 lttng_condition_hash(condition),
2871 match_client_list,
2872 trigger,
2873 &iter);
2874 node = cds_lfht_iter_get_node(&iter);
2875 assert(node);
2876
2877 client_list = caa_container_of(node,
2878 struct notification_client_list,
2879 notification_trigger_ht_node);
2880 if (cds_list_empty(&client_list->list)) {
2881 /*
2882 * No clients interested in the evaluation's result,
2883 * skip it.
2884 */
2885 continue;
2886 }
2887
2888 ret = evaluate_condition(condition, &evaluation, state,
2889 previous_sample_available ? &previous_sample : NULL,
e8360425
JD
2890 &latest_sample,
2891 previous_session_consumed_total,
2892 latest_session_consumed_total,
2893 channel_info);
2894 if (caa_unlikely(ret)) {
ab0ee2ca
JG
2895 goto end_unlock;
2896 }
2897
e8360425 2898 if (caa_likely(!evaluation)) {
ab0ee2ca
JG
2899 continue;
2900 }
2901
2902 /* Dispatch evaluation result to all clients. */
2903 ret = send_evaluation_to_clients(trigger_list_element->trigger,
2904 evaluation, client_list, state,
8abe313a
JG
2905 channel_info->session_info->uid,
2906 channel_info->session_info->gid);
2907 lttng_evaluation_destroy(evaluation);
e0b5f87b 2908 if (caa_unlikely(ret)) {
ab0ee2ca
JG
2909 goto end_unlock;
2910 }
2911 }
2912end_unlock:
2913 rcu_read_unlock();
2914end:
2915 return ret;
2916}
This page took 0.14947 seconds and 4 git commands to generate.