Fix: return 0 on successful location serialization
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
CommitLineData
ab0ee2ca
JG
1/*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _LGPL_SOURCE
19#include <urcu.h>
20#include <urcu/rculfhash.h>
21
ab0ee2ca
JG
22#include <common/defaults.h>
23#include <common/error.h>
24#include <common/futex.h>
25#include <common/unix.h>
26#include <common/dynamic-buffer.h>
27#include <common/hashtable/utils.h>
28#include <common/sessiond-comm/sessiond-comm.h>
29#include <common/macros.h>
30#include <lttng/condition/condition.h>
9b63a4aa 31#include <lttng/action/action-internal.h>
ab0ee2ca
JG
32#include <lttng/notification/notification-internal.h>
33#include <lttng/condition/condition-internal.h>
34#include <lttng/condition/buffer-usage-internal.h>
e8360425 35#include <lttng/condition/session-consumed-size-internal.h>
ea9a44f0 36#include <lttng/condition/session-rotation-internal.h>
ab0ee2ca 37#include <lttng/notification/channel-internal.h>
1da26331 38
ab0ee2ca
JG
39#include <time.h>
40#include <unistd.h>
41#include <assert.h>
42#include <inttypes.h>
d53ea4e4 43#include <fcntl.h>
ab0ee2ca 44
1da26331
JG
45#include "notification-thread.h"
46#include "notification-thread-events.h"
47#include "notification-thread-commands.h"
48#include "lttng-sessiond.h"
49#include "kernel.h"
50
ab0ee2ca
JG
/* Poll event mask covering input, error and hang-up events. */
#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
/* CLIENT_POLL_MASK_IN extended with the "output possible" event. */
#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
53
51eab943
JG
/*
 * Kinds of objects known to the notification thread.
 * NOTE(review): presumably the kind of object a condition applies to;
 * confirm against the users of this enumeration.
 */
enum lttng_object_type {
	LTTNG_OBJECT_TYPE_UNKNOWN,
	LTTNG_OBJECT_TYPE_NONE,
	LTTNG_OBJECT_TYPE_CHANNEL,
	LTTNG_OBJECT_TYPE_SESSION,
};
60
ab0ee2ca 61struct lttng_trigger_list_element {
9e0bb80e
JG
62 /* No ownership of the trigger object is assumed. */
63 const struct lttng_trigger *trigger;
ab0ee2ca
JG
64 struct cds_list_head node;
65};
66
67struct lttng_channel_trigger_list {
68 struct channel_key channel_key;
ea9a44f0 69 /* List of struct lttng_trigger_list_element. */
ab0ee2ca 70 struct cds_list_head list;
ea9a44f0 71 /* Node in the channel_triggers_ht */
ab0ee2ca
JG
72 struct cds_lfht_node channel_triggers_ht_node;
73};
74
ea9a44f0
JG
75/*
76 * List of triggers applying to a given session.
77 *
78 * See:
79 * - lttng_session_trigger_list_create()
80 * - lttng_session_trigger_list_build()
81 * - lttng_session_trigger_list_destroy()
82 * - lttng_session_trigger_list_add()
83 */
84struct lttng_session_trigger_list {
85 /*
86 * Not owned by this; points to the session_info structure's
87 * session name.
88 */
89 const char *session_name;
90 /* List of struct lttng_trigger_list_element. */
91 struct cds_list_head list;
92 /* Node in the session_triggers_ht */
93 struct cds_lfht_node session_triggers_ht_node;
94 /*
95 * Weak reference to the notification system's session triggers
96 * hashtable.
97 *
98 * The session trigger list structure structure is owned by
99 * the session's session_info.
100 *
101 * The session_info is kept alive the the channel_infos holding a
102 * reference to it (reference counting). When those channels are
103 * destroyed (at runtime or on teardown), the reference they hold
104 * to the session_info are released. On destruction of session_info,
105 * session_info_destroy() will remove the list of triggers applying
106 * to this session from the notification system's state.
107 *
108 * This implies that the session_triggers_ht must be destroyed
109 * after the channels.
110 */
111 struct cds_lfht *session_triggers_ht;
112 /* Used for delayed RCU reclaim. */
113 struct rcu_head rcu_node;
114};
115
ab0ee2ca
JG
116struct lttng_trigger_ht_element {
117 struct lttng_trigger *trigger;
118 struct cds_lfht_node node;
119};
120
121struct lttng_condition_list_element {
122 struct lttng_condition *condition;
123 struct cds_list_head node;
124};
125
126struct notification_client_list_element {
127 struct notification_client *client;
128 struct cds_list_head node;
129};
130
131struct notification_client_list {
132 struct lttng_trigger *trigger;
133 struct cds_list_head list;
134 struct cds_lfht_node notification_trigger_ht_node;
135};
136
137struct notification_client {
138 int socket;
139 /* Client protocol version. */
140 uint8_t major, minor;
141 uid_t uid;
142 gid_t gid;
143 /*
5332364f 144 * Indicates if the credentials and versions of the client have been
ab0ee2ca
JG
145 * checked.
146 */
147 bool validated;
148 /*
149 * Conditions to which the client's notification channel is subscribed.
150 * List of struct lttng_condition_list_node. The condition member is
151 * owned by the client.
152 */
153 struct cds_list_head condition_list;
154 struct cds_lfht_node client_socket_ht_node;
155 struct {
156 struct {
14fa22f8
JG
157 /*
158 * During the reception of a message, the reception
159 * buffers' "size" is set to contain the current
160 * message's complete payload.
161 */
ab0ee2ca
JG
162 struct lttng_dynamic_buffer buffer;
163 /* Bytes left to receive for the current message. */
164 size_t bytes_to_receive;
165 /* Type of the message being received. */
166 enum lttng_notification_channel_message_type msg_type;
167 /*
168 * Indicates whether or not credentials are expected
169 * from the client.
170 */
01ea340e 171 bool expect_creds;
ab0ee2ca
JG
172 /*
173 * Indicates whether or not credentials were received
174 * from the client.
175 */
176 bool creds_received;
14fa22f8 177 /* Only used during credentials reception. */
ab0ee2ca
JG
178 lttng_sock_cred creds;
179 } inbound;
180 struct {
181 /*
182 * Indicates whether or not a notification addressed to
183 * this client was dropped because a command reply was
184 * already buffered.
185 *
186 * A notification is dropped whenever the buffer is not
187 * empty.
188 */
189 bool dropped_notification;
190 /*
191 * Indicates whether or not a command reply is already
192 * buffered. In this case, it means that the client is
193 * not consuming command replies before emitting a new
194 * one. This could be caused by a protocol error or a
195 * misbehaving/malicious client.
196 */
197 bool queued_command_reply;
198 struct lttng_dynamic_buffer buffer;
199 } outbound;
200 } communication;
201};
202
203struct channel_state_sample {
204 struct channel_key key;
205 struct cds_lfht_node channel_state_ht_node;
206 uint64_t highest_usage;
207 uint64_t lowest_usage;
e8360425 208 uint64_t channel_total_consumed;
ab0ee2ca
JG
209};
210
/* Forward declarations of helpers defined further down in this file. */
static unsigned long hash_channel_key(struct channel_key *key);
static int evaluate_condition(const struct lttng_condition *condition,
		struct lttng_evaluation **evaluation,
		const struct notification_thread_state *state,
		const struct channel_state_sample *previous_sample,
		const struct channel_state_sample *latest_sample,
		uint64_t previous_session_consumed_total,
		uint64_t latest_session_consumed_total,
		struct channel_info *channel_info);
static
int send_evaluation_to_clients(const struct lttng_trigger *trigger,
		const struct lttng_evaluation *evaluation,
		struct notification_client_list *client_list,
		struct notification_thread_state *state,
		uid_t channel_uid, gid_t channel_gid);


/* session_info API */
static
void session_info_destroy(void *_data);
static
void session_info_get(struct session_info *session_info);
static
void session_info_put(struct session_info *session_info);
static
struct session_info *session_info_create(const char *name,
		uid_t uid, gid_t gid,
		struct lttng_session_trigger_list *trigger_list,
		struct cds_lfht *sessions_ht);
static
void session_info_add_channel(struct session_info *session_info,
		struct channel_info *channel_info);
static
void session_info_remove_channel(struct session_info *session_info,
		struct channel_info *channel_info);

/* lttng_session_trigger_list API */
static
struct lttng_session_trigger_list *lttng_session_trigger_list_create(
		const char *session_name,
		struct cds_lfht *session_triggers_ht);
static
struct lttng_session_trigger_list *lttng_session_trigger_list_build(
		const struct notification_thread_state *state,
		const char *session_name);
static
void lttng_session_trigger_list_destroy(
		struct lttng_session_trigger_list *list);
static
int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
		const struct lttng_trigger *trigger);
262
263
ab0ee2ca
JG
264static
265int match_client(struct cds_lfht_node *node, const void *key)
266{
267 /* This double-cast is intended to supress pointer-to-cast warning. */
268 int socket = (int) (intptr_t) key;
269 struct notification_client *client;
270
271 client = caa_container_of(node, struct notification_client,
272 client_socket_ht_node);
273
274 return !!(client->socket == socket);
275}
276
277static
278int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
279{
280 struct channel_key *channel_key = (struct channel_key *) key;
281 struct lttng_channel_trigger_list *trigger_list;
282
283 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
284 channel_triggers_ht_node);
285
286 return !!((channel_key->key == trigger_list->channel_key.key) &&
287 (channel_key->domain == trigger_list->channel_key.domain));
288}
289
51eab943
JG
290static
291int match_session_trigger_list(struct cds_lfht_node *node, const void *key)
292{
293 const char *session_name = (const char *) key;
294 struct lttng_session_trigger_list *trigger_list;
295
296 trigger_list = caa_container_of(node, struct lttng_session_trigger_list,
297 session_triggers_ht_node);
298
299 return !!(strcmp(trigger_list->session_name, session_name) == 0);
300}
301
ab0ee2ca
JG
302static
303int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
304{
305 struct channel_key *channel_key = (struct channel_key *) key;
306 struct channel_state_sample *sample;
307
308 sample = caa_container_of(node, struct channel_state_sample,
309 channel_state_ht_node);
310
311 return !!((channel_key->key == sample->key.key) &&
312 (channel_key->domain == sample->key.domain));
313}
314
315static
316int match_channel_info(struct cds_lfht_node *node, const void *key)
317{
318 struct channel_key *channel_key = (struct channel_key *) key;
319 struct channel_info *channel_info;
320
321 channel_info = caa_container_of(node, struct channel_info,
322 channels_ht_node);
323
324 return !!((channel_key->key == channel_info->key.key) &&
325 (channel_key->domain == channel_info->key.domain));
326}
327
328static
329int match_condition(struct cds_lfht_node *node, const void *key)
330{
331 struct lttng_condition *condition_key = (struct lttng_condition *) key;
332 struct lttng_trigger_ht_element *trigger;
333 struct lttng_condition *condition;
334
335 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
336 node);
337 condition = lttng_trigger_get_condition(trigger->trigger);
338 assert(condition);
339
340 return !!lttng_condition_is_equal(condition_key, condition);
341}
342
343static
344int match_client_list(struct cds_lfht_node *node, const void *key)
345{
346 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
347 struct notification_client_list *client_list;
348 struct lttng_condition *condition;
349 struct lttng_condition *condition_key = lttng_trigger_get_condition(
350 trigger_key);
351
352 assert(condition_key);
353
354 client_list = caa_container_of(node, struct notification_client_list,
355 notification_trigger_ht_node);
356 condition = lttng_trigger_get_condition(client_list->trigger);
357
358 return !!lttng_condition_is_equal(condition_key, condition);
359}
360
361static
362int match_client_list_condition(struct cds_lfht_node *node, const void *key)
363{
364 struct lttng_condition *condition_key = (struct lttng_condition *) key;
365 struct notification_client_list *client_list;
366 struct lttng_condition *condition;
367
368 assert(condition_key);
369
370 client_list = caa_container_of(node, struct notification_client_list,
371 notification_trigger_ht_node);
372 condition = lttng_trigger_get_condition(client_list->trigger);
373
374 return !!lttng_condition_is_equal(condition_key, condition);
375}
376
377static
378unsigned long lttng_condition_buffer_usage_hash(
9b63a4aa 379 const struct lttng_condition *_condition)
ab0ee2ca 380{
9a2746aa
JG
381 unsigned long hash;
382 unsigned long condition_type;
ab0ee2ca
JG
383 struct lttng_condition_buffer_usage *condition;
384
385 condition = container_of(_condition,
386 struct lttng_condition_buffer_usage, parent);
387
9a2746aa
JG
388 condition_type = (unsigned long) condition->parent.type;
389 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
ab0ee2ca
JG
390 if (condition->session_name) {
391 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
392 }
393 if (condition->channel_name) {
8f56701f 394 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
ab0ee2ca
JG
395 }
396 if (condition->domain.set) {
397 hash ^= hash_key_ulong(
398 (void *) condition->domain.type,
399 lttng_ht_seed);
400 }
401 if (condition->threshold_ratio.set) {
402 uint64_t val;
403
404 val = condition->threshold_ratio.value * (double) UINT32_MAX;
405 hash ^= hash_key_u64(&val, lttng_ht_seed);
6633b0dd 406 } else if (condition->threshold_bytes.set) {
ab0ee2ca
JG
407 uint64_t val;
408
409 val = condition->threshold_bytes.value;
410 hash ^= hash_key_u64(&val, lttng_ht_seed);
411 }
412 return hash;
413}
414
e8360425
JD
415static
416unsigned long lttng_condition_session_consumed_size_hash(
9b63a4aa 417 const struct lttng_condition *_condition)
e8360425 418{
9a2746aa
JG
419 unsigned long hash;
420 unsigned long condition_type =
421 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE;
e8360425
JD
422 struct lttng_condition_session_consumed_size *condition;
423 uint64_t val;
424
425 condition = container_of(_condition,
426 struct lttng_condition_session_consumed_size, parent);
427
9a2746aa 428 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
e8360425
JD
429 if (condition->session_name) {
430 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
431 }
432 val = condition->consumed_threshold_bytes.value;
433 hash ^= hash_key_u64(&val, lttng_ht_seed);
434 return hash;
435}
436
a8393880
JG
437static
438unsigned long lttng_condition_session_rotation_hash(
439 const struct lttng_condition *_condition)
440{
441 unsigned long hash, condition_type;
442 struct lttng_condition_session_rotation *condition;
443
444 condition = container_of(_condition,
445 struct lttng_condition_session_rotation, parent);
446 condition_type = (unsigned long) condition->parent.type;
447 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
448 assert(condition->session_name);
449 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
450 return hash;
451}
9a2746aa 452
ab0ee2ca
JG
453/*
454 * The lttng_condition hashing code is kept in this file (rather than
455 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
456 * don't want to link in liblttng-ctl.
457 */
458static
9b63a4aa 459unsigned long lttng_condition_hash(const struct lttng_condition *condition)
ab0ee2ca
JG
460{
461 switch (condition->type) {
462 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
463 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
464 return lttng_condition_buffer_usage_hash(condition);
e8360425
JD
465 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
466 return lttng_condition_session_consumed_size_hash(condition);
a8393880
JG
467 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
468 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
469 return lttng_condition_session_rotation_hash(condition);
ab0ee2ca
JG
470 default:
471 ERR("[notification-thread] Unexpected condition type caught");
472 abort();
473 }
474}
475
e4db5ace
JR
476static
477unsigned long hash_channel_key(struct channel_key *key)
478{
479 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
480 unsigned long domain_hash = hash_key_ulong(
481 (void *) (unsigned long) key->domain, lttng_ht_seed);
482
483 return key_hash ^ domain_hash;
484}
485
ab0ee2ca
JG
486static
487void channel_info_destroy(struct channel_info *channel_info)
488{
489 if (!channel_info) {
490 return;
491 }
492
8abe313a
JG
493 if (channel_info->session_info) {
494 session_info_remove_channel(channel_info->session_info,
495 channel_info);
496 session_info_put(channel_info->session_info);
ab0ee2ca 497 }
8abe313a
JG
498 if (channel_info->name) {
499 free(channel_info->name);
ab0ee2ca
JG
500 }
501 free(channel_info);
502}
503
8abe313a 504/* Don't call directly, use the ref-counting mechanism. */
ab0ee2ca 505static
8abe313a 506void session_info_destroy(void *_data)
ab0ee2ca 507{
8abe313a 508 struct session_info *session_info = _data;
3b68d0a3 509 int ret;
ab0ee2ca 510
8abe313a
JG
511 assert(session_info);
512 if (session_info->channel_infos_ht) {
3b68d0a3
JR
513 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
514 if (ret) {
4c3f302b 515 ERR("[notification-thread] Failed to destroy channel information hash table");
3b68d0a3 516 }
8abe313a 517 }
ea9a44f0 518 lttng_session_trigger_list_destroy(session_info->trigger_list);
1eee26c5
JG
519
520 rcu_read_lock();
521 cds_lfht_del(session_info->sessions_ht,
522 &session_info->sessions_ht_node);
523 rcu_read_unlock();
8abe313a
JG
524 free(session_info->name);
525 free(session_info);
526}
ab0ee2ca 527
8abe313a
JG
528static
529void session_info_get(struct session_info *session_info)
530{
531 if (!session_info) {
532 return;
533 }
534 lttng_ref_get(&session_info->ref);
535}
536
537static
538void session_info_put(struct session_info *session_info)
539{
540 if (!session_info) {
541 return;
ab0ee2ca 542 }
8abe313a
JG
543 lttng_ref_put(&session_info->ref);
544}
545
546static
ea9a44f0 547struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
1eee26c5
JG
548 struct lttng_session_trigger_list *trigger_list,
549 struct cds_lfht *sessions_ht)
8abe313a
JG
550{
551 struct session_info *session_info;
ab0ee2ca 552
8abe313a 553 assert(name);
ab0ee2ca 554
8abe313a
JG
555 session_info = zmalloc(sizeof(*session_info));
556 if (!session_info) {
557 goto end;
558 }
559 lttng_ref_init(&session_info->ref, session_info_destroy);
560
561 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
562 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
563 if (!session_info->channel_infos_ht) {
ab0ee2ca
JG
564 goto error;
565 }
8abe313a
JG
566
567 cds_lfht_node_init(&session_info->sessions_ht_node);
568 session_info->name = strdup(name);
569 if (!session_info->name) {
ab0ee2ca
JG
570 goto error;
571 }
8abe313a
JG
572 session_info->uid = uid;
573 session_info->gid = gid;
ea9a44f0 574 session_info->trigger_list = trigger_list;
1eee26c5 575 session_info->sessions_ht = sessions_ht;
8abe313a
JG
576end:
577 return session_info;
578error:
579 session_info_put(session_info);
580 return NULL;
581}
582
583static
584void session_info_add_channel(struct session_info *session_info,
585 struct channel_info *channel_info)
586{
587 rcu_read_lock();
588 cds_lfht_add(session_info->channel_infos_ht,
589 hash_channel_key(&channel_info->key),
590 &channel_info->session_info_channels_ht_node);
591 rcu_read_unlock();
592}
593
594static
595void session_info_remove_channel(struct session_info *session_info,
596 struct channel_info *channel_info)
597{
598 rcu_read_lock();
599 cds_lfht_del(session_info->channel_infos_ht,
600 &channel_info->session_info_channels_ht_node);
601 rcu_read_unlock();
602}
603
604static
605struct channel_info *channel_info_create(const char *channel_name,
606 struct channel_key *channel_key, uint64_t channel_capacity,
607 struct session_info *session_info)
608{
609 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
610
611 if (!channel_info) {
612 goto end;
613 }
614
ab0ee2ca 615 cds_lfht_node_init(&channel_info->channels_ht_node);
8abe313a
JG
616 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
617 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
618 channel_info->capacity = channel_capacity;
619
620 channel_info->name = strdup(channel_name);
621 if (!channel_info->name) {
622 goto error;
623 }
624
625 /*
626 * Set the references between session and channel infos:
627 * - channel_info holds a strong reference to session_info
628 * - session_info holds a weak reference to channel_info
629 */
630 session_info_get(session_info);
631 session_info_add_channel(session_info, channel_info);
632 channel_info->session_info = session_info;
ab0ee2ca 633end:
8abe313a 634 return channel_info;
ab0ee2ca 635error:
8abe313a 636 channel_info_destroy(channel_info);
ab0ee2ca
JG
637 return NULL;
638}
639
e4db5ace
JR
640/* This function must be called with the RCU read lock held. */
641static
642int evaluate_condition_for_client(struct lttng_trigger *trigger,
643 struct lttng_condition *condition,
644 struct notification_client *client,
645 struct notification_thread_state *state)
646{
647 int ret;
648 struct cds_lfht_iter iter;
649 struct cds_lfht_node *node;
650 struct channel_info *channel_info = NULL;
651 struct channel_key *channel_key = NULL;
652 struct channel_state_sample *last_sample = NULL;
653 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
654 struct lttng_evaluation *evaluation = NULL;
655 struct notification_client_list client_list = { 0 };
656 struct notification_client_list_element client_list_element = { 0 };
657
658 assert(trigger);
659 assert(condition);
660 assert(client);
661 assert(state);
662
663 /* Find the channel associated with the trigger. */
664 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
665 channel_trigger_list , channel_triggers_ht_node) {
666 struct lttng_trigger_list_element *element;
667
668 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
9b63a4aa
JG
669 const struct lttng_condition *current_condition =
670 lttng_trigger_get_const_condition(
e4db5ace
JR
671 element->trigger);
672
673 assert(current_condition);
674 if (!lttng_condition_is_equal(condition,
2ae99f0b 675 current_condition)) {
e4db5ace
JR
676 continue;
677 }
678
679 /* Found the trigger, save the channel key. */
680 channel_key = &channel_trigger_list->channel_key;
681 break;
682 }
683 if (channel_key) {
684 /* The channel key was found stop iteration. */
685 break;
686 }
687 }
688
689 if (!channel_key){
690 /* No channel found; normal exit. */
691 DBG("[notification-thread] No channel associated with newly subscribed-to condition");
692 ret = 0;
693 goto end;
694 }
695
696 /* Fetch channel info for the matching channel. */
697 cds_lfht_lookup(state->channels_ht,
698 hash_channel_key(channel_key),
699 match_channel_info,
700 channel_key,
701 &iter);
702 node = cds_lfht_iter_get_node(&iter);
703 assert(node);
704 channel_info = caa_container_of(node, struct channel_info,
705 channels_ht_node);
706
707 /* Retrieve the channel's last sample, if it exists. */
708 cds_lfht_lookup(state->channel_state_ht,
709 hash_channel_key(channel_key),
710 match_channel_state_sample,
711 channel_key,
712 &iter);
713 node = cds_lfht_iter_get_node(&iter);
714 if (node) {
715 last_sample = caa_container_of(node,
716 struct channel_state_sample,
717 channel_state_ht_node);
718 } else {
719 /* Nothing to evaluate, no sample was ever taken. Normal exit */
720 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
721 ret = 0;
722 goto end;
723 }
724
e8360425
JD
725 ret = evaluate_condition(condition, &evaluation, state,
726 NULL, last_sample,
727 0, channel_info->session_info->consumed_data_size,
728 channel_info);
e4db5ace 729 if (ret) {
066a3c82 730 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
e4db5ace
JR
731 goto end;
732 }
733
734 if (!evaluation) {
735 /* Evaluation yielded nothing. Normal exit. */
736 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
737 ret = 0;
738 goto end;
739 }
740
741 /*
742 * Create a temporary client list with the client currently
743 * subscribing.
744 */
745 cds_lfht_node_init(&client_list.notification_trigger_ht_node);
746 CDS_INIT_LIST_HEAD(&client_list.list);
747 client_list.trigger = trigger;
748
749 CDS_INIT_LIST_HEAD(&client_list_element.node);
750 client_list_element.client = client;
751 cds_list_add(&client_list_element.node, &client_list.list);
752
753 /* Send evaluation result to the newly-subscribed client. */
754 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
755 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
8abe313a
JG
756 state, channel_info->session_info->uid,
757 channel_info->session_info->gid);
e4db5ace
JR
758
759end:
760 return ret;
761}
762
ab0ee2ca
JG
763static
764int notification_thread_client_subscribe(struct notification_client *client,
765 struct lttng_condition *condition,
766 struct notification_thread_state *state,
767 enum lttng_notification_channel_status *_status)
768{
769 int ret = 0;
770 struct cds_lfht_iter iter;
771 struct cds_lfht_node *node;
772 struct notification_client_list *client_list;
773 struct lttng_condition_list_element *condition_list_element = NULL;
774 struct notification_client_list_element *client_list_element = NULL;
775 enum lttng_notification_channel_status status =
776 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
777
778 /*
779 * Ensure that the client has not already subscribed to this condition
780 * before.
781 */
782 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
783 if (lttng_condition_is_equal(condition_list_element->condition,
784 condition)) {
785 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
786 goto end;
787 }
788 }
789
790 condition_list_element = zmalloc(sizeof(*condition_list_element));
791 if (!condition_list_element) {
792 ret = -1;
793 goto error;
794 }
795 client_list_element = zmalloc(sizeof(*client_list_element));
796 if (!client_list_element) {
797 ret = -1;
798 goto error;
799 }
800
801 rcu_read_lock();
802
803 /*
804 * Add the newly-subscribed condition to the client's subscription list.
805 */
806 CDS_INIT_LIST_HEAD(&condition_list_element->node);
807 condition_list_element->condition = condition;
808 cds_list_add(&condition_list_element->node, &client->condition_list);
809
ab0ee2ca
JG
810 cds_lfht_lookup(state->notification_trigger_clients_ht,
811 lttng_condition_hash(condition),
812 match_client_list_condition,
813 condition,
814 &iter);
815 node = cds_lfht_iter_get_node(&iter);
816 if (!node) {
2ae99f0b
JG
817 /*
818 * No notification-emiting trigger registered with this
819 * condition. We don't evaluate the condition right away
820 * since this trigger is not registered yet.
821 */
4fb43b68 822 free(client_list_element);
ab0ee2ca
JG
823 goto end_unlock;
824 }
825
826 client_list = caa_container_of(node, struct notification_client_list,
827 notification_trigger_ht_node);
2ae99f0b
JG
828 /*
829 * The condition to which the client just subscribed is evaluated
830 * at this point so that conditions that are already TRUE result
831 * in a notification being sent out.
832 */
e4db5ace
JR
833 if (evaluate_condition_for_client(client_list->trigger, condition,
834 client, state)) {
835 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
836 ret = -1;
4bd5b4af 837 free(client_list_element);
e4db5ace
JR
838 goto end_unlock;
839 }
840
841 /*
842 * Add the client to the list of clients interested in a given trigger
843 * if a "notification" trigger with a corresponding condition was
844 * added prior.
845 */
ab0ee2ca
JG
846 client_list_element->client = client;
847 CDS_INIT_LIST_HEAD(&client_list_element->node);
848 cds_list_add(&client_list_element->node, &client_list->list);
849end_unlock:
850 rcu_read_unlock();
851end:
852 if (_status) {
853 *_status = status;
854 }
855 return ret;
856error:
857 free(condition_list_element);
858 free(client_list_element);
859 return ret;
860}
861
862static
863int notification_thread_client_unsubscribe(
864 struct notification_client *client,
865 struct lttng_condition *condition,
866 struct notification_thread_state *state,
867 enum lttng_notification_channel_status *_status)
868{
869 struct cds_lfht_iter iter;
870 struct cds_lfht_node *node;
871 struct notification_client_list *client_list;
872 struct lttng_condition_list_element *condition_list_element,
873 *condition_tmp;
874 struct notification_client_list_element *client_list_element,
875 *client_tmp;
876 bool condition_found = false;
877 enum lttng_notification_channel_status status =
878 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
879
880 /* Remove the condition from the client's condition list. */
881 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
882 &client->condition_list, node) {
883 if (!lttng_condition_is_equal(condition_list_element->condition,
884 condition)) {
885 continue;
886 }
887
888 cds_list_del(&condition_list_element->node);
889 /*
890 * The caller may be iterating on the client's conditions to
891 * tear down a client's connection. In this case, the condition
892 * will be destroyed at the end.
893 */
894 if (condition != condition_list_element->condition) {
895 lttng_condition_destroy(
896 condition_list_element->condition);
897 }
898 free(condition_list_element);
899 condition_found = true;
900 break;
901 }
902
903 if (!condition_found) {
904 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
905 goto end;
906 }
907
908 /*
909 * Remove the client from the list of clients interested the trigger
910 * matching the condition.
911 */
912 rcu_read_lock();
913 cds_lfht_lookup(state->notification_trigger_clients_ht,
914 lttng_condition_hash(condition),
915 match_client_list_condition,
916 condition,
917 &iter);
918 node = cds_lfht_iter_get_node(&iter);
919 if (!node) {
920 goto end_unlock;
921 }
922
923 client_list = caa_container_of(node, struct notification_client_list,
924 notification_trigger_ht_node);
925 cds_list_for_each_entry_safe(client_list_element, client_tmp,
926 &client_list->list, node) {
927 if (client_list_element->client->socket != client->socket) {
928 continue;
929 }
930 cds_list_del(&client_list_element->node);
931 free(client_list_element);
932 break;
933 }
934end_unlock:
935 rcu_read_unlock();
936end:
937 lttng_condition_destroy(condition);
938 if (_status) {
939 *_status = status;
940 }
941 return 0;
942}
943
944static
945void notification_client_destroy(struct notification_client *client,
946 struct notification_thread_state *state)
947{
948 struct lttng_condition_list_element *condition_list_element, *tmp;
949
950 if (!client) {
951 return;
952 }
953
954 /* Release all conditions to which the client was subscribed. */
955 cds_list_for_each_entry_safe(condition_list_element, tmp,
956 &client->condition_list, node) {
957 (void) notification_thread_client_unsubscribe(client,
958 condition_list_element->condition, state, NULL);
959 }
960
961 if (client->socket >= 0) {
962 (void) lttcomm_close_unix_sock(client->socket);
963 }
964 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
965 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
966 free(client);
967}
968
969/*
970 * Call with rcu_read_lock held (and hold for the lifetime of the returned
971 * client pointer).
972 */
973static
974struct notification_client *get_client_from_socket(int socket,
975 struct notification_thread_state *state)
976{
977 struct cds_lfht_iter iter;
978 struct cds_lfht_node *node;
979 struct notification_client *client = NULL;
980
981 cds_lfht_lookup(state->client_socket_ht,
982 hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed),
983 match_client,
984 (void *) (unsigned long) socket,
985 &iter);
986 node = cds_lfht_iter_get_node(&iter);
987 if (!node) {
988 goto end;
989 }
990
991 client = caa_container_of(node, struct notification_client,
992 client_socket_ht_node);
993end:
994 return client;
995}
996
997static
e8360425 998bool buffer_usage_condition_applies_to_channel(
51eab943
JG
999 const struct lttng_condition *condition,
1000 const struct channel_info *channel_info)
ab0ee2ca
JG
1001{
1002 enum lttng_condition_status status;
e8360425
JD
1003 enum lttng_domain_type condition_domain;
1004 const char *condition_session_name = NULL;
1005 const char *condition_channel_name = NULL;
ab0ee2ca 1006
e8360425
JD
1007 status = lttng_condition_buffer_usage_get_domain_type(condition,
1008 &condition_domain);
1009 assert(status == LTTNG_CONDITION_STATUS_OK);
1010 if (channel_info->key.domain != condition_domain) {
ab0ee2ca
JG
1011 goto fail;
1012 }
1013
e8360425
JD
1014 status = lttng_condition_buffer_usage_get_session_name(
1015 condition, &condition_session_name);
1016 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1017
1018 status = lttng_condition_buffer_usage_get_channel_name(
1019 condition, &condition_channel_name);
1020 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
1021
1022 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1023 goto fail;
1024 }
1025 if (strcmp(channel_info->name, condition_channel_name)) {
ab0ee2ca
JG
1026 goto fail;
1027 }
1028
e8360425
JD
1029 return true;
1030fail:
1031 return false;
1032}
1033
1034static
1035bool session_consumed_size_condition_applies_to_channel(
51eab943
JG
1036 const struct lttng_condition *condition,
1037 const struct channel_info *channel_info)
e8360425
JD
1038{
1039 enum lttng_condition_status status;
1040 const char *condition_session_name = NULL;
1041
1042 status = lttng_condition_session_consumed_size_get_session_name(
1043 condition, &condition_session_name);
1044 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1045
1046 if (strcmp(channel_info->session_info->name, condition_session_name)) {
ab0ee2ca
JG
1047 goto fail;
1048 }
1049
e8360425
JD
1050 return true;
1051fail:
1052 return false;
1053}
ab0ee2ca 1054
51eab943
JG
1055/*
1056 * Get the type of object to which a given trigger applies. Binding lets
1057 * the notification system evaluate a trigger's condition when a given
1058 * object's state is updated.
1059 *
1060 * For instance, a condition bound to a channel will be evaluated everytime
1061 * the channel's state is changed by a channel monitoring sample.
1062 */
e8360425 1063static
51eab943
JG
1064enum lttng_object_type get_trigger_binding_object(
1065 const struct lttng_trigger *trigger)
e8360425 1066{
51eab943
JG
1067 const struct lttng_condition *condition;
1068
1069 condition = lttng_trigger_get_const_condition(trigger);
1070 switch (lttng_condition_get_type(condition)) {
1071 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1072 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1073 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1074 return LTTNG_OBJECT_TYPE_CHANNEL;
1075 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1076 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1077 return LTTNG_OBJECT_TYPE_SESSION;
1078 default:
1079 return LTTNG_OBJECT_TYPE_UNKNOWN;
1080 }
1081}
1082
1083static
1084bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
1085 const struct channel_info *channel_info)
1086{
1087 const struct lttng_condition *condition;
e8360425 1088 bool trigger_applies;
ab0ee2ca 1089
51eab943 1090 condition = lttng_trigger_get_const_condition(trigger);
e8360425 1091 if (!condition) {
ab0ee2ca
JG
1092 goto fail;
1093 }
e8360425
JD
1094
1095 switch (lttng_condition_get_type(condition)) {
1096 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1097 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1098 trigger_applies = buffer_usage_condition_applies_to_channel(
1099 condition, channel_info);
1100 break;
1101 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1102 trigger_applies = session_consumed_size_condition_applies_to_channel(
1103 condition, channel_info);
1104 break;
1105 default:
ab0ee2ca
JG
1106 goto fail;
1107 }
1108
e8360425 1109 return trigger_applies;
ab0ee2ca
JG
1110fail:
1111 return false;
1112}
1113
1114static
1115bool trigger_applies_to_client(struct lttng_trigger *trigger,
1116 struct notification_client *client)
1117{
1118 bool applies = false;
1119 struct lttng_condition_list_element *condition_list_element;
1120
1121 cds_list_for_each_entry(condition_list_element, &client->condition_list,
1122 node) {
1123 applies = lttng_condition_is_equal(
1124 condition_list_element->condition,
1125 lttng_trigger_get_condition(trigger));
1126 if (applies) {
1127 break;
1128 }
1129 }
1130 return applies;
1131}
1132
8abe313a
JG
1133static
1134int match_session(struct cds_lfht_node *node, const void *key)
1135{
1136 const char *name = key;
1137 struct session_info *session_info = caa_container_of(
1138 node, struct session_info, sessions_ht_node);
1139
1140 return !strcmp(session_info->name, name);
1141}
1142
ea9a44f0
JG
1143/*
1144 * Allocate an empty lttng_session_trigger_list for the session named
1145 * 'session_name'.
1146 *
1147 * No ownership of 'session_name' is assumed by the session trigger list.
1148 * It is the caller's responsability to ensure the session name is alive
1149 * for as long as this list is.
1150 */
1151static
1152struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1153 const char *session_name,
1154 struct cds_lfht *session_triggers_ht)
1155{
1156 struct lttng_session_trigger_list *list;
1157
1158 list = zmalloc(sizeof(*list));
1159 if (!list) {
1160 goto end;
1161 }
1162 list->session_name = session_name;
1163 CDS_INIT_LIST_HEAD(&list->list);
1164 cds_lfht_node_init(&list->session_triggers_ht_node);
1165 list->session_triggers_ht = session_triggers_ht;
1166
1167 rcu_read_lock();
1168 /* Publish the list through the session_triggers_ht. */
1169 cds_lfht_add(session_triggers_ht,
1170 hash_key_str(session_name, lttng_ht_seed),
1171 &list->session_triggers_ht_node);
1172 rcu_read_unlock();
1173end:
1174 return list;
1175}
1176
1177static
1178void free_session_trigger_list_rcu(struct rcu_head *node)
1179{
1180 free(caa_container_of(node, struct lttng_session_trigger_list,
1181 rcu_node));
1182}
1183
1184static
1185void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1186{
1187 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1188
1189 /* Empty the list element by element, and then free the list itself. */
1190 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1191 &list->list, node) {
1192 cds_list_del(&trigger_list_element->node);
1193 free(trigger_list_element);
1194 }
1195 rcu_read_lock();
1196 /* Unpublish the list from the session_triggers_ht. */
1197 cds_lfht_del(list->session_triggers_ht,
1198 &list->session_triggers_ht_node);
1199 rcu_read_unlock();
1200 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1201}
1202
1203static
1204int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
1205 const struct lttng_trigger *trigger)
1206{
1207 int ret = 0;
1208 struct lttng_trigger_list_element *new_element =
1209 zmalloc(sizeof(*new_element));
1210
1211 if (!new_element) {
1212 ret = -1;
1213 goto end;
1214 }
1215 CDS_INIT_LIST_HEAD(&new_element->node);
1216 new_element->trigger = trigger;
1217 cds_list_add(&new_element->node, &list->list);
1218end:
1219 return ret;
1220}
1221
1222static
1223bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1224 const char *session_name)
1225{
1226 bool applies = false;
1227 const struct lttng_condition *condition;
1228
1229 condition = lttng_trigger_get_const_condition(trigger);
1230 switch (lttng_condition_get_type(condition)) {
1231 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1232 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1233 {
1234 enum lttng_condition_status condition_status;
1235 const char *condition_session_name;
1236
1237 condition_status = lttng_condition_session_rotation_get_session_name(
1238 condition, &condition_session_name);
1239 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1240 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1241 goto end;
1242 }
1243
1244 assert(condition_session_name);
1245 applies = !strcmp(condition_session_name, session_name);
1246 break;
1247 }
1248 default:
1249 goto end;
1250 }
1251end:
1252 return applies;
1253}
1254
1255/*
1256 * Allocate and initialize an lttng_session_trigger_list which contains
1257 * all triggers that apply to the session named 'session_name'.
1258 *
1259 * No ownership of 'session_name' is assumed by the session trigger list.
1260 * It is the caller's responsability to ensure the session name is alive
1261 * for as long as this list is.
1262 */
1263static
1264struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1265 const struct notification_thread_state *state,
1266 const char *session_name)
1267{
1268 int trigger_count = 0;
1269 struct lttng_session_trigger_list *session_trigger_list = NULL;
1270 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1271 struct cds_lfht_iter iter;
1272
1273 session_trigger_list = lttng_session_trigger_list_create(session_name,
1274 state->session_triggers_ht);
1275
1276 /* Add all triggers applying to the session named 'session_name'. */
1277 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1278 node) {
1279 int ret;
1280
1281 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1282 session_name)) {
1283 continue;
1284 }
1285
1286 ret = lttng_session_trigger_list_add(session_trigger_list,
1287 trigger_ht_element->trigger);
1288 if (ret) {
1289 goto error;
1290 }
1291
1292 trigger_count++;
1293 }
1294
1295 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1296 trigger_count);
1297 return session_trigger_list;
1298error:
1299 lttng_session_trigger_list_destroy(session_trigger_list);
1300 return NULL;
1301}
1302
8abe313a
JG
1303static
1304struct session_info *find_or_create_session_info(
1305 struct notification_thread_state *state,
1306 const char *name, uid_t uid, gid_t gid)
1307{
1308 struct session_info *session = NULL;
1309 struct cds_lfht_node *node;
1310 struct cds_lfht_iter iter;
ea9a44f0 1311 struct lttng_session_trigger_list *trigger_list;
8abe313a
JG
1312
1313 rcu_read_lock();
1314 cds_lfht_lookup(state->sessions_ht,
1315 hash_key_str(name, lttng_ht_seed),
1316 match_session,
1317 name,
1318 &iter);
1319 node = cds_lfht_iter_get_node(&iter);
1320 if (node) {
1321 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1322 name, uid, gid);
1323 session = caa_container_of(node, struct session_info,
1324 sessions_ht_node);
1325 assert(session->uid == uid);
1326 assert(session->gid == gid);
1eee26c5
JG
1327 session_info_get(session);
1328 goto end;
ea9a44f0
JG
1329 }
1330
1331 trigger_list = lttng_session_trigger_list_build(state, name);
1332 if (!trigger_list) {
1333 goto error;
8abe313a
JG
1334 }
1335
1eee26c5
JG
1336 session = session_info_create(name, uid, gid, trigger_list,
1337 state->sessions_ht);
8abe313a
JG
1338 if (!session) {
1339 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1340 name, uid, gid);
ea9a44f0 1341 goto error;
8abe313a 1342 }
ea9a44f0 1343 trigger_list = NULL;
577983cb
JG
1344
1345 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
9b63a4aa 1346 &session->sessions_ht_node);
1eee26c5 1347end:
8abe313a
JG
1348 rcu_read_unlock();
1349 return session;
ea9a44f0
JG
1350error:
1351 rcu_read_unlock();
1352 session_info_put(session);
1353 return NULL;
8abe313a
JG
1354}
1355
ab0ee2ca
JG
1356static
1357int handle_notification_thread_command_add_channel(
8abe313a
JG
1358 struct notification_thread_state *state,
1359 const char *session_name, uid_t session_uid, gid_t session_gid,
1360 const char *channel_name, enum lttng_domain_type channel_domain,
1361 uint64_t channel_key_int, uint64_t channel_capacity,
1362 enum lttng_error_code *cmd_result)
ab0ee2ca
JG
1363{
1364 struct cds_list_head trigger_list;
8abe313a
JG
1365 struct channel_info *new_channel_info = NULL;
1366 struct channel_key channel_key = {
1367 .key = channel_key_int,
1368 .domain = channel_domain,
1369 };
ab0ee2ca
JG
1370 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1371 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1372 int trigger_count = 0;
1373 struct cds_lfht_iter iter;
8abe313a 1374 struct session_info *session_info = NULL;
ab0ee2ca
JG
1375
1376 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
8abe313a
JG
1377 channel_name, session_name, channel_key_int,
1378 channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
ab0ee2ca
JG
1379
1380 CDS_INIT_LIST_HEAD(&trigger_list);
1381
8abe313a
JG
1382 session_info = find_or_create_session_info(state, session_name,
1383 session_uid, session_gid);
1384 if (!session_info) {
1385 /* Allocation error or an internal error occured. */
ab0ee2ca
JG
1386 goto error;
1387 }
1388
8abe313a
JG
1389 new_channel_info = channel_info_create(channel_name, &channel_key,
1390 channel_capacity, session_info);
1391 if (!new_channel_info) {
1392 goto error;
1393 }
ab0ee2ca
JG
1394
1395 /* Build a list of all triggers applying to the new channel. */
1396 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1397 node) {
1398 struct lttng_trigger_list_element *new_element;
1399
1400 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
8abe313a 1401 new_channel_info)) {
ab0ee2ca
JG
1402 continue;
1403 }
1404
1405 new_element = zmalloc(sizeof(*new_element));
1406 if (!new_element) {
1407 goto error;
1408 }
1409 CDS_INIT_LIST_HEAD(&new_element->node);
1410 new_element->trigger = trigger_ht_element->trigger;
1411 cds_list_add(&new_element->node, &trigger_list);
1412 trigger_count++;
1413 }
1414
1415 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1416 trigger_count);
1417 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1418 if (!channel_trigger_list) {
1419 goto error;
1420 }
8abe313a 1421 channel_trigger_list->channel_key = new_channel_info->key;
ab0ee2ca
JG
1422 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1423 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1424 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1425
1426 rcu_read_lock();
1427 /* Add channel to the channel_ht which owns the channel_infos. */
1428 cds_lfht_add(state->channels_ht,
8abe313a 1429 hash_channel_key(&new_channel_info->key),
ab0ee2ca
JG
1430 &new_channel_info->channels_ht_node);
1431 /*
1432 * Add the list of triggers associated with this channel to the
1433 * channel_triggers_ht.
1434 */
1435 cds_lfht_add(state->channel_triggers_ht,
8abe313a 1436 hash_channel_key(&new_channel_info->key),
ab0ee2ca
JG
1437 &channel_trigger_list->channel_triggers_ht_node);
1438 rcu_read_unlock();
1eee26c5 1439 session_info_put(session_info);
ab0ee2ca
JG
1440 *cmd_result = LTTNG_OK;
1441 return 0;
1442error:
ab0ee2ca 1443 channel_info_destroy(new_channel_info);
8abe313a 1444 session_info_put(session_info);
ab0ee2ca
JG
1445 return 1;
1446}
1447
1448static
1449int handle_notification_thread_command_remove_channel(
1450 struct notification_thread_state *state,
1451 uint64_t channel_key, enum lttng_domain_type domain,
1452 enum lttng_error_code *cmd_result)
1453{
1454 struct cds_lfht_node *node;
1455 struct cds_lfht_iter iter;
1456 struct lttng_channel_trigger_list *trigger_list;
1457 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1458 struct channel_key key = { .key = channel_key, .domain = domain };
1459 struct channel_info *channel_info;
1460
1461 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
1462 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1463
1464 rcu_read_lock();
1465
1466 cds_lfht_lookup(state->channel_triggers_ht,
1467 hash_channel_key(&key),
1468 match_channel_trigger_list,
1469 &key,
1470 &iter);
1471 node = cds_lfht_iter_get_node(&iter);
1472 /*
1473 * There is a severe internal error if we are being asked to remove a
1474 * channel that doesn't exist.
1475 */
1476 if (!node) {
1477 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1478 goto end;
1479 }
1480
1481 /* Free the list of triggers associated with this channel. */
1482 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1483 channel_triggers_ht_node);
1484 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1485 &trigger_list->list, node) {
1486 cds_list_del(&trigger_list_element->node);
1487 free(trigger_list_element);
1488 }
1489 cds_lfht_del(state->channel_triggers_ht, node);
1490 free(trigger_list);
1491
1492 /* Free sampled channel state. */
1493 cds_lfht_lookup(state->channel_state_ht,
1494 hash_channel_key(&key),
1495 match_channel_state_sample,
1496 &key,
1497 &iter);
1498 node = cds_lfht_iter_get_node(&iter);
1499 /*
1500 * This is expected to be NULL if the channel is destroyed before we
1501 * received a sample.
1502 */
1503 if (node) {
1504 struct channel_state_sample *sample = caa_container_of(node,
1505 struct channel_state_sample,
1506 channel_state_ht_node);
1507
1508 cds_lfht_del(state->channel_state_ht, node);
1509 free(sample);
1510 }
1511
1512 /* Remove the channel from the channels_ht and free it. */
1513 cds_lfht_lookup(state->channels_ht,
1514 hash_channel_key(&key),
1515 match_channel_info,
1516 &key,
1517 &iter);
1518 node = cds_lfht_iter_get_node(&iter);
1519 assert(node);
1520 channel_info = caa_container_of(node, struct channel_info,
1521 channels_ht_node);
1522 cds_lfht_del(state->channels_ht, node);
1523 channel_info_destroy(channel_info);
1524end:
1525 rcu_read_unlock();
1526 *cmd_result = LTTNG_OK;
1527 return 0;
1528}
1529
0ca52944
JG
1530static
1531int handle_notification_thread_command_session_rotation_ongoing(
1532 struct notification_thread_state *state,
1533 const char *session_name, uint64_t trace_archive_chunk_id,
1534 enum lttng_error_code *cmd_result)
1535{
1536 *cmd_result = LTTNG_OK;
1537 return 0;
1538}
1539
1540static
1541int handle_notification_thread_command_session_rotation_completed(
1542 struct notification_thread_state *state,
1543 const char *session_name, uint64_t trace_archive_chunk_id,
1544 const struct lttng_trace_archive_location *location,
1545 enum lttng_error_code *cmd_result)
1546{
1547 *cmd_result = LTTNG_OK;
1548 return 0;
1549}
1550
1da26331
JG
1551static
1552int condition_is_supported(struct lttng_condition *condition)
1553{
1554 int ret;
1555
1556 switch (lttng_condition_get_type(condition)) {
1557 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1558 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1559 {
1560 enum lttng_domain_type domain;
1561
1562 ret = lttng_condition_buffer_usage_get_domain_type(condition,
1563 &domain);
1564 if (ret) {
1565 ret = -1;
1566 goto end;
1567 }
1568
1569 if (domain != LTTNG_DOMAIN_KERNEL) {
1570 ret = 1;
1571 goto end;
1572 }
1573
1574 /*
1575 * Older kernel tracers don't expose the API to monitor their
1576 * buffers. Therefore, we reject triggers that require that
1577 * mechanism to be available to be evaluated.
1578 */
1579 ret = kernel_supports_ring_buffer_snapshot_sample_positions(
1580 kernel_tracer_fd);
1581 break;
1582 }
1583 default:
1584 ret = 1;
1585 }
1586end:
1587 return ret;
1588}
1589
51eab943
JG
1590/* Must be called with RCU read lock held. */
1591static
1592int bind_trigger_to_matching_session(const struct lttng_trigger *trigger,
1593 struct notification_thread_state *state)
1594{
1595 int ret = 0;
1596 struct cds_lfht_node *node;
1597 struct cds_lfht_iter iter;
1598 const struct lttng_condition *condition;
1599 const char *session_name;
1600 struct lttng_session_trigger_list *trigger_list;
1601
1602 condition = lttng_trigger_get_const_condition(trigger);
1603 switch (lttng_condition_get_type(condition)) {
1604 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1605 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1606 {
1607 enum lttng_condition_status status;
1608
1609 status = lttng_condition_session_rotation_get_session_name(
1610 condition, &session_name);
1611 if (status != LTTNG_CONDITION_STATUS_OK) {
1612 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
1613 ret = -1;
1614 goto end;
1615 }
1616 break;
1617 }
1618 default:
1619 ret = -1;
1620 goto end;
1621 }
1622
1623 cds_lfht_lookup(state->session_triggers_ht,
1624 hash_key_str(session_name, lttng_ht_seed),
1625 match_session_trigger_list,
1626 session_name,
1627 &iter);
1628 node = cds_lfht_iter_get_node(&iter);
1629 if (!node) {
1630 /*
1631 * Not an error, the list of triggers applying to that session
1632 * will be initialized when the session is created.
1633 */
1634 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
1635 session_name);
1636 goto end;
1637 }
1638
1639 trigger_list = caa_container_of(node,
1640 struct lttng_session_trigger_list,
1641 session_triggers_ht_node);
1642
1643 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
1644 session_name);
1645 ret = lttng_session_trigger_list_add(trigger_list, trigger);
1646end:
1647 return ret;
1648}
1649
1650/* Must be called with RCU read lock held. */
1651static
1652int bind_trigger_to_matching_channels(const struct lttng_trigger *trigger,
1653 struct notification_thread_state *state)
1654{
1655 int ret = 0;
1656 struct cds_lfht_node *node;
1657 struct cds_lfht_iter iter;
1658 struct channel_info *channel;
1659
1660 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
1661 channels_ht_node) {
1662 struct lttng_trigger_list_element *trigger_list_element;
1663 struct lttng_channel_trigger_list *trigger_list;
1664
1665 if (!trigger_applies_to_channel(trigger, channel)) {
1666 continue;
1667 }
1668
1669 cds_lfht_lookup(state->channel_triggers_ht,
1670 hash_channel_key(&channel->key),
1671 match_channel_trigger_list,
1672 &channel->key,
1673 &iter);
1674 node = cds_lfht_iter_get_node(&iter);
1675 assert(node);
1676 trigger_list = caa_container_of(node,
1677 struct lttng_channel_trigger_list,
1678 channel_triggers_ht_node);
1679
1680 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
1681 if (!trigger_list_element) {
1682 ret = -1;
1683 goto end;
1684 }
1685 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
1686 trigger_list_element->trigger = trigger;
1687 cds_list_add(&trigger_list_element->node, &trigger_list->list);
1688 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
1689 channel->name);
1690 }
1691end:
1692 return ret;
1693}
1694
ab0ee2ca
JG
1695/*
1696 * FIXME A client's credentials are not checked when registering a trigger, nor
1697 * are they stored alongside with the trigger.
1698 *
1da26331 1699 * The effects of this are benign since:
ab0ee2ca 1700 * - The client will succeed in registering the trigger, as it is valid,
51eab943 1701 * - The trigger will, internally, be bound to the channel/session,
ab0ee2ca
JG
1702 * - The notifications will not be sent since the client's credentials
1703 * are checked against the channel at that moment.
1da26331
JG
1704 *
1705 * If this function returns a non-zero value, it means something is
50ca7858 1706 * fundamentally broken and the whole subsystem/thread will be torn down.
1da26331
JG
1707 *
1708 * If a non-fatal error occurs, just set the cmd_result to the appropriate
1709 * error code.
ab0ee2ca
JG
1710 */
1711static
1712int handle_notification_thread_command_register_trigger(
8abe313a
JG
1713 struct notification_thread_state *state,
1714 struct lttng_trigger *trigger,
1715 enum lttng_error_code *cmd_result)
ab0ee2ca
JG
1716{
1717 int ret = 0;
1718 struct lttng_condition *condition;
1719 struct notification_client *client;
1720 struct notification_client_list *client_list = NULL;
1721 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1722 struct notification_client_list_element *client_list_element, *tmp;
1723 struct cds_lfht_node *node;
1724 struct cds_lfht_iter iter;
ab0ee2ca
JG
1725 bool free_trigger = true;
1726
1727 rcu_read_lock();
1728
1729 condition = lttng_trigger_get_condition(trigger);
1da26331
JG
1730 assert(condition);
1731
1732 ret = condition_is_supported(condition);
1733 if (ret < 0) {
1734 goto error;
1735 } else if (ret == 0) {
1736 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
1737 goto error;
1738 } else {
1739 /* Feature is supported, continue. */
1740 ret = 0;
1741 }
1742
ab0ee2ca
JG
1743 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
1744 if (!trigger_ht_element) {
1745 ret = -1;
1746 goto error;
1747 }
1748
1749 /* Add trigger to the trigger_ht. */
1750 cds_lfht_node_init(&trigger_ht_element->node);
1751 trigger_ht_element->trigger = trigger;
1752
1753 node = cds_lfht_add_unique(state->triggers_ht,
1754 lttng_condition_hash(condition),
1755 match_condition,
1756 condition,
1757 &trigger_ht_element->node);
1758 if (node != &trigger_ht_element->node) {
1759 /* Not a fatal error, simply report it to the client. */
1760 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
1761 goto error_free_ht_element;
1762 }
1763
1764 /*
1765 * Ownership of the trigger and of its wrapper was transfered to
1766 * the triggers_ht.
1767 */
1768 trigger_ht_element = NULL;
1769 free_trigger = false;
1770
1771 /*
1772 * The rest only applies to triggers that have a "notify" action.
1773 * It is not skipped as this is the only action type currently
1774 * supported.
1775 */
1776 client_list = zmalloc(sizeof(*client_list));
1777 if (!client_list) {
1778 ret = -1;
1779 goto error_free_ht_element;
1780 }
1781 cds_lfht_node_init(&client_list->notification_trigger_ht_node);
1782 CDS_INIT_LIST_HEAD(&client_list->list);
1783 client_list->trigger = trigger;
1784
1785 /* Build a list of clients to which this new trigger applies. */
1786 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1787 client_socket_ht_node) {
1788 if (!trigger_applies_to_client(trigger, client)) {
1789 continue;
1790 }
1791
1792 client_list_element = zmalloc(sizeof(*client_list_element));
1793 if (!client_list_element) {
1794 ret = -1;
1795 goto error_free_client_list;
1796 }
1797 CDS_INIT_LIST_HEAD(&client_list_element->node);
1798 client_list_element->client = client;
1799 cds_list_add(&client_list_element->node, &client_list->list);
1800 }
1801
1802 cds_lfht_add(state->notification_trigger_clients_ht,
1803 lttng_condition_hash(condition),
1804 &client_list->notification_trigger_ht_node);
ab0ee2ca 1805
51eab943
JG
1806 switch (get_trigger_binding_object(trigger)) {
1807 case LTTNG_OBJECT_TYPE_SESSION:
1808 /* Add the trigger to the list if it matches a known session. */
1809 ret = bind_trigger_to_matching_session(trigger, state);
1810 if (ret) {
1811 goto error_free_client_list;
ab0ee2ca 1812 }
51eab943
JG
1813 case LTTNG_OBJECT_TYPE_CHANNEL:
1814 /*
1815 * Add the trigger to list of triggers bound to the channels
1816 * currently known.
1817 */
1818 ret = bind_trigger_to_matching_channels(trigger, state);
1819 if (ret) {
ab0ee2ca
JG
1820 goto error_free_client_list;
1821 }
51eab943
JG
1822 break;
1823 case LTTNG_OBJECT_TYPE_NONE:
1824 break;
1825 default:
1826 ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered");
1827 ret = -1;
1828 goto error_free_client_list;
ab0ee2ca
JG
1829 }
1830
2ae99f0b
JG
1831 /*
1832 * Since there is nothing preventing clients from subscribing to a
1833 * condition before the corresponding trigger is registered, we have
1834 * to evaluate this new condition right away.
1835 *
1836 * At some point, we were waiting for the next "evaluation" (e.g. on
1837 * reception of a channel sample) to evaluate this new condition, but
1838 * that was broken.
1839 *
1840 * The reason it was broken is that waiting for the next sample
1841 * does not allow us to properly handle transitions for edge-triggered
1842 * conditions.
1843 *
1844 * Consider this example: when we handle a new channel sample, we
1845 * evaluate each conditions twice: once with the previous state, and
1846 * again with the newest state. We then use those two results to
1847 * determine whether a state change happened: a condition was false and
1848 * became true. If a state change happened, we have to notify clients.
1849 *
1850 * Now, if a client subscribes to a given notification and registers
1851 * a trigger *after* that subscription, we have to make sure the
1852 * condition is evaluated at this point while considering only the
1853 * current state. Otherwise, the next evaluation cycle may only see
1854 * that the evaluations remain the same (true for samples n-1 and n) and
1855 * the client will never know that the condition has been met.
1856 */
1857 cds_list_for_each_entry_safe(client_list_element, tmp,
1858 &client_list->list, node) {
1859 ret = evaluate_condition_for_client(trigger, condition,
1860 client_list_element->client, state);
1861 if (ret) {
1862 goto error_free_client_list;
1863 }
1864 }
1865
1866 /*
1867 * Client list ownership transferred to the
1868 * notification_trigger_clients_ht.
1869 */
1870 client_list = NULL;
1871
ab0ee2ca
JG
1872 *cmd_result = LTTNG_OK;
1873error_free_client_list:
1874 if (client_list) {
1875 cds_list_for_each_entry_safe(client_list_element, tmp,
1876 &client_list->list, node) {
1877 free(client_list_element);
1878 }
1879 free(client_list);
1880 }
1881error_free_ht_element:
1882 free(trigger_ht_element);
1883error:
1884 if (free_trigger) {
1885 struct lttng_action *action = lttng_trigger_get_action(trigger);
1886
1887 lttng_condition_destroy(condition);
1888 lttng_action_destroy(action);
1889 lttng_trigger_destroy(trigger);
1890 }
1891 rcu_read_unlock();
1892 return ret;
1893}
1894
cc2295b5 1895static
ab0ee2ca
JG
1896int handle_notification_thread_command_unregister_trigger(
1897 struct notification_thread_state *state,
1898 struct lttng_trigger *trigger,
1899 enum lttng_error_code *_cmd_reply)
1900{
1901 struct cds_lfht_iter iter;
1902 struct cds_lfht_node *node, *triggers_ht_node;
1903 struct lttng_channel_trigger_list *trigger_list;
1904 struct notification_client_list *client_list;
1905 struct notification_client_list_element *client_list_element, *tmp;
1906 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1907 struct lttng_condition *condition = lttng_trigger_get_condition(
1908 trigger);
1909 struct lttng_action *action;
1910 enum lttng_error_code cmd_reply;
1911
1912 rcu_read_lock();
1913
1914 cds_lfht_lookup(state->triggers_ht,
1915 lttng_condition_hash(condition),
1916 match_condition,
1917 condition,
1918 &iter);
1919 triggers_ht_node = cds_lfht_iter_get_node(&iter);
1920 if (!triggers_ht_node) {
1921 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
1922 goto end;
1923 } else {
1924 cmd_reply = LTTNG_OK;
1925 }
1926
1927 /* Remove trigger from channel_triggers_ht. */
1928 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
1929 channel_triggers_ht_node) {
1930 struct lttng_trigger_list_element *trigger_element, *tmp;
1931
1932 cds_list_for_each_entry_safe(trigger_element, tmp,
1933 &trigger_list->list, node) {
9b63a4aa
JG
1934 const struct lttng_condition *current_condition =
1935 lttng_trigger_get_const_condition(
ab0ee2ca
JG
1936 trigger_element->trigger);
1937
1938 assert(current_condition);
1939 if (!lttng_condition_is_equal(condition,
1940 current_condition)) {
1941 continue;
1942 }
1943
1944 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
1945 cds_list_del(&trigger_element->node);
e4db5ace
JR
1946 /* A trigger can only appear once per channel */
1947 break;
ab0ee2ca
JG
1948 }
1949 }
1950
1951 /*
1952 * Remove and release the client list from
1953 * notification_trigger_clients_ht.
1954 */
1955 cds_lfht_lookup(state->notification_trigger_clients_ht,
1956 lttng_condition_hash(condition),
1957 match_client_list,
1958 trigger,
1959 &iter);
1960 node = cds_lfht_iter_get_node(&iter);
1961 assert(node);
1962 client_list = caa_container_of(node, struct notification_client_list,
1963 notification_trigger_ht_node);
1964 cds_list_for_each_entry_safe(client_list_element, tmp,
1965 &client_list->list, node) {
1966 free(client_list_element);
1967 }
1968 cds_lfht_del(state->notification_trigger_clients_ht, node);
1969 free(client_list);
1970
1971 /* Remove trigger from triggers_ht. */
1972 trigger_ht_element = caa_container_of(triggers_ht_node,
1973 struct lttng_trigger_ht_element, node);
1974 cds_lfht_del(state->triggers_ht, triggers_ht_node);
1975
1976 condition = lttng_trigger_get_condition(trigger_ht_element->trigger);
1977 lttng_condition_destroy(condition);
1978 action = lttng_trigger_get_action(trigger_ht_element->trigger);
1979 lttng_action_destroy(action);
1980 lttng_trigger_destroy(trigger_ht_element->trigger);
1981 free(trigger_ht_element);
1982end:
1983 rcu_read_unlock();
1984 if (_cmd_reply) {
1985 *_cmd_reply = cmd_reply;
1986 }
1987 return 0;
1988}
1989
1990/* Returns 0 on success, 1 on exit requested, negative value on error. */
1991int handle_notification_thread_command(
1992 struct notification_thread_handle *handle,
1993 struct notification_thread_state *state)
1994{
1995 int ret;
1996 uint64_t counter;
1997 struct notification_thread_command *cmd;
1998
8abe313a
JG
1999 /* Read the event pipe to put it back into a quiescent state. */
2000 ret = read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
2001 sizeof(counter));
ab0ee2ca
JG
2002 if (ret == -1) {
2003 goto error;
2004 }
2005
2006 pthread_mutex_lock(&handle->cmd_queue.lock);
2007 cmd = cds_list_first_entry(&handle->cmd_queue.list,
2008 struct notification_thread_command, cmd_list_node);
2009 switch (cmd->type) {
2010 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
2011 DBG("[notification-thread] Received register trigger command");
2012 ret = handle_notification_thread_command_register_trigger(
2013 state, cmd->parameters.trigger,
2014 &cmd->reply_code);
2015 break;
2016 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
2017 DBG("[notification-thread] Received unregister trigger command");
2018 ret = handle_notification_thread_command_unregister_trigger(
2019 state, cmd->parameters.trigger,
2020 &cmd->reply_code);
2021 break;
2022 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
2023 DBG("[notification-thread] Received add channel command");
2024 ret = handle_notification_thread_command_add_channel(
8abe313a
JG
2025 state,
2026 cmd->parameters.add_channel.session.name,
2027 cmd->parameters.add_channel.session.uid,
2028 cmd->parameters.add_channel.session.gid,
2029 cmd->parameters.add_channel.channel.name,
2030 cmd->parameters.add_channel.channel.domain,
2031 cmd->parameters.add_channel.channel.key,
2032 cmd->parameters.add_channel.channel.capacity,
ab0ee2ca
JG
2033 &cmd->reply_code);
2034 break;
2035 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
2036 DBG("[notification-thread] Received remove channel command");
2037 ret = handle_notification_thread_command_remove_channel(
2038 state, cmd->parameters.remove_channel.key,
2039 cmd->parameters.remove_channel.domain,
2040 &cmd->reply_code);
2041 break;
0ca52944
JG
2042 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
2043 DBG("[notification-thread] Received session rotation ongoing command");
2044 ret = handle_notification_thread_command_session_rotation_ongoing(
2045 state,
2046 cmd->parameters.session_rotation_ongoing.session_name,
2047 cmd->parameters.session_rotation_ongoing.trace_archive_chunk_id,
2048 &cmd->reply_code);
2049 break;
2050 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
2051 DBG("[notification-thread] Received session rotation completed command");
2052 ret = handle_notification_thread_command_session_rotation_completed(
2053 state,
2054 cmd->parameters.session_rotation_completed.session_name,
2055 cmd->parameters.session_rotation_completed.trace_archive_chunk_id,
2056 cmd->parameters.session_rotation_completed.location,
2057 &cmd->reply_code);
2058 break;
ab0ee2ca
JG
2059 case NOTIFICATION_COMMAND_TYPE_QUIT:
2060 DBG("[notification-thread] Received quit command");
2061 cmd->reply_code = LTTNG_OK;
2062 ret = 1;
2063 goto end;
2064 default:
2065 ERR("[notification-thread] Unknown internal command received");
2066 goto error_unlock;
2067 }
2068
2069 if (ret) {
2070 goto error_unlock;
2071 }
2072end:
2073 cds_list_del(&cmd->cmd_list_node);
8ada111f 2074 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
2075 pthread_mutex_unlock(&handle->cmd_queue.lock);
2076 return ret;
2077error_unlock:
2078 /* Wake-up and return a fatal error to the calling thread. */
8ada111f 2079 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
2080 pthread_mutex_unlock(&handle->cmd_queue.lock);
2081 cmd->reply_code = LTTNG_ERR_FATAL;
2082error:
2083 /* Indicate a fatal error to the caller. */
2084 return -1;
2085}
2086
2087static
2088unsigned long hash_client_socket(int socket)
2089{
2090 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
2091}
2092
/*
 * Add O_NONBLOCK to the file status flags of `socket`.
 *
 * Returns 0 on success, -1 if either fcntl() call fails.
 */
static
int socket_set_non_blocking(int socket)
{
	int status, current_flags;

	/* Fetch the current flags so that only O_NONBLOCK is added. */
	current_flags = fcntl(socket, F_GETFL, 0);
	if (current_flags == -1) {
		PERROR("fcntl get socket flags");
		return -1;
	}

	status = fcntl(socket, F_SETFL, current_flags | O_NONBLOCK);
	if (status == -1) {
		PERROR("fcntl set O_NONBLOCK socket flag");
		return -1;
	}

	DBG("Client socket (fd = %i) set as non-blocking", socket);
	return status;
}
2115
2116static
14fa22f8 2117int client_reset_inbound_state(struct notification_client *client)
ab0ee2ca
JG
2118{
2119 int ret;
2120
2121 ret = lttng_dynamic_buffer_set_size(
2122 &client->communication.inbound.buffer, 0);
2123 assert(!ret);
2124
2125 client->communication.inbound.bytes_to_receive =
2126 sizeof(struct lttng_notification_channel_message);
2127 client->communication.inbound.msg_type =
2128 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
ab0ee2ca
JG
2129 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
2130 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
14fa22f8
JG
2131 ret = lttng_dynamic_buffer_set_size(
2132 &client->communication.inbound.buffer,
2133 client->communication.inbound.bytes_to_receive);
2134 return ret;
ab0ee2ca
JG
2135}
2136
2137int handle_notification_thread_client_connect(
2138 struct notification_thread_state *state)
2139{
2140 int ret;
2141 struct notification_client *client;
2142
2143 DBG("[notification-thread] Handling new notification channel client connection");
2144
2145 client = zmalloc(sizeof(*client));
2146 if (!client) {
2147 /* Fatal error. */
2148 ret = -1;
2149 goto error;
2150 }
2151 CDS_INIT_LIST_HEAD(&client->condition_list);
2152 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
2153 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
01ea340e 2154 client->communication.inbound.expect_creds = true;
14fa22f8
JG
2155 ret = client_reset_inbound_state(client);
2156 if (ret) {
2157 ERR("[notification-thread] Failed to reset client communication's inbound state");
2158 ret = 0;
2159 goto error;
2160 }
ab0ee2ca
JG
2161
2162 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
2163 if (ret < 0) {
2164 ERR("[notification-thread] Failed to accept new notification channel client connection");
2165 ret = 0;
2166 goto error;
2167 }
2168
2169 client->socket = ret;
2170
2171 ret = socket_set_non_blocking(client->socket);
2172 if (ret) {
2173 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
2174 goto error;
2175 }
2176
2177 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
2178 if (ret < 0) {
2179 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
2180 ret = 0;
2181 goto error;
2182 }
2183
2184 ret = lttng_poll_add(&state->events, client->socket,
2185 LPOLLIN | LPOLLERR |
2186 LPOLLHUP | LPOLLRDHUP);
2187 if (ret < 0) {
2188 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
2189 ret = 0;
2190 goto error;
2191 }
2192 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
2193 client->socket);
2194
ab0ee2ca
JG
2195 rcu_read_lock();
2196 cds_lfht_add(state->client_socket_ht,
2197 hash_client_socket(client->socket),
2198 &client->client_socket_ht_node);
2199 rcu_read_unlock();
2200
2201 return ret;
2202error:
2203 notification_client_destroy(client, state);
2204 return ret;
2205}
2206
2207int handle_notification_thread_client_disconnect(
2208 int client_socket,
2209 struct notification_thread_state *state)
2210{
2211 int ret = 0;
2212 struct notification_client *client;
2213
2214 rcu_read_lock();
2215 DBG("[notification-thread] Closing client connection (socket fd = %i)",
2216 client_socket);
2217 client = get_client_from_socket(client_socket, state);
2218 if (!client) {
2219 /* Internal state corruption, fatal error. */
2220 ERR("[notification-thread] Unable to find client (socket fd = %i)",
2221 client_socket);
2222 ret = -1;
2223 goto end;
2224 }
2225
2226 ret = lttng_poll_del(&state->events, client_socket);
2227 if (ret) {
2228 ERR("[notification-thread] Failed to remove client socket from poll set");
2229 }
2230 cds_lfht_del(state->client_socket_ht,
2231 &client->client_socket_ht_node);
2232 notification_client_destroy(client, state);
2233end:
2234 rcu_read_unlock();
2235 return ret;
2236}
2237
2238int handle_notification_thread_client_disconnect_all(
2239 struct notification_thread_state *state)
2240{
2241 struct cds_lfht_iter iter;
2242 struct notification_client *client;
2243 bool error_encoutered = false;
2244
2245 rcu_read_lock();
2246 DBG("[notification-thread] Closing all client connections");
2247 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2248 client_socket_ht_node) {
2249 int ret;
2250
2251 ret = handle_notification_thread_client_disconnect(
2252 client->socket, state);
2253 if (ret) {
2254 error_encoutered = true;
2255 }
2256 }
2257 rcu_read_unlock();
2258 return error_encoutered ? 1 : 0;
2259}
2260
2261int handle_notification_thread_trigger_unregister_all(
2262 struct notification_thread_state *state)
2263{
4149ace8 2264 bool error_occurred = false;
ab0ee2ca
JG
2265 struct cds_lfht_iter iter;
2266 struct lttng_trigger_ht_element *trigger_ht_element;
2267
2268 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
2269 node) {
2270 int ret = handle_notification_thread_command_unregister_trigger(
2271 state, trigger_ht_element->trigger, NULL);
2272 if (ret) {
4149ace8 2273 error_occurred = true;
ab0ee2ca
JG
2274 }
2275 }
4149ace8 2276 return error_occurred ? -1 : 0;
ab0ee2ca
JG
2277}
2278
2279static
2280int client_flush_outgoing_queue(struct notification_client *client,
2281 struct notification_thread_state *state)
2282{
2283 ssize_t ret;
2284 size_t to_send_count;
2285
2286 assert(client->communication.outbound.buffer.size != 0);
2287 to_send_count = client->communication.outbound.buffer.size;
2288 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
2289 client->socket);
2290
2291 ret = lttcomm_send_unix_sock_non_block(client->socket,
2292 client->communication.outbound.buffer.data,
2293 to_send_count);
2294 if ((ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) ||
2295 (ret > 0 && ret < to_send_count)) {
2296 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
2297 client->socket);
2298 to_send_count -= max(ret, 0);
2299
2300 memcpy(client->communication.outbound.buffer.data,
2301 client->communication.outbound.buffer.data +
2302 client->communication.outbound.buffer.size - to_send_count,
2303 to_send_count);
2304 ret = lttng_dynamic_buffer_set_size(
2305 &client->communication.outbound.buffer,
2306 to_send_count);
2307 if (ret) {
2308 goto error;
2309 }
2310
2311 /*
2312 * We want to be notified whenever there is buffer space
2313 * available to send the rest of the payload.
2314 */
2315 ret = lttng_poll_mod(&state->events, client->socket,
2316 CLIENT_POLL_MASK_IN_OUT);
2317 if (ret) {
2318 goto error;
2319 }
2320 } else if (ret < 0) {
2321 /* Generic error, disconnect the client. */
2322 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
2323 client->socket);
2324 ret = handle_notification_thread_client_disconnect(
2325 client->socket, state);
2326 if (ret) {
2327 goto error;
2328 }
2329 } else {
2330 /* No error and flushed the queue completely. */
2331 ret = lttng_dynamic_buffer_set_size(
2332 &client->communication.outbound.buffer, 0);
2333 if (ret) {
2334 goto error;
2335 }
2336 ret = lttng_poll_mod(&state->events, client->socket,
2337 CLIENT_POLL_MASK_IN);
2338 if (ret) {
2339 goto error;
2340 }
2341
2342 client->communication.outbound.queued_command_reply = false;
2343 client->communication.outbound.dropped_notification = false;
2344 }
2345
2346 return 0;
2347error:
2348 return -1;
2349}
2350
2351static
2352int client_send_command_reply(struct notification_client *client,
2353 struct notification_thread_state *state,
2354 enum lttng_notification_channel_status status)
2355{
2356 int ret;
2357 struct lttng_notification_channel_command_reply reply = {
2358 .status = (int8_t) status,
2359 };
2360 struct lttng_notification_channel_message msg = {
2361 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
2362 .size = sizeof(reply),
2363 };
2364 char buffer[sizeof(msg) + sizeof(reply)];
2365
2366 if (client->communication.outbound.queued_command_reply) {
2367 /* Protocol error. */
2368 goto error;
2369 }
2370
2371 memcpy(buffer, &msg, sizeof(msg));
2372 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
2373 DBG("[notification-thread] Send command reply (%i)", (int) status);
2374
2375 /* Enqueue buffer to outgoing queue and flush it. */
2376 ret = lttng_dynamic_buffer_append(
2377 &client->communication.outbound.buffer,
2378 buffer, sizeof(buffer));
2379 if (ret) {
2380 goto error;
2381 }
2382
2383 ret = client_flush_outgoing_queue(client, state);
2384 if (ret) {
2385 goto error;
2386 }
2387
2388 if (client->communication.outbound.buffer.size != 0) {
2389 /* Queue could not be emptied. */
2390 client->communication.outbound.queued_command_reply = true;
2391 }
2392
2393 return 0;
2394error:
2395 return -1;
2396}
2397
2398static
2399int client_dispatch_message(struct notification_client *client,
2400 struct notification_thread_state *state)
2401{
2402 int ret = 0;
2403
2404 if (client->communication.inbound.msg_type !=
2405 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
2406 client->communication.inbound.msg_type !=
2407 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
2408 !client->validated) {
2409 WARN("[notification-thread] client attempted a command before handshake");
2410 ret = -1;
2411 goto end;
2412 }
2413
2414 switch (client->communication.inbound.msg_type) {
2415 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
2416 {
2417 /*
2418 * Receiving message header. The function will be called again
2419 * once the rest of the message as been received and can be
2420 * interpreted.
2421 */
2422 const struct lttng_notification_channel_message *msg;
2423
2424 assert(sizeof(*msg) ==
2425 client->communication.inbound.buffer.size);
2426 msg = (const struct lttng_notification_channel_message *)
2427 client->communication.inbound.buffer.data;
2428
2429 if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
2430 ERR("[notification-thread] Invalid notification channel message: length = %u", msg->size);
2431 ret = -1;
2432 goto end;
2433 }
2434
2435 switch (msg->type) {
2436 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2437 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2438 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2439 break;
2440 default:
2441 ret = -1;
2442 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
2443 goto end;
2444 }
2445
2446 client->communication.inbound.bytes_to_receive = msg->size;
2447 client->communication.inbound.msg_type =
2448 (enum lttng_notification_channel_message_type) msg->type;
ab0ee2ca 2449 ret = lttng_dynamic_buffer_set_size(
14fa22f8 2450 &client->communication.inbound.buffer, msg->size);
ab0ee2ca
JG
2451 if (ret) {
2452 goto end;
2453 }
2454 break;
2455 }
2456 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2457 {
2458 struct lttng_notification_channel_command_handshake *handshake_client;
2459 struct lttng_notification_channel_command_handshake handshake_reply = {
2460 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
2461 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
2462 };
2463 struct lttng_notification_channel_message msg_header = {
2464 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
2465 .size = sizeof(handshake_reply),
2466 };
2467 enum lttng_notification_channel_status status =
2468 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2469 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
2470
2471 memcpy(send_buffer, &msg_header, sizeof(msg_header));
2472 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
2473 sizeof(handshake_reply));
2474
2475 handshake_client =
2476 (struct lttng_notification_channel_command_handshake *)
2477 client->communication.inbound.buffer.data;
47a32869 2478 client->major = handshake_client->major;
ab0ee2ca
JG
2479 client->minor = handshake_client->minor;
2480 if (!client->communication.inbound.creds_received) {
2481 ERR("[notification-thread] No credentials received from client");
2482 ret = -1;
2483 goto end;
2484 }
2485
2486 client->uid = LTTNG_SOCK_GET_UID_CRED(
2487 &client->communication.inbound.creds);
2488 client->gid = LTTNG_SOCK_GET_GID_CRED(
2489 &client->communication.inbound.creds);
2490 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
2491 client->uid, client->gid, (int) client->major,
2492 (int) client->minor);
2493
2494 if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
2495 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
2496 }
2497
2498 ret = lttng_dynamic_buffer_append(&client->communication.outbound.buffer,
2499 send_buffer, sizeof(send_buffer));
2500 if (ret) {
2501 ERR("[notification-thread] Failed to send protocol version to notification channel client");
2502 goto end;
2503 }
2504
2505 ret = client_flush_outgoing_queue(client, state);
2506 if (ret) {
2507 goto end;
2508 }
2509
2510 ret = client_send_command_reply(client, state, status);
2511 if (ret) {
2512 ERR("[notification-thread] Failed to send reply to notification channel client");
2513 goto end;
2514 }
2515
2516 /* Set reception state to receive the next message header. */
14fa22f8
JG
2517 ret = client_reset_inbound_state(client);
2518 if (ret) {
2519 ERR("[notification-thread] Failed to reset client communication's inbound state");
2520 goto end;
2521 }
ab0ee2ca
JG
2522 client->validated = true;
2523 break;
2524 }
2525 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2526 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2527 {
2528 struct lttng_condition *condition;
2529 enum lttng_notification_channel_status status =
2530 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2531 const struct lttng_buffer_view condition_view =
2532 lttng_buffer_view_from_dynamic_buffer(
2533 &client->communication.inbound.buffer,
2534 0, -1);
2535 size_t expected_condition_size =
2536 client->communication.inbound.buffer.size;
2537
2538 ret = lttng_condition_create_from_buffer(&condition_view,
2539 &condition);
2540 if (ret != expected_condition_size) {
2541 ERR("[notification-thread] Malformed condition received from client");
2542 goto end;
2543 }
2544
2545 if (client->communication.inbound.msg_type ==
2546 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
ab0ee2ca
JG
2547 ret = notification_thread_client_subscribe(client,
2548 condition, state, &status);
2549 } else {
2550 ret = notification_thread_client_unsubscribe(client,
2551 condition, state, &status);
2552 }
2553 if (ret) {
2554 goto end;
2555 }
2556
2557 ret = client_send_command_reply(client, state, status);
2558 if (ret) {
2559 ERR("[notification-thread] Failed to send reply to notification channel client");
2560 goto end;
2561 }
2562
2563 /* Set reception state to receive the next message header. */
14fa22f8
JG
2564 ret = client_reset_inbound_state(client);
2565 if (ret) {
2566 ERR("[notification-thread] Failed to reset client communication's inbound state");
2567 goto end;
2568 }
ab0ee2ca
JG
2569 break;
2570 }
2571 default:
2572 abort();
2573 }
2574end:
2575 return ret;
2576}
2577
2578/* Incoming data from client. */
2579int handle_notification_thread_client_in(
2580 struct notification_thread_state *state, int socket)
2581{
14fa22f8 2582 int ret = 0;
ab0ee2ca
JG
2583 struct notification_client *client;
2584 ssize_t recv_ret;
2585 size_t offset;
2586
2587 client = get_client_from_socket(socket, state);
2588 if (!client) {
2589 /* Internal error, abort. */
2590 ret = -1;
2591 goto end;
2592 }
2593
14fa22f8
JG
2594 offset = client->communication.inbound.buffer.size -
2595 client->communication.inbound.bytes_to_receive;
01ea340e 2596 if (client->communication.inbound.expect_creds) {
ab0ee2ca
JG
2597 recv_ret = lttcomm_recv_creds_unix_sock(socket,
2598 client->communication.inbound.buffer.data + offset,
2599 client->communication.inbound.bytes_to_receive,
2600 &client->communication.inbound.creds);
2601 if (recv_ret > 0) {
01ea340e 2602 client->communication.inbound.expect_creds = false;
ab0ee2ca
JG
2603 client->communication.inbound.creds_received = true;
2604 }
2605 } else {
2606 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
2607 client->communication.inbound.buffer.data + offset,
2608 client->communication.inbound.bytes_to_receive);
2609 }
2610 if (recv_ret < 0) {
2611 goto error_disconnect_client;
2612 }
2613
2614 client->communication.inbound.bytes_to_receive -= recv_ret;
ab0ee2ca
JG
2615 if (client->communication.inbound.bytes_to_receive == 0) {
2616 ret = client_dispatch_message(client, state);
2617 if (ret) {
2618 /*
2619 * Only returns an error if this client must be
2620 * disconnected.
2621 */
2622 goto error_disconnect_client;
2623 }
2624 } else {
2625 goto end;
2626 }
2627end:
2628 return ret;
2629error_disconnect_client:
2630 ret = handle_notification_thread_client_disconnect(socket, state);
2631 return ret;
2632}
2633
/*
 * Client ready to receive outgoing data.
 *
 * Flushes the outbound buffer of the client associated with `socket`.
 *
 * Returns -1 if no client is associated with `socket`, the result of the
 * flush otherwise.
 */
int handle_notification_thread_client_out(
		struct notification_thread_state *state, int socket)
{
	int ret;
	struct notification_client *client;

	/*
	 * The client is looked up in, and used from, an RCU hash table; hold
	 * the read-side lock for as long as it is used, as done by the
	 * disconnection handler.
	 */
	rcu_read_lock();
	client = get_client_from_socket(socket, state);
	if (!client) {
		/* Internal error, abort. */
		ret = -1;
		goto end;
	}

	ret = client_flush_outgoing_queue(client, state);
end:
	rcu_read_unlock();
	return ret;
}
2655
2656static
e8360425
JD
2657bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
2658 const struct channel_state_sample *sample,
2659 uint64_t buffer_capacity)
ab0ee2ca
JG
2660{
2661 bool result = false;
2662 uint64_t threshold;
2663 enum lttng_condition_type condition_type;
e8360425 2664 const struct lttng_condition_buffer_usage *use_condition = container_of(
ab0ee2ca
JG
2665 condition, struct lttng_condition_buffer_usage,
2666 parent);
2667
ab0ee2ca
JG
2668 if (use_condition->threshold_bytes.set) {
2669 threshold = use_condition->threshold_bytes.value;
2670 } else {
2671 /*
2672 * Threshold was expressed as a ratio.
2673 *
2674 * TODO the threshold (in bytes) of conditions expressed
2675 * as a ratio of total buffer size could be cached to
2676 * forego this double-multiplication or it could be performed
2677 * as fixed-point math.
2678 *
2679 * Note that caching should accomodate the case where the
2680 * condition applies to multiple channels (i.e. don't assume
2681 * that all channels matching my_chann* have the same size...)
2682 */
2683 threshold = (uint64_t) (use_condition->threshold_ratio.value *
2684 (double) buffer_capacity);
2685 }
2686
2687 condition_type = lttng_condition_get_type(condition);
2688 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
2689 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2690 threshold, sample->highest_usage);
2691
2692 /*
2693 * The low condition should only be triggered once _all_ of the
2694 * streams in a channel have gone below the "low" threshold.
2695 */
2696 if (sample->highest_usage <= threshold) {
2697 result = true;
2698 }
2699 } else {
2700 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2701 threshold, sample->highest_usage);
2702
2703 /*
2704 * For high buffer usage scenarios, we want to trigger whenever
2705 * _any_ of the streams has reached the "high" threshold.
2706 */
2707 if (sample->highest_usage >= threshold) {
2708 result = true;
2709 }
2710 }
e8360425 2711
ab0ee2ca
JG
2712 return result;
2713}
2714
2715static
e8360425
JD
2716bool evaluate_session_consumed_size_condition(
2717 const struct lttng_condition *condition,
2718 uint64_t session_consumed_size)
2719{
2720 uint64_t threshold;
2721 const struct lttng_condition_session_consumed_size *size_condition =
2722 container_of(condition,
2723 struct lttng_condition_session_consumed_size,
2724 parent);
2725
2726 threshold = size_condition->consumed_threshold_bytes.value;
2727 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
2728 threshold, session_consumed_size);
2729 return session_consumed_size >= threshold;
2730}
2731
2732static
2733int evaluate_condition(const struct lttng_condition *condition,
ab0ee2ca 2734 struct lttng_evaluation **evaluation,
e8360425
JD
2735 const struct notification_thread_state *state,
2736 const struct channel_state_sample *previous_sample,
2737 const struct channel_state_sample *latest_sample,
2738 uint64_t previous_session_consumed_total,
2739 uint64_t latest_session_consumed_total,
2740 struct channel_info *channel_info)
ab0ee2ca
JG
2741{
2742 int ret = 0;
2743 enum lttng_condition_type condition_type;
e8360425
JD
2744 const bool previous_sample_available = !!previous_sample;
2745 bool previous_sample_result = false;
ab0ee2ca
JG
2746 bool latest_sample_result;
2747
2748 condition_type = lttng_condition_get_type(condition);
ab0ee2ca 2749
e8360425
JD
2750 switch (condition_type) {
2751 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
2752 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
2753 if (caa_likely(previous_sample_available)) {
2754 previous_sample_result =
2755 evaluate_buffer_usage_condition(condition,
2756 previous_sample, channel_info->capacity);
2757 }
2758 latest_sample_result = evaluate_buffer_usage_condition(
2759 condition, latest_sample,
2760 channel_info->capacity);
2761 break;
2762 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
2763 if (caa_likely(previous_sample_available)) {
2764 previous_sample_result =
2765 evaluate_session_consumed_size_condition(
2766 condition,
2767 previous_session_consumed_total);
2768 }
2769 latest_sample_result =
2770 evaluate_session_consumed_size_condition(
2771 condition,
2772 latest_session_consumed_total);
2773 break;
2774 default:
2775 /* Unknown condition type; internal error. */
2776 abort();
2777 }
ab0ee2ca
JG
2778
2779 if (!latest_sample_result ||
2780 (previous_sample_result == latest_sample_result)) {
2781 /*
2782 * Only trigger on a condition evaluation transition.
2783 *
2784 * NOTE: This edge-triggered logic may not be appropriate for
2785 * future condition types.
2786 */
2787 goto end;
2788 }
2789
e8360425
JD
2790 if (!evaluation || !latest_sample_result) {
2791 goto end;
2792 }
2793
2794 switch (condition_type) {
2795 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
2796 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
ab0ee2ca
JG
2797 *evaluation = lttng_evaluation_buffer_usage_create(
2798 condition_type,
2799 latest_sample->highest_usage,
e8360425
JD
2800 channel_info->capacity);
2801 break;
2802 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
2803 *evaluation = lttng_evaluation_session_consumed_size_create(
e8360425
JD
2804 latest_session_consumed_total);
2805 break;
2806 default:
2807 abort();
2808 }
2809
2810 if (!*evaluation) {
2811 ret = -1;
2812 goto end;
ab0ee2ca
JG
2813 }
2814end:
2815 return ret;
2816}
2817
2818static
2819int client_enqueue_dropped_notification(struct notification_client *client,
2820 struct notification_thread_state *state)
2821{
2822 int ret;
2823 struct lttng_notification_channel_message msg = {
2824 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
2825 .size = 0,
2826 };
2827
2828 ret = lttng_dynamic_buffer_append(
2829 &client->communication.outbound.buffer, &msg,
2830 sizeof(msg));
2831 return ret;
2832}
2833
2834static
9b63a4aa
JG
2835int send_evaluation_to_clients(const struct lttng_trigger *trigger,
2836 const struct lttng_evaluation *evaluation,
ab0ee2ca
JG
2837 struct notification_client_list* client_list,
2838 struct notification_thread_state *state,
2839 uid_t channel_uid, gid_t channel_gid)
2840{
2841 int ret = 0;
2842 struct lttng_dynamic_buffer msg_buffer;
2843 struct notification_client_list_element *client_list_element, *tmp;
9b63a4aa
JG
2844 const struct lttng_notification notification = {
2845 .condition = (struct lttng_condition *) lttng_trigger_get_const_condition(trigger),
2846 .evaluation = (struct lttng_evaluation *) evaluation,
2847 };
3647288f
JG
2848 struct lttng_notification_channel_message msg_header = {
2849 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
2850 };
ab0ee2ca
JG
2851
2852 lttng_dynamic_buffer_init(&msg_buffer);
2853
3647288f
JG
2854 ret = lttng_dynamic_buffer_append(&msg_buffer, &msg_header,
2855 sizeof(msg_header));
ab0ee2ca
JG
2856 if (ret) {
2857 goto end;
2858 }
2859
9b63a4aa 2860 ret = lttng_notification_serialize(&notification, &msg_buffer);
ab0ee2ca 2861 if (ret) {
ab0ee2ca
JG
2862 ERR("[notification-thread] Failed to serialize notification");
2863 ret = -1;
2864 goto end;
2865 }
2866
3647288f
JG
2867 /* Update payload size. */
2868 ((struct lttng_notification_channel_message * ) msg_buffer.data)->size =
2869 (uint32_t) (msg_buffer.size - sizeof(msg_header));
2870
ab0ee2ca
JG
2871 cds_list_for_each_entry_safe(client_list_element, tmp,
2872 &client_list->list, node) {
2873 struct notification_client *client =
2874 client_list_element->client;
2875
2876 if (client->uid != channel_uid && client->gid != channel_gid &&
2877 client->uid != 0) {
2878 /* Client is not allowed to monitor this channel. */
2879 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this channel");
2880 continue;
2881 }
2882
2883 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
2884 client->socket, msg_buffer.size);
2885 if (client->communication.outbound.buffer.size) {
2886 /*
2887 * Outgoing data is already buffered for this client;
2888 * drop the notification and enqueue a "dropped
2889 * notification" message if this is the first dropped
2890 * notification since the socket spilled-over to the
2891 * queue.
2892 */
2893 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
2894 client->socket);
2895 if (!client->communication.outbound.dropped_notification) {
2896 client->communication.outbound.dropped_notification = true;
2897 ret = client_enqueue_dropped_notification(
2898 client, state);
2899 if (ret) {
2900 goto end;
2901 }
2902 }
2903 continue;
2904 }
2905
2906 ret = lttng_dynamic_buffer_append_buffer(
2907 &client->communication.outbound.buffer,
2908 &msg_buffer);
2909 if (ret) {
2910 goto end;
2911 }
2912
2913 ret = client_flush_outgoing_queue(client, state);
2914 if (ret) {
2915 goto end;
2916 }
2917 }
2918 ret = 0;
2919end:
ab0ee2ca
JG
2920 lttng_dynamic_buffer_reset(&msg_buffer);
2921 return ret;
2922}
2923
2924int handle_notification_thread_channel_sample(
2925 struct notification_thread_state *state, int pipe,
2926 enum lttng_domain_type domain)
2927{
2928 int ret = 0;
2929 struct lttcomm_consumer_channel_monitor_msg sample_msg;
ab0ee2ca
JG
2930 struct channel_info *channel_info;
2931 struct cds_lfht_node *node;
2932 struct cds_lfht_iter iter;
2933 struct lttng_channel_trigger_list *trigger_list;
2934 struct lttng_trigger_list_element *trigger_list_element;
2935 bool previous_sample_available = false;
e8360425
JD
2936 struct channel_state_sample previous_sample, latest_sample;
2937 uint64_t previous_session_consumed_total, latest_session_consumed_total;
ab0ee2ca
JG
2938
2939 /*
2940 * The monitoring pipe only holds messages smaller than PIPE_BUF,
2941 * ensuring that read/write of sampling messages are atomic.
2942 */
7c2551ef 2943 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
ab0ee2ca
JG
2944 if (ret != sizeof(sample_msg)) {
2945 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
2946 pipe);
2947 ret = -1;
2948 goto end;
2949 }
2950
2951 ret = 0;
2952 latest_sample.key.key = sample_msg.key;
2953 latest_sample.key.domain = domain;
2954 latest_sample.highest_usage = sample_msg.highest;
2955 latest_sample.lowest_usage = sample_msg.lowest;
e8360425 2956 latest_sample.channel_total_consumed = sample_msg.total_consumed;
ab0ee2ca
JG
2957
2958 rcu_read_lock();
2959
2960 /* Retrieve the channel's informations */
2961 cds_lfht_lookup(state->channels_ht,
2962 hash_channel_key(&latest_sample.key),
2963 match_channel_info,
2964 &latest_sample.key,
2965 &iter);
2966 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 2967 if (caa_unlikely(!node)) {
ab0ee2ca
JG
2968 /*
2969 * Not an error since the consumer can push a sample to the pipe
2970 * and the rest of the session daemon could notify us of the
2971 * channel's destruction before we get a chance to process that
2972 * sample.
2973 */
2974 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
2975 latest_sample.key.key,
2976 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
2977 "user space");
2978 goto end_unlock;
2979 }
2980 channel_info = caa_container_of(node, struct channel_info,
2981 channels_ht_node);
e8360425 2982 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
8abe313a 2983 channel_info->name,
ab0ee2ca 2984 latest_sample.key.key,
8abe313a 2985 channel_info->session_info->name,
ab0ee2ca 2986 latest_sample.highest_usage,
e8360425
JD
2987 latest_sample.lowest_usage,
2988 latest_sample.channel_total_consumed);
2989
2990 previous_session_consumed_total =
2991 channel_info->session_info->consumed_data_size;
ab0ee2ca
JG
2992
2993 /* Retrieve the channel's last sample, if it exists, and update it. */
2994 cds_lfht_lookup(state->channel_state_ht,
2995 hash_channel_key(&latest_sample.key),
2996 match_channel_state_sample,
2997 &latest_sample.key,
2998 &iter);
2999 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 3000 if (caa_likely(node)) {
ab0ee2ca
JG
3001 struct channel_state_sample *stored_sample;
3002
3003 /* Update the sample stored. */
3004 stored_sample = caa_container_of(node,
3005 struct channel_state_sample,
3006 channel_state_ht_node);
e8360425 3007
ab0ee2ca
JG
3008 memcpy(&previous_sample, stored_sample,
3009 sizeof(previous_sample));
3010 stored_sample->highest_usage = latest_sample.highest_usage;
3011 stored_sample->lowest_usage = latest_sample.lowest_usage;
0f0479d7 3012 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
ab0ee2ca 3013 previous_sample_available = true;
e8360425
JD
3014
3015 latest_session_consumed_total =
3016 previous_session_consumed_total +
3017 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
ab0ee2ca
JG
3018 } else {
3019 /*
3020 * This is the channel's first sample, allocate space for and
3021 * store the new sample.
3022 */
3023 struct channel_state_sample *stored_sample;
3024
3025 stored_sample = zmalloc(sizeof(*stored_sample));
3026 if (!stored_sample) {
3027 ret = -1;
3028 goto end_unlock;
3029 }
3030
3031 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
3032 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
3033 cds_lfht_add(state->channel_state_ht,
3034 hash_channel_key(&stored_sample->key),
3035 &stored_sample->channel_state_ht_node);
e8360425
JD
3036
3037 latest_session_consumed_total =
3038 previous_session_consumed_total +
3039 latest_sample.channel_total_consumed;
ab0ee2ca
JG
3040 }
3041
e8360425
JD
3042 channel_info->session_info->consumed_data_size =
3043 latest_session_consumed_total;
3044
ab0ee2ca
JG
3045 /* Find triggers associated with this channel. */
3046 cds_lfht_lookup(state->channel_triggers_ht,
3047 hash_channel_key(&latest_sample.key),
3048 match_channel_trigger_list,
3049 &latest_sample.key,
3050 &iter);
3051 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 3052 if (caa_likely(!node)) {
ab0ee2ca
JG
3053 goto end_unlock;
3054 }
3055
3056 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
3057 channel_triggers_ht_node);
3058 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
3059 node) {
9b63a4aa
JG
3060 const struct lttng_condition *condition;
3061 const struct lttng_action *action;
3062 const struct lttng_trigger *trigger;
ab0ee2ca
JG
3063 struct notification_client_list *client_list;
3064 struct lttng_evaluation *evaluation = NULL;
3065
3066 trigger = trigger_list_element->trigger;
9b63a4aa 3067 condition = lttng_trigger_get_const_condition(trigger);
ab0ee2ca 3068 assert(condition);
9b63a4aa 3069 action = lttng_trigger_get_const_action(trigger);
ab0ee2ca
JG
3070
3071 /* Notify actions are the only type currently supported. */
9b63a4aa 3072 assert(lttng_action_get_type_const(action) ==
ab0ee2ca
JG
3073 LTTNG_ACTION_TYPE_NOTIFY);
3074
3075 /*
3076 * Check if any client is subscribed to the result of this
3077 * evaluation.
3078 */
3079 cds_lfht_lookup(state->notification_trigger_clients_ht,
3080 lttng_condition_hash(condition),
3081 match_client_list,
3082 trigger,
3083 &iter);
3084 node = cds_lfht_iter_get_node(&iter);
3085 assert(node);
3086
3087 client_list = caa_container_of(node,
3088 struct notification_client_list,
3089 notification_trigger_ht_node);
3090 if (cds_list_empty(&client_list->list)) {
3091 /*
3092 * No clients interested in the evaluation's result,
3093 * skip it.
3094 */
3095 continue;
3096 }
3097
3098 ret = evaluate_condition(condition, &evaluation, state,
3099 previous_sample_available ? &previous_sample : NULL,
e8360425
JD
3100 &latest_sample,
3101 previous_session_consumed_total,
3102 latest_session_consumed_total,
3103 channel_info);
3104 if (caa_unlikely(ret)) {
ab0ee2ca
JG
3105 goto end_unlock;
3106 }
3107
e8360425 3108 if (caa_likely(!evaluation)) {
ab0ee2ca
JG
3109 continue;
3110 }
3111
3112 /* Dispatch evaluation result to all clients. */
3113 ret = send_evaluation_to_clients(trigger_list_element->trigger,
3114 evaluation, client_list, state,
8abe313a
JG
3115 channel_info->session_info->uid,
3116 channel_info->session_info->gid);
3117 lttng_evaluation_destroy(evaluation);
e0b5f87b 3118 if (caa_unlikely(ret)) {
ab0ee2ca
JG
3119 goto end_unlock;
3120 }
3121 }
3122end_unlock:
3123 rcu_read_unlock();
3124end:
3125 return ret;
3126}
This page took 0.334124 seconds and 4 git commands to generate.