Fix: previous channel total is not updated
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
CommitLineData
ab0ee2ca
JG
1/*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _LGPL_SOURCE
19#include <urcu.h>
20#include <urcu/rculfhash.h>
21
ab0ee2ca
JG
22#include <common/defaults.h>
23#include <common/error.h>
24#include <common/futex.h>
25#include <common/unix.h>
26#include <common/dynamic-buffer.h>
27#include <common/hashtable/utils.h>
28#include <common/sessiond-comm/sessiond-comm.h>
29#include <common/macros.h>
30#include <lttng/condition/condition.h>
31#include <lttng/action/action.h>
32#include <lttng/notification/notification-internal.h>
33#include <lttng/condition/condition-internal.h>
34#include <lttng/condition/buffer-usage-internal.h>
35#include <lttng/notification/channel-internal.h>
1da26331 36
ab0ee2ca
JG
37#include <time.h>
38#include <unistd.h>
39#include <assert.h>
40#include <inttypes.h>
d53ea4e4 41#include <fcntl.h>
ab0ee2ca 42
1da26331
JG
43#include "notification-thread.h"
44#include "notification-thread-events.h"
45#include "notification-thread-commands.h"
46#include "lttng-sessiond.h"
47#include "kernel.h"
48
ab0ee2ca
JG
/*
 * Poll event masks applied to client sockets: input, error and hang-up
 * events, optionally combined with output readiness (LPOLLOUT).
 */
#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
51
52struct lttng_trigger_list_element {
53 struct lttng_trigger *trigger;
54 struct cds_list_head node;
55};
56
57struct lttng_channel_trigger_list {
58 struct channel_key channel_key;
59 struct cds_list_head list;
60 struct cds_lfht_node channel_triggers_ht_node;
61};
62
63struct lttng_trigger_ht_element {
64 struct lttng_trigger *trigger;
65 struct cds_lfht_node node;
66};
67
68struct lttng_condition_list_element {
69 struct lttng_condition *condition;
70 struct cds_list_head node;
71};
72
73struct notification_client_list_element {
74 struct notification_client *client;
75 struct cds_list_head node;
76};
77
78struct notification_client_list {
79 struct lttng_trigger *trigger;
80 struct cds_list_head list;
81 struct cds_lfht_node notification_trigger_ht_node;
82};
83
84struct notification_client {
85 int socket;
86 /* Client protocol version. */
87 uint8_t major, minor;
88 uid_t uid;
89 gid_t gid;
90 /*
5332364f 91 * Indicates if the credentials and versions of the client have been
ab0ee2ca
JG
92 * checked.
93 */
94 bool validated;
95 /*
96 * Conditions to which the client's notification channel is subscribed.
97 * List of struct lttng_condition_list_node. The condition member is
98 * owned by the client.
99 */
100 struct cds_list_head condition_list;
101 struct cds_lfht_node client_socket_ht_node;
102 struct {
103 struct {
14fa22f8
JG
104 /*
105 * During the reception of a message, the reception
106 * buffers' "size" is set to contain the current
107 * message's complete payload.
108 */
ab0ee2ca
JG
109 struct lttng_dynamic_buffer buffer;
110 /* Bytes left to receive for the current message. */
111 size_t bytes_to_receive;
112 /* Type of the message being received. */
113 enum lttng_notification_channel_message_type msg_type;
114 /*
115 * Indicates whether or not credentials are expected
116 * from the client.
117 */
01ea340e 118 bool expect_creds;
ab0ee2ca
JG
119 /*
120 * Indicates whether or not credentials were received
121 * from the client.
122 */
123 bool creds_received;
14fa22f8 124 /* Only used during credentials reception. */
ab0ee2ca
JG
125 lttng_sock_cred creds;
126 } inbound;
127 struct {
128 /*
129 * Indicates whether or not a notification addressed to
130 * this client was dropped because a command reply was
131 * already buffered.
132 *
133 * A notification is dropped whenever the buffer is not
134 * empty.
135 */
136 bool dropped_notification;
137 /*
138 * Indicates whether or not a command reply is already
139 * buffered. In this case, it means that the client is
140 * not consuming command replies before emitting a new
141 * one. This could be caused by a protocol error or a
142 * misbehaving/malicious client.
143 */
144 bool queued_command_reply;
145 struct lttng_dynamic_buffer buffer;
146 } outbound;
147 } communication;
148};
149
150struct channel_state_sample {
151 struct channel_key key;
152 struct cds_lfht_node channel_state_ht_node;
153 uint64_t highest_usage;
154 uint64_t lowest_usage;
155};
156
e4db5ace
JR
/* Forward declarations of helpers defined further down in this file. */

/* Channel key hashing. */
static unsigned long hash_channel_key(struct channel_key *key);

/* Condition evaluation and dispatch of the resulting evaluation. */
static int evaluate_condition(struct lttng_condition *condition,
		struct lttng_evaluation **evaluation,
		struct notification_thread_state *state,
		struct channel_state_sample *previous_sample,
		struct channel_state_sample *latest_sample,
		uint64_t buffer_capacity);
static
int send_evaluation_to_clients(struct lttng_trigger *trigger,
		struct lttng_evaluation *evaluation,
		struct notification_client_list *client_list,
		struct notification_thread_state *state,
		uid_t channel_uid, gid_t channel_gid);

/* Reference-counted session_info management. */
static
void session_info_destroy(void *_data);
static
void session_info_get(struct session_info *session_info);
static
void session_info_put(struct session_info *session_info);
static
struct session_info *session_info_create(const char *name,
		uid_t uid, gid_t gid);
static
void session_info_add_channel(struct session_info *session_info,
		struct channel_info *channel_info);
static
void session_info_remove_channel(struct session_info *session_info,
		struct channel_info *channel_info);
187
ab0ee2ca
JG
188static
189int match_client(struct cds_lfht_node *node, const void *key)
190{
191 /* This double-cast is intended to supress pointer-to-cast warning. */
192 int socket = (int) (intptr_t) key;
193 struct notification_client *client;
194
195 client = caa_container_of(node, struct notification_client,
196 client_socket_ht_node);
197
198 return !!(client->socket == socket);
199}
200
201static
202int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
203{
204 struct channel_key *channel_key = (struct channel_key *) key;
205 struct lttng_channel_trigger_list *trigger_list;
206
207 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
208 channel_triggers_ht_node);
209
210 return !!((channel_key->key == trigger_list->channel_key.key) &&
211 (channel_key->domain == trigger_list->channel_key.domain));
212}
213
214static
215int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
216{
217 struct channel_key *channel_key = (struct channel_key *) key;
218 struct channel_state_sample *sample;
219
220 sample = caa_container_of(node, struct channel_state_sample,
221 channel_state_ht_node);
222
223 return !!((channel_key->key == sample->key.key) &&
224 (channel_key->domain == sample->key.domain));
225}
226
227static
228int match_channel_info(struct cds_lfht_node *node, const void *key)
229{
230 struct channel_key *channel_key = (struct channel_key *) key;
231 struct channel_info *channel_info;
232
233 channel_info = caa_container_of(node, struct channel_info,
234 channels_ht_node);
235
236 return !!((channel_key->key == channel_info->key.key) &&
237 (channel_key->domain == channel_info->key.domain));
238}
239
240static
241int match_condition(struct cds_lfht_node *node, const void *key)
242{
243 struct lttng_condition *condition_key = (struct lttng_condition *) key;
244 struct lttng_trigger_ht_element *trigger;
245 struct lttng_condition *condition;
246
247 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
248 node);
249 condition = lttng_trigger_get_condition(trigger->trigger);
250 assert(condition);
251
252 return !!lttng_condition_is_equal(condition_key, condition);
253}
254
255static
256int match_client_list(struct cds_lfht_node *node, const void *key)
257{
258 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
259 struct notification_client_list *client_list;
260 struct lttng_condition *condition;
261 struct lttng_condition *condition_key = lttng_trigger_get_condition(
262 trigger_key);
263
264 assert(condition_key);
265
266 client_list = caa_container_of(node, struct notification_client_list,
267 notification_trigger_ht_node);
268 condition = lttng_trigger_get_condition(client_list->trigger);
269
270 return !!lttng_condition_is_equal(condition_key, condition);
271}
272
273static
274int match_client_list_condition(struct cds_lfht_node *node, const void *key)
275{
276 struct lttng_condition *condition_key = (struct lttng_condition *) key;
277 struct notification_client_list *client_list;
278 struct lttng_condition *condition;
279
280 assert(condition_key);
281
282 client_list = caa_container_of(node, struct notification_client_list,
283 notification_trigger_ht_node);
284 condition = lttng_trigger_get_condition(client_list->trigger);
285
286 return !!lttng_condition_is_equal(condition_key, condition);
287}
288
289static
290unsigned long lttng_condition_buffer_usage_hash(
291 struct lttng_condition *_condition)
292{
293 unsigned long hash = 0;
294 struct lttng_condition_buffer_usage *condition;
295
296 condition = container_of(_condition,
297 struct lttng_condition_buffer_usage, parent);
298
299 if (condition->session_name) {
300 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
301 }
302 if (condition->channel_name) {
8f56701f 303 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
ab0ee2ca
JG
304 }
305 if (condition->domain.set) {
306 hash ^= hash_key_ulong(
307 (void *) condition->domain.type,
308 lttng_ht_seed);
309 }
310 if (condition->threshold_ratio.set) {
311 uint64_t val;
312
313 val = condition->threshold_ratio.value * (double) UINT32_MAX;
314 hash ^= hash_key_u64(&val, lttng_ht_seed);
6633b0dd 315 } else if (condition->threshold_bytes.set) {
ab0ee2ca
JG
316 uint64_t val;
317
318 val = condition->threshold_bytes.value;
319 hash ^= hash_key_u64(&val, lttng_ht_seed);
320 }
321 return hash;
322}
323
324/*
325 * The lttng_condition hashing code is kept in this file (rather than
326 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
327 * don't want to link in liblttng-ctl.
328 */
329static
330unsigned long lttng_condition_hash(struct lttng_condition *condition)
331{
332 switch (condition->type) {
333 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
334 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
335 return lttng_condition_buffer_usage_hash(condition);
336 default:
337 ERR("[notification-thread] Unexpected condition type caught");
338 abort();
339 }
340}
341
e4db5ace
JR
342static
343unsigned long hash_channel_key(struct channel_key *key)
344{
345 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
346 unsigned long domain_hash = hash_key_ulong(
347 (void *) (unsigned long) key->domain, lttng_ht_seed);
348
349 return key_hash ^ domain_hash;
350}
351
ab0ee2ca
JG
352static
353void channel_info_destroy(struct channel_info *channel_info)
354{
355 if (!channel_info) {
356 return;
357 }
358
8abe313a
JG
359 if (channel_info->session_info) {
360 session_info_remove_channel(channel_info->session_info,
361 channel_info);
362 session_info_put(channel_info->session_info);
ab0ee2ca 363 }
8abe313a
JG
364 if (channel_info->name) {
365 free(channel_info->name);
ab0ee2ca
JG
366 }
367 free(channel_info);
368}
369
8abe313a 370/* Don't call directly, use the ref-counting mechanism. */
ab0ee2ca 371static
8abe313a 372void session_info_destroy(void *_data)
ab0ee2ca 373{
8abe313a 374 struct session_info *session_info = _data;
ab0ee2ca 375
8abe313a
JG
376 assert(session_info);
377 if (session_info->channel_infos_ht) {
378 cds_lfht_destroy(session_info->channel_infos_ht, NULL);
379 }
380 free(session_info->name);
381 free(session_info);
382}
ab0ee2ca 383
8abe313a
JG
384static
385void session_info_get(struct session_info *session_info)
386{
387 if (!session_info) {
388 return;
389 }
390 lttng_ref_get(&session_info->ref);
391}
392
393static
394void session_info_put(struct session_info *session_info)
395{
396 if (!session_info) {
397 return;
ab0ee2ca 398 }
8abe313a
JG
399 lttng_ref_put(&session_info->ref);
400}
401
402static
403struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid)
404{
405 struct session_info *session_info;
ab0ee2ca 406
8abe313a 407 assert(name);
ab0ee2ca 408
8abe313a
JG
409 session_info = zmalloc(sizeof(*session_info));
410 if (!session_info) {
411 goto end;
412 }
413 lttng_ref_init(&session_info->ref, session_info_destroy);
414
415 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
416 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
417 if (!session_info->channel_infos_ht) {
ab0ee2ca
JG
418 goto error;
419 }
8abe313a
JG
420
421 cds_lfht_node_init(&session_info->sessions_ht_node);
422 session_info->name = strdup(name);
423 if (!session_info->name) {
ab0ee2ca
JG
424 goto error;
425 }
8abe313a
JG
426 session_info->uid = uid;
427 session_info->gid = gid;
428end:
429 return session_info;
430error:
431 session_info_put(session_info);
432 return NULL;
433}
434
435static
436void session_info_add_channel(struct session_info *session_info,
437 struct channel_info *channel_info)
438{
439 rcu_read_lock();
440 cds_lfht_add(session_info->channel_infos_ht,
441 hash_channel_key(&channel_info->key),
442 &channel_info->session_info_channels_ht_node);
443 rcu_read_unlock();
444}
445
446static
447void session_info_remove_channel(struct session_info *session_info,
448 struct channel_info *channel_info)
449{
450 rcu_read_lock();
451 cds_lfht_del(session_info->channel_infos_ht,
452 &channel_info->session_info_channels_ht_node);
453 rcu_read_unlock();
454}
455
456static
457struct channel_info *channel_info_create(const char *channel_name,
458 struct channel_key *channel_key, uint64_t channel_capacity,
459 struct session_info *session_info)
460{
461 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
462
463 if (!channel_info) {
464 goto end;
465 }
466
ab0ee2ca 467 cds_lfht_node_init(&channel_info->channels_ht_node);
8abe313a
JG
468 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
469 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
470 channel_info->capacity = channel_capacity;
471
472 channel_info->name = strdup(channel_name);
473 if (!channel_info->name) {
474 goto error;
475 }
476
477 /*
478 * Set the references between session and channel infos:
479 * - channel_info holds a strong reference to session_info
480 * - session_info holds a weak reference to channel_info
481 */
482 session_info_get(session_info);
483 session_info_add_channel(session_info, channel_info);
484 channel_info->session_info = session_info;
ab0ee2ca 485end:
8abe313a 486 return channel_info;
ab0ee2ca 487error:
8abe313a 488 channel_info_destroy(channel_info);
ab0ee2ca
JG
489 return NULL;
490}
491
e4db5ace
JR
492/* This function must be called with the RCU read lock held. */
493static
494int evaluate_condition_for_client(struct lttng_trigger *trigger,
495 struct lttng_condition *condition,
496 struct notification_client *client,
497 struct notification_thread_state *state)
498{
499 int ret;
500 struct cds_lfht_iter iter;
501 struct cds_lfht_node *node;
502 struct channel_info *channel_info = NULL;
503 struct channel_key *channel_key = NULL;
504 struct channel_state_sample *last_sample = NULL;
505 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
506 struct lttng_evaluation *evaluation = NULL;
507 struct notification_client_list client_list = { 0 };
508 struct notification_client_list_element client_list_element = { 0 };
509
510 assert(trigger);
511 assert(condition);
512 assert(client);
513 assert(state);
514
515 /* Find the channel associated with the trigger. */
516 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
517 channel_trigger_list , channel_triggers_ht_node) {
518 struct lttng_trigger_list_element *element;
519
520 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
521 struct lttng_condition *current_condition =
522 lttng_trigger_get_condition(
523 element->trigger);
524
525 assert(current_condition);
526 if (!lttng_condition_is_equal(condition,
2ae99f0b 527 current_condition)) {
e4db5ace
JR
528 continue;
529 }
530
531 /* Found the trigger, save the channel key. */
532 channel_key = &channel_trigger_list->channel_key;
533 break;
534 }
535 if (channel_key) {
536 /* The channel key was found stop iteration. */
537 break;
538 }
539 }
540
541 if (!channel_key){
542 /* No channel found; normal exit. */
543 DBG("[notification-thread] No channel associated with newly subscribed-to condition");
544 ret = 0;
545 goto end;
546 }
547
548 /* Fetch channel info for the matching channel. */
549 cds_lfht_lookup(state->channels_ht,
550 hash_channel_key(channel_key),
551 match_channel_info,
552 channel_key,
553 &iter);
554 node = cds_lfht_iter_get_node(&iter);
555 assert(node);
556 channel_info = caa_container_of(node, struct channel_info,
557 channels_ht_node);
558
559 /* Retrieve the channel's last sample, if it exists. */
560 cds_lfht_lookup(state->channel_state_ht,
561 hash_channel_key(channel_key),
562 match_channel_state_sample,
563 channel_key,
564 &iter);
565 node = cds_lfht_iter_get_node(&iter);
566 if (node) {
567 last_sample = caa_container_of(node,
568 struct channel_state_sample,
569 channel_state_ht_node);
570 } else {
571 /* Nothing to evaluate, no sample was ever taken. Normal exit */
572 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
573 ret = 0;
574 goto end;
575 }
576
577 ret = evaluate_condition(condition, &evaluation, state, NULL,
578 last_sample, channel_info->capacity);
579 if (ret) {
066a3c82 580 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
e4db5ace
JR
581 goto end;
582 }
583
584 if (!evaluation) {
585 /* Evaluation yielded nothing. Normal exit. */
586 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
587 ret = 0;
588 goto end;
589 }
590
591 /*
592 * Create a temporary client list with the client currently
593 * subscribing.
594 */
595 cds_lfht_node_init(&client_list.notification_trigger_ht_node);
596 CDS_INIT_LIST_HEAD(&client_list.list);
597 client_list.trigger = trigger;
598
599 CDS_INIT_LIST_HEAD(&client_list_element.node);
600 client_list_element.client = client;
601 cds_list_add(&client_list_element.node, &client_list.list);
602
603 /* Send evaluation result to the newly-subscribed client. */
604 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
605 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
8abe313a
JG
606 state, channel_info->session_info->uid,
607 channel_info->session_info->gid);
e4db5ace
JR
608
609end:
610 return ret;
611}
612
ab0ee2ca
JG
613static
614int notification_thread_client_subscribe(struct notification_client *client,
615 struct lttng_condition *condition,
616 struct notification_thread_state *state,
617 enum lttng_notification_channel_status *_status)
618{
619 int ret = 0;
620 struct cds_lfht_iter iter;
621 struct cds_lfht_node *node;
622 struct notification_client_list *client_list;
623 struct lttng_condition_list_element *condition_list_element = NULL;
624 struct notification_client_list_element *client_list_element = NULL;
625 enum lttng_notification_channel_status status =
626 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
627
628 /*
629 * Ensure that the client has not already subscribed to this condition
630 * before.
631 */
632 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
633 if (lttng_condition_is_equal(condition_list_element->condition,
634 condition)) {
635 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
636 goto end;
637 }
638 }
639
640 condition_list_element = zmalloc(sizeof(*condition_list_element));
641 if (!condition_list_element) {
642 ret = -1;
643 goto error;
644 }
645 client_list_element = zmalloc(sizeof(*client_list_element));
646 if (!client_list_element) {
647 ret = -1;
648 goto error;
649 }
650
651 rcu_read_lock();
652
653 /*
654 * Add the newly-subscribed condition to the client's subscription list.
655 */
656 CDS_INIT_LIST_HEAD(&condition_list_element->node);
657 condition_list_element->condition = condition;
658 cds_list_add(&condition_list_element->node, &client->condition_list);
659
ab0ee2ca
JG
660 cds_lfht_lookup(state->notification_trigger_clients_ht,
661 lttng_condition_hash(condition),
662 match_client_list_condition,
663 condition,
664 &iter);
665 node = cds_lfht_iter_get_node(&iter);
666 if (!node) {
2ae99f0b
JG
667 /*
668 * No notification-emiting trigger registered with this
669 * condition. We don't evaluate the condition right away
670 * since this trigger is not registered yet.
671 */
4fb43b68 672 free(client_list_element);
ab0ee2ca
JG
673 goto end_unlock;
674 }
675
676 client_list = caa_container_of(node, struct notification_client_list,
677 notification_trigger_ht_node);
2ae99f0b
JG
678 /*
679 * The condition to which the client just subscribed is evaluated
680 * at this point so that conditions that are already TRUE result
681 * in a notification being sent out.
682 */
e4db5ace
JR
683 if (evaluate_condition_for_client(client_list->trigger, condition,
684 client, state)) {
685 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
686 ret = -1;
687 goto end_unlock;
688 }
689
690 /*
691 * Add the client to the list of clients interested in a given trigger
692 * if a "notification" trigger with a corresponding condition was
693 * added prior.
694 */
ab0ee2ca
JG
695 client_list_element->client = client;
696 CDS_INIT_LIST_HEAD(&client_list_element->node);
697 cds_list_add(&client_list_element->node, &client_list->list);
698end_unlock:
699 rcu_read_unlock();
700end:
701 if (_status) {
702 *_status = status;
703 }
704 return ret;
705error:
706 free(condition_list_element);
707 free(client_list_element);
708 return ret;
709}
710
711static
712int notification_thread_client_unsubscribe(
713 struct notification_client *client,
714 struct lttng_condition *condition,
715 struct notification_thread_state *state,
716 enum lttng_notification_channel_status *_status)
717{
718 struct cds_lfht_iter iter;
719 struct cds_lfht_node *node;
720 struct notification_client_list *client_list;
721 struct lttng_condition_list_element *condition_list_element,
722 *condition_tmp;
723 struct notification_client_list_element *client_list_element,
724 *client_tmp;
725 bool condition_found = false;
726 enum lttng_notification_channel_status status =
727 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
728
729 /* Remove the condition from the client's condition list. */
730 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
731 &client->condition_list, node) {
732 if (!lttng_condition_is_equal(condition_list_element->condition,
733 condition)) {
734 continue;
735 }
736
737 cds_list_del(&condition_list_element->node);
738 /*
739 * The caller may be iterating on the client's conditions to
740 * tear down a client's connection. In this case, the condition
741 * will be destroyed at the end.
742 */
743 if (condition != condition_list_element->condition) {
744 lttng_condition_destroy(
745 condition_list_element->condition);
746 }
747 free(condition_list_element);
748 condition_found = true;
749 break;
750 }
751
752 if (!condition_found) {
753 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
754 goto end;
755 }
756
757 /*
758 * Remove the client from the list of clients interested the trigger
759 * matching the condition.
760 */
761 rcu_read_lock();
762 cds_lfht_lookup(state->notification_trigger_clients_ht,
763 lttng_condition_hash(condition),
764 match_client_list_condition,
765 condition,
766 &iter);
767 node = cds_lfht_iter_get_node(&iter);
768 if (!node) {
769 goto end_unlock;
770 }
771
772 client_list = caa_container_of(node, struct notification_client_list,
773 notification_trigger_ht_node);
774 cds_list_for_each_entry_safe(client_list_element, client_tmp,
775 &client_list->list, node) {
776 if (client_list_element->client->socket != client->socket) {
777 continue;
778 }
779 cds_list_del(&client_list_element->node);
780 free(client_list_element);
781 break;
782 }
783end_unlock:
784 rcu_read_unlock();
785end:
786 lttng_condition_destroy(condition);
787 if (_status) {
788 *_status = status;
789 }
790 return 0;
791}
792
793static
794void notification_client_destroy(struct notification_client *client,
795 struct notification_thread_state *state)
796{
797 struct lttng_condition_list_element *condition_list_element, *tmp;
798
799 if (!client) {
800 return;
801 }
802
803 /* Release all conditions to which the client was subscribed. */
804 cds_list_for_each_entry_safe(condition_list_element, tmp,
805 &client->condition_list, node) {
806 (void) notification_thread_client_unsubscribe(client,
807 condition_list_element->condition, state, NULL);
808 }
809
810 if (client->socket >= 0) {
811 (void) lttcomm_close_unix_sock(client->socket);
812 }
813 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
814 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
815 free(client);
816}
817
818/*
819 * Call with rcu_read_lock held (and hold for the lifetime of the returned
820 * client pointer).
821 */
822static
823struct notification_client *get_client_from_socket(int socket,
824 struct notification_thread_state *state)
825{
826 struct cds_lfht_iter iter;
827 struct cds_lfht_node *node;
828 struct notification_client *client = NULL;
829
830 cds_lfht_lookup(state->client_socket_ht,
831 hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed),
832 match_client,
833 (void *) (unsigned long) socket,
834 &iter);
835 node = cds_lfht_iter_get_node(&iter);
836 if (!node) {
837 goto end;
838 }
839
840 client = caa_container_of(node, struct notification_client,
841 client_socket_ht_node);
842end:
843 return client;
844}
845
846static
847bool trigger_applies_to_channel(struct lttng_trigger *trigger,
8abe313a 848 struct channel_info *channel_info)
ab0ee2ca
JG
849{
850 enum lttng_condition_status status;
851 struct lttng_condition *condition;
852 const char *trigger_session_name = NULL;
853 const char *trigger_channel_name = NULL;
854 enum lttng_domain_type trigger_domain;
855
856 condition = lttng_trigger_get_condition(trigger);
857 if (!condition) {
858 goto fail;
859 }
860
861 switch (lttng_condition_get_type(condition)) {
862 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
863 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
864 break;
865 default:
866 goto fail;
867 }
868
869 status = lttng_condition_buffer_usage_get_domain_type(condition,
870 &trigger_domain);
871 assert(status == LTTNG_CONDITION_STATUS_OK);
8abe313a 872 if (channel_info->key.domain != trigger_domain) {
ab0ee2ca
JG
873 goto fail;
874 }
875
876 status = lttng_condition_buffer_usage_get_session_name(
877 condition, &trigger_session_name);
878 assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_session_name);
879
880 status = lttng_condition_buffer_usage_get_channel_name(
881 condition, &trigger_channel_name);
882 assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_channel_name);
883
8abe313a 884 if (strcmp(channel_info->session_info->name, trigger_session_name)) {
ab0ee2ca
JG
885 goto fail;
886 }
8abe313a 887 if (strcmp(channel_info->name, trigger_channel_name)) {
ab0ee2ca
JG
888 goto fail;
889 }
890
891 return true;
892fail:
893 return false;
894}
895
896static
897bool trigger_applies_to_client(struct lttng_trigger *trigger,
898 struct notification_client *client)
899{
900 bool applies = false;
901 struct lttng_condition_list_element *condition_list_element;
902
903 cds_list_for_each_entry(condition_list_element, &client->condition_list,
904 node) {
905 applies = lttng_condition_is_equal(
906 condition_list_element->condition,
907 lttng_trigger_get_condition(trigger));
908 if (applies) {
909 break;
910 }
911 }
912 return applies;
913}
914
8abe313a
JG
915static
916int match_session(struct cds_lfht_node *node, const void *key)
917{
918 const char *name = key;
919 struct session_info *session_info = caa_container_of(
920 node, struct session_info, sessions_ht_node);
921
922 return !strcmp(session_info->name, name);
923}
924
925static
926struct session_info *find_or_create_session_info(
927 struct notification_thread_state *state,
928 const char *name, uid_t uid, gid_t gid)
929{
930 struct session_info *session = NULL;
931 struct cds_lfht_node *node;
932 struct cds_lfht_iter iter;
933
934 rcu_read_lock();
935 cds_lfht_lookup(state->sessions_ht,
936 hash_key_str(name, lttng_ht_seed),
937 match_session,
938 name,
939 &iter);
940 node = cds_lfht_iter_get_node(&iter);
941 if (node) {
942 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
943 name, uid, gid);
944 session = caa_container_of(node, struct session_info,
945 sessions_ht_node);
946 assert(session->uid == uid);
947 assert(session->gid == gid);
948 goto end;
949 }
950
951 session = session_info_create(name, uid, gid);
952 if (!session) {
953 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
954 name, uid, gid);
955 goto end;
956 }
957end:
958 rcu_read_unlock();
959 return session;
960}
961
ab0ee2ca
JG
962static
963int handle_notification_thread_command_add_channel(
8abe313a
JG
964 struct notification_thread_state *state,
965 const char *session_name, uid_t session_uid, gid_t session_gid,
966 const char *channel_name, enum lttng_domain_type channel_domain,
967 uint64_t channel_key_int, uint64_t channel_capacity,
968 enum lttng_error_code *cmd_result)
ab0ee2ca
JG
969{
970 struct cds_list_head trigger_list;
8abe313a
JG
971 struct channel_info *new_channel_info = NULL;
972 struct channel_key channel_key = {
973 .key = channel_key_int,
974 .domain = channel_domain,
975 };
ab0ee2ca
JG
976 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
977 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
978 int trigger_count = 0;
979 struct cds_lfht_iter iter;
8abe313a 980 struct session_info *session_info = NULL;
ab0ee2ca
JG
981
982 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
8abe313a
JG
983 channel_name, session_name, channel_key_int,
984 channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
ab0ee2ca
JG
985
986 CDS_INIT_LIST_HEAD(&trigger_list);
987
8abe313a
JG
988 session_info = find_or_create_session_info(state, session_name,
989 session_uid, session_gid);
990 if (!session_info) {
991 /* Allocation error or an internal error occured. */
ab0ee2ca
JG
992 goto error;
993 }
994
8abe313a
JG
995 new_channel_info = channel_info_create(channel_name, &channel_key,
996 channel_capacity, session_info);
997 if (!new_channel_info) {
998 goto error;
999 }
ab0ee2ca
JG
1000
1001 /* Build a list of all triggers applying to the new channel. */
1002 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1003 node) {
1004 struct lttng_trigger_list_element *new_element;
1005
1006 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
8abe313a 1007 new_channel_info)) {
ab0ee2ca
JG
1008 continue;
1009 }
1010
1011 new_element = zmalloc(sizeof(*new_element));
1012 if (!new_element) {
1013 goto error;
1014 }
1015 CDS_INIT_LIST_HEAD(&new_element->node);
1016 new_element->trigger = trigger_ht_element->trigger;
1017 cds_list_add(&new_element->node, &trigger_list);
1018 trigger_count++;
1019 }
1020
1021 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1022 trigger_count);
1023 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1024 if (!channel_trigger_list) {
1025 goto error;
1026 }
8abe313a 1027 channel_trigger_list->channel_key = new_channel_info->key;
ab0ee2ca
JG
1028 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1029 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1030 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1031
1032 rcu_read_lock();
1033 /* Add channel to the channel_ht which owns the channel_infos. */
1034 cds_lfht_add(state->channels_ht,
8abe313a 1035 hash_channel_key(&new_channel_info->key),
ab0ee2ca
JG
1036 &new_channel_info->channels_ht_node);
1037 /*
1038 * Add the list of triggers associated with this channel to the
1039 * channel_triggers_ht.
1040 */
1041 cds_lfht_add(state->channel_triggers_ht,
8abe313a 1042 hash_channel_key(&new_channel_info->key),
ab0ee2ca
JG
1043 &channel_trigger_list->channel_triggers_ht_node);
1044 rcu_read_unlock();
1045 *cmd_result = LTTNG_OK;
1046 return 0;
1047error:
ab0ee2ca 1048 channel_info_destroy(new_channel_info);
8abe313a 1049 session_info_put(session_info);
ab0ee2ca
JG
1050 return 1;
1051}
1052
1053static
1054int handle_notification_thread_command_remove_channel(
1055 struct notification_thread_state *state,
1056 uint64_t channel_key, enum lttng_domain_type domain,
1057 enum lttng_error_code *cmd_result)
1058{
1059 struct cds_lfht_node *node;
1060 struct cds_lfht_iter iter;
1061 struct lttng_channel_trigger_list *trigger_list;
1062 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1063 struct channel_key key = { .key = channel_key, .domain = domain };
1064 struct channel_info *channel_info;
1065
1066 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
1067 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1068
1069 rcu_read_lock();
1070
1071 cds_lfht_lookup(state->channel_triggers_ht,
1072 hash_channel_key(&key),
1073 match_channel_trigger_list,
1074 &key,
1075 &iter);
1076 node = cds_lfht_iter_get_node(&iter);
1077 /*
1078 * There is a severe internal error if we are being asked to remove a
1079 * channel that doesn't exist.
1080 */
1081 if (!node) {
1082 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1083 goto end;
1084 }
1085
1086 /* Free the list of triggers associated with this channel. */
1087 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1088 channel_triggers_ht_node);
1089 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1090 &trigger_list->list, node) {
1091 cds_list_del(&trigger_list_element->node);
1092 free(trigger_list_element);
1093 }
1094 cds_lfht_del(state->channel_triggers_ht, node);
1095 free(trigger_list);
1096
1097 /* Free sampled channel state. */
1098 cds_lfht_lookup(state->channel_state_ht,
1099 hash_channel_key(&key),
1100 match_channel_state_sample,
1101 &key,
1102 &iter);
1103 node = cds_lfht_iter_get_node(&iter);
1104 /*
1105 * This is expected to be NULL if the channel is destroyed before we
1106 * received a sample.
1107 */
1108 if (node) {
1109 struct channel_state_sample *sample = caa_container_of(node,
1110 struct channel_state_sample,
1111 channel_state_ht_node);
1112
1113 cds_lfht_del(state->channel_state_ht, node);
1114 free(sample);
1115 }
1116
1117 /* Remove the channel from the channels_ht and free it. */
1118 cds_lfht_lookup(state->channels_ht,
1119 hash_channel_key(&key),
1120 match_channel_info,
1121 &key,
1122 &iter);
1123 node = cds_lfht_iter_get_node(&iter);
1124 assert(node);
1125 channel_info = caa_container_of(node, struct channel_info,
1126 channels_ht_node);
1127 cds_lfht_del(state->channels_ht, node);
1128 channel_info_destroy(channel_info);
1129end:
1130 rcu_read_unlock();
1131 *cmd_result = LTTNG_OK;
1132 return 0;
1133}
1134
1da26331
JG
1135static
1136int condition_is_supported(struct lttng_condition *condition)
1137{
1138 int ret;
1139
1140 switch (lttng_condition_get_type(condition)) {
1141 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1142 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1143 {
1144 enum lttng_domain_type domain;
1145
1146 ret = lttng_condition_buffer_usage_get_domain_type(condition,
1147 &domain);
1148 if (ret) {
1149 ret = -1;
1150 goto end;
1151 }
1152
1153 if (domain != LTTNG_DOMAIN_KERNEL) {
1154 ret = 1;
1155 goto end;
1156 }
1157
1158 /*
1159 * Older kernel tracers don't expose the API to monitor their
1160 * buffers. Therefore, we reject triggers that require that
1161 * mechanism to be available to be evaluated.
1162 */
1163 ret = kernel_supports_ring_buffer_snapshot_sample_positions(
1164 kernel_tracer_fd);
1165 break;
1166 }
1167 default:
1168 ret = 1;
1169 }
1170end:
1171 return ret;
1172}
1173
ab0ee2ca
JG
1174/*
1175 * FIXME A client's credentials are not checked when registering a trigger, nor
1176 * are they stored alongside with the trigger.
1177 *
1da26331 1178 * The effects of this are benign since:
ab0ee2ca
JG
1179 * - The client will succeed in registering the trigger, as it is valid,
1180 * - The trigger will, internally, be bound to the channel,
1181 * - The notifications will not be sent since the client's credentials
1182 * are checked against the channel at that moment.
1da26331
JG
1183 *
1184 * If this function returns a non-zero value, it means something is
50ca7858 1185 * fundamentally broken and the whole subsystem/thread will be torn down.
1da26331
JG
1186 *
1187 * If a non-fatal error occurs, just set the cmd_result to the appropriate
1188 * error code.
ab0ee2ca
JG
1189 */
1190static
1191int handle_notification_thread_command_register_trigger(
8abe313a
JG
1192 struct notification_thread_state *state,
1193 struct lttng_trigger *trigger,
1194 enum lttng_error_code *cmd_result)
ab0ee2ca
JG
1195{
1196 int ret = 0;
1197 struct lttng_condition *condition;
1198 struct notification_client *client;
1199 struct notification_client_list *client_list = NULL;
1200 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1201 struct notification_client_list_element *client_list_element, *tmp;
1202 struct cds_lfht_node *node;
1203 struct cds_lfht_iter iter;
1204 struct channel_info *channel;
1205 bool free_trigger = true;
1206
1207 rcu_read_lock();
1208
1209 condition = lttng_trigger_get_condition(trigger);
1da26331
JG
1210 assert(condition);
1211
1212 ret = condition_is_supported(condition);
1213 if (ret < 0) {
1214 goto error;
1215 } else if (ret == 0) {
1216 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
1217 goto error;
1218 } else {
1219 /* Feature is supported, continue. */
1220 ret = 0;
1221 }
1222
ab0ee2ca
JG
1223 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
1224 if (!trigger_ht_element) {
1225 ret = -1;
1226 goto error;
1227 }
1228
1229 /* Add trigger to the trigger_ht. */
1230 cds_lfht_node_init(&trigger_ht_element->node);
1231 trigger_ht_element->trigger = trigger;
1232
1233 node = cds_lfht_add_unique(state->triggers_ht,
1234 lttng_condition_hash(condition),
1235 match_condition,
1236 condition,
1237 &trigger_ht_element->node);
1238 if (node != &trigger_ht_element->node) {
1239 /* Not a fatal error, simply report it to the client. */
1240 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
1241 goto error_free_ht_element;
1242 }
1243
1244 /*
1245 * Ownership of the trigger and of its wrapper was transfered to
1246 * the triggers_ht.
1247 */
1248 trigger_ht_element = NULL;
1249 free_trigger = false;
1250
1251 /*
1252 * The rest only applies to triggers that have a "notify" action.
1253 * It is not skipped as this is the only action type currently
1254 * supported.
1255 */
1256 client_list = zmalloc(sizeof(*client_list));
1257 if (!client_list) {
1258 ret = -1;
1259 goto error_free_ht_element;
1260 }
1261 cds_lfht_node_init(&client_list->notification_trigger_ht_node);
1262 CDS_INIT_LIST_HEAD(&client_list->list);
1263 client_list->trigger = trigger;
1264
1265 /* Build a list of clients to which this new trigger applies. */
1266 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1267 client_socket_ht_node) {
1268 if (!trigger_applies_to_client(trigger, client)) {
1269 continue;
1270 }
1271
1272 client_list_element = zmalloc(sizeof(*client_list_element));
1273 if (!client_list_element) {
1274 ret = -1;
1275 goto error_free_client_list;
1276 }
1277 CDS_INIT_LIST_HEAD(&client_list_element->node);
1278 client_list_element->client = client;
1279 cds_list_add(&client_list_element->node, &client_list->list);
1280 }
1281
1282 cds_lfht_add(state->notification_trigger_clients_ht,
1283 lttng_condition_hash(condition),
1284 &client_list->notification_trigger_ht_node);
ab0ee2ca
JG
1285
1286 /*
1287 * Add the trigger to list of triggers bound to the channels currently
1288 * known.
1289 */
1290 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
1291 channels_ht_node) {
1292 struct lttng_trigger_list_element *trigger_list_element;
1293 struct lttng_channel_trigger_list *trigger_list;
1294
1295 if (!trigger_applies_to_channel(trigger, channel)) {
1296 continue;
1297 }
1298
1299 cds_lfht_lookup(state->channel_triggers_ht,
1300 hash_channel_key(&channel->key),
1301 match_channel_trigger_list,
1302 &channel->key,
1303 &iter);
1304 node = cds_lfht_iter_get_node(&iter);
1305 assert(node);
ab0ee2ca
JG
1306 trigger_list = caa_container_of(node,
1307 struct lttng_channel_trigger_list,
1308 channel_triggers_ht_node);
1309
1310 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
1311 if (!trigger_list_element) {
1312 ret = -1;
1313 goto error_free_client_list;
1314 }
1315 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
1316 trigger_list_element->trigger = trigger;
1317 cds_list_add(&trigger_list_element->node, &trigger_list->list);
e4db5ace 1318
ab0ee2ca
JG
1319 /* A trigger can only apply to one channel. */
1320 break;
1321 }
1322
2ae99f0b
JG
1323 /*
1324 * Since there is nothing preventing clients from subscribing to a
1325 * condition before the corresponding trigger is registered, we have
1326 * to evaluate this new condition right away.
1327 *
1328 * At some point, we were waiting for the next "evaluation" (e.g. on
1329 * reception of a channel sample) to evaluate this new condition, but
1330 * that was broken.
1331 *
1332 * The reason it was broken is that waiting for the next sample
1333 * does not allow us to properly handle transitions for edge-triggered
1334 * conditions.
1335 *
1336 * Consider this example: when we handle a new channel sample, we
1337 * evaluate each conditions twice: once with the previous state, and
1338 * again with the newest state. We then use those two results to
1339 * determine whether a state change happened: a condition was false and
1340 * became true. If a state change happened, we have to notify clients.
1341 *
1342 * Now, if a client subscribes to a given notification and registers
1343 * a trigger *after* that subscription, we have to make sure the
1344 * condition is evaluated at this point while considering only the
1345 * current state. Otherwise, the next evaluation cycle may only see
1346 * that the evaluations remain the same (true for samples n-1 and n) and
1347 * the client will never know that the condition has been met.
1348 */
1349 cds_list_for_each_entry_safe(client_list_element, tmp,
1350 &client_list->list, node) {
1351 ret = evaluate_condition_for_client(trigger, condition,
1352 client_list_element->client, state);
1353 if (ret) {
1354 goto error_free_client_list;
1355 }
1356 }
1357
1358 /*
1359 * Client list ownership transferred to the
1360 * notification_trigger_clients_ht.
1361 */
1362 client_list = NULL;
1363
ab0ee2ca
JG
1364 *cmd_result = LTTNG_OK;
1365error_free_client_list:
1366 if (client_list) {
1367 cds_list_for_each_entry_safe(client_list_element, tmp,
1368 &client_list->list, node) {
1369 free(client_list_element);
1370 }
1371 free(client_list);
1372 }
1373error_free_ht_element:
1374 free(trigger_ht_element);
1375error:
1376 if (free_trigger) {
1377 struct lttng_action *action = lttng_trigger_get_action(trigger);
1378
1379 lttng_condition_destroy(condition);
1380 lttng_action_destroy(action);
1381 lttng_trigger_destroy(trigger);
1382 }
1383 rcu_read_unlock();
1384 return ret;
1385}
1386
cc2295b5 1387static
ab0ee2ca
JG
1388int handle_notification_thread_command_unregister_trigger(
1389 struct notification_thread_state *state,
1390 struct lttng_trigger *trigger,
1391 enum lttng_error_code *_cmd_reply)
1392{
1393 struct cds_lfht_iter iter;
1394 struct cds_lfht_node *node, *triggers_ht_node;
1395 struct lttng_channel_trigger_list *trigger_list;
1396 struct notification_client_list *client_list;
1397 struct notification_client_list_element *client_list_element, *tmp;
1398 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1399 struct lttng_condition *condition = lttng_trigger_get_condition(
1400 trigger);
1401 struct lttng_action *action;
1402 enum lttng_error_code cmd_reply;
1403
1404 rcu_read_lock();
1405
1406 cds_lfht_lookup(state->triggers_ht,
1407 lttng_condition_hash(condition),
1408 match_condition,
1409 condition,
1410 &iter);
1411 triggers_ht_node = cds_lfht_iter_get_node(&iter);
1412 if (!triggers_ht_node) {
1413 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
1414 goto end;
1415 } else {
1416 cmd_reply = LTTNG_OK;
1417 }
1418
1419 /* Remove trigger from channel_triggers_ht. */
1420 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
1421 channel_triggers_ht_node) {
1422 struct lttng_trigger_list_element *trigger_element, *tmp;
1423
1424 cds_list_for_each_entry_safe(trigger_element, tmp,
1425 &trigger_list->list, node) {
1426 struct lttng_condition *current_condition =
1427 lttng_trigger_get_condition(
1428 trigger_element->trigger);
1429
1430 assert(current_condition);
1431 if (!lttng_condition_is_equal(condition,
1432 current_condition)) {
1433 continue;
1434 }
1435
1436 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
1437 cds_list_del(&trigger_element->node);
e4db5ace
JR
1438 /* A trigger can only appear once per channel */
1439 break;
ab0ee2ca
JG
1440 }
1441 }
1442
1443 /*
1444 * Remove and release the client list from
1445 * notification_trigger_clients_ht.
1446 */
1447 cds_lfht_lookup(state->notification_trigger_clients_ht,
1448 lttng_condition_hash(condition),
1449 match_client_list,
1450 trigger,
1451 &iter);
1452 node = cds_lfht_iter_get_node(&iter);
1453 assert(node);
1454 client_list = caa_container_of(node, struct notification_client_list,
1455 notification_trigger_ht_node);
1456 cds_list_for_each_entry_safe(client_list_element, tmp,
1457 &client_list->list, node) {
1458 free(client_list_element);
1459 }
1460 cds_lfht_del(state->notification_trigger_clients_ht, node);
1461 free(client_list);
1462
1463 /* Remove trigger from triggers_ht. */
1464 trigger_ht_element = caa_container_of(triggers_ht_node,
1465 struct lttng_trigger_ht_element, node);
1466 cds_lfht_del(state->triggers_ht, triggers_ht_node);
1467
1468 condition = lttng_trigger_get_condition(trigger_ht_element->trigger);
1469 lttng_condition_destroy(condition);
1470 action = lttng_trigger_get_action(trigger_ht_element->trigger);
1471 lttng_action_destroy(action);
1472 lttng_trigger_destroy(trigger_ht_element->trigger);
1473 free(trigger_ht_element);
1474end:
1475 rcu_read_unlock();
1476 if (_cmd_reply) {
1477 *_cmd_reply = cmd_reply;
1478 }
1479 return 0;
1480}
1481
1482/* Returns 0 on success, 1 on exit requested, negative value on error. */
1483int handle_notification_thread_command(
1484 struct notification_thread_handle *handle,
1485 struct notification_thread_state *state)
1486{
1487 int ret;
1488 uint64_t counter;
1489 struct notification_thread_command *cmd;
1490
8abe313a
JG
1491 /* Read the event pipe to put it back into a quiescent state. */
1492 ret = read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
1493 sizeof(counter));
ab0ee2ca
JG
1494 if (ret == -1) {
1495 goto error;
1496 }
1497
1498 pthread_mutex_lock(&handle->cmd_queue.lock);
1499 cmd = cds_list_first_entry(&handle->cmd_queue.list,
1500 struct notification_thread_command, cmd_list_node);
1501 switch (cmd->type) {
1502 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
1503 DBG("[notification-thread] Received register trigger command");
1504 ret = handle_notification_thread_command_register_trigger(
1505 state, cmd->parameters.trigger,
1506 &cmd->reply_code);
1507 break;
1508 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
1509 DBG("[notification-thread] Received unregister trigger command");
1510 ret = handle_notification_thread_command_unregister_trigger(
1511 state, cmd->parameters.trigger,
1512 &cmd->reply_code);
1513 break;
1514 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
1515 DBG("[notification-thread] Received add channel command");
1516 ret = handle_notification_thread_command_add_channel(
8abe313a
JG
1517 state,
1518 cmd->parameters.add_channel.session.name,
1519 cmd->parameters.add_channel.session.uid,
1520 cmd->parameters.add_channel.session.gid,
1521 cmd->parameters.add_channel.channel.name,
1522 cmd->parameters.add_channel.channel.domain,
1523 cmd->parameters.add_channel.channel.key,
1524 cmd->parameters.add_channel.channel.capacity,
ab0ee2ca
JG
1525 &cmd->reply_code);
1526 break;
1527 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
1528 DBG("[notification-thread] Received remove channel command");
1529 ret = handle_notification_thread_command_remove_channel(
1530 state, cmd->parameters.remove_channel.key,
1531 cmd->parameters.remove_channel.domain,
1532 &cmd->reply_code);
1533 break;
1534 case NOTIFICATION_COMMAND_TYPE_QUIT:
1535 DBG("[notification-thread] Received quit command");
1536 cmd->reply_code = LTTNG_OK;
1537 ret = 1;
1538 goto end;
1539 default:
1540 ERR("[notification-thread] Unknown internal command received");
1541 goto error_unlock;
1542 }
1543
1544 if (ret) {
1545 goto error_unlock;
1546 }
1547end:
1548 cds_list_del(&cmd->cmd_list_node);
8ada111f 1549 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
1550 pthread_mutex_unlock(&handle->cmd_queue.lock);
1551 return ret;
1552error_unlock:
1553 /* Wake-up and return a fatal error to the calling thread. */
8ada111f 1554 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
1555 pthread_mutex_unlock(&handle->cmd_queue.lock);
1556 cmd->reply_code = LTTNG_ERR_FATAL;
1557error:
1558 /* Indicate a fatal error to the caller. */
1559 return -1;
1560}
1561
1562static
1563unsigned long hash_client_socket(int socket)
1564{
1565 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
1566}
1567
/*
 * Add O_NONBLOCK to the file status flags of `socket`.
 *
 * Returns the result of the last fcntl() call performed: -1 if either the
 * flags could not be read or could not be updated, the F_SETFL result
 * otherwise.
 */
static
int socket_set_non_blocking(int socket)
{
	int ret;
	int current_flags;

	/* Fetch the current flags so that only O_NONBLOCK is added. */
	current_flags = fcntl(socket, F_GETFL, 0);
	if (current_flags == -1) {
		PERROR("fcntl get socket flags");
		return -1;
	}

	ret = fcntl(socket, F_SETFL, current_flags | O_NONBLOCK);
	if (ret == -1) {
		PERROR("fcntl set O_NONBLOCK socket flag");
		return ret;
	}

	DBG("Client socket (fd = %i) set as non-blocking", socket);
	return ret;
}
1590
1591static
14fa22f8 1592int client_reset_inbound_state(struct notification_client *client)
ab0ee2ca
JG
1593{
1594 int ret;
1595
1596 ret = lttng_dynamic_buffer_set_size(
1597 &client->communication.inbound.buffer, 0);
1598 assert(!ret);
1599
1600 client->communication.inbound.bytes_to_receive =
1601 sizeof(struct lttng_notification_channel_message);
1602 client->communication.inbound.msg_type =
1603 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
ab0ee2ca
JG
1604 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
1605 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
14fa22f8
JG
1606 ret = lttng_dynamic_buffer_set_size(
1607 &client->communication.inbound.buffer,
1608 client->communication.inbound.bytes_to_receive);
1609 return ret;
ab0ee2ca
JG
1610}
1611
1612int handle_notification_thread_client_connect(
1613 struct notification_thread_state *state)
1614{
1615 int ret;
1616 struct notification_client *client;
1617
1618 DBG("[notification-thread] Handling new notification channel client connection");
1619
1620 client = zmalloc(sizeof(*client));
1621 if (!client) {
1622 /* Fatal error. */
1623 ret = -1;
1624 goto error;
1625 }
1626 CDS_INIT_LIST_HEAD(&client->condition_list);
1627 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
1628 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
01ea340e 1629 client->communication.inbound.expect_creds = true;
14fa22f8
JG
1630 ret = client_reset_inbound_state(client);
1631 if (ret) {
1632 ERR("[notification-thread] Failed to reset client communication's inbound state");
1633 ret = 0;
1634 goto error;
1635 }
ab0ee2ca
JG
1636
1637 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
1638 if (ret < 0) {
1639 ERR("[notification-thread] Failed to accept new notification channel client connection");
1640 ret = 0;
1641 goto error;
1642 }
1643
1644 client->socket = ret;
1645
1646 ret = socket_set_non_blocking(client->socket);
1647 if (ret) {
1648 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
1649 goto error;
1650 }
1651
1652 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
1653 if (ret < 0) {
1654 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
1655 ret = 0;
1656 goto error;
1657 }
1658
1659 ret = lttng_poll_add(&state->events, client->socket,
1660 LPOLLIN | LPOLLERR |
1661 LPOLLHUP | LPOLLRDHUP);
1662 if (ret < 0) {
1663 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
1664 ret = 0;
1665 goto error;
1666 }
1667 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
1668 client->socket);
1669
ab0ee2ca
JG
1670 rcu_read_lock();
1671 cds_lfht_add(state->client_socket_ht,
1672 hash_client_socket(client->socket),
1673 &client->client_socket_ht_node);
1674 rcu_read_unlock();
1675
1676 return ret;
1677error:
1678 notification_client_destroy(client, state);
1679 return ret;
1680}
1681
1682int handle_notification_thread_client_disconnect(
1683 int client_socket,
1684 struct notification_thread_state *state)
1685{
1686 int ret = 0;
1687 struct notification_client *client;
1688
1689 rcu_read_lock();
1690 DBG("[notification-thread] Closing client connection (socket fd = %i)",
1691 client_socket);
1692 client = get_client_from_socket(client_socket, state);
1693 if (!client) {
1694 /* Internal state corruption, fatal error. */
1695 ERR("[notification-thread] Unable to find client (socket fd = %i)",
1696 client_socket);
1697 ret = -1;
1698 goto end;
1699 }
1700
1701 ret = lttng_poll_del(&state->events, client_socket);
1702 if (ret) {
1703 ERR("[notification-thread] Failed to remove client socket from poll set");
1704 }
1705 cds_lfht_del(state->client_socket_ht,
1706 &client->client_socket_ht_node);
1707 notification_client_destroy(client, state);
1708end:
1709 rcu_read_unlock();
1710 return ret;
1711}
1712
1713int handle_notification_thread_client_disconnect_all(
1714 struct notification_thread_state *state)
1715{
1716 struct cds_lfht_iter iter;
1717 struct notification_client *client;
1718 bool error_encoutered = false;
1719
1720 rcu_read_lock();
1721 DBG("[notification-thread] Closing all client connections");
1722 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1723 client_socket_ht_node) {
1724 int ret;
1725
1726 ret = handle_notification_thread_client_disconnect(
1727 client->socket, state);
1728 if (ret) {
1729 error_encoutered = true;
1730 }
1731 }
1732 rcu_read_unlock();
1733 return error_encoutered ? 1 : 0;
1734}
1735
1736int handle_notification_thread_trigger_unregister_all(
1737 struct notification_thread_state *state)
1738{
4149ace8 1739 bool error_occurred = false;
ab0ee2ca
JG
1740 struct cds_lfht_iter iter;
1741 struct lttng_trigger_ht_element *trigger_ht_element;
1742
1743 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1744 node) {
1745 int ret = handle_notification_thread_command_unregister_trigger(
1746 state, trigger_ht_element->trigger, NULL);
1747 if (ret) {
4149ace8 1748 error_occurred = true;
ab0ee2ca
JG
1749 }
1750 }
4149ace8 1751 return error_occurred ? -1 : 0;
ab0ee2ca
JG
1752}
1753
1754static
1755int client_flush_outgoing_queue(struct notification_client *client,
1756 struct notification_thread_state *state)
1757{
1758 ssize_t ret;
1759 size_t to_send_count;
1760
1761 assert(client->communication.outbound.buffer.size != 0);
1762 to_send_count = client->communication.outbound.buffer.size;
1763 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
1764 client->socket);
1765
1766 ret = lttcomm_send_unix_sock_non_block(client->socket,
1767 client->communication.outbound.buffer.data,
1768 to_send_count);
1769 if ((ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) ||
1770 (ret > 0 && ret < to_send_count)) {
1771 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
1772 client->socket);
1773 to_send_count -= max(ret, 0);
1774
1775 memcpy(client->communication.outbound.buffer.data,
1776 client->communication.outbound.buffer.data +
1777 client->communication.outbound.buffer.size - to_send_count,
1778 to_send_count);
1779 ret = lttng_dynamic_buffer_set_size(
1780 &client->communication.outbound.buffer,
1781 to_send_count);
1782 if (ret) {
1783 goto error;
1784 }
1785
1786 /*
1787 * We want to be notified whenever there is buffer space
1788 * available to send the rest of the payload.
1789 */
1790 ret = lttng_poll_mod(&state->events, client->socket,
1791 CLIENT_POLL_MASK_IN_OUT);
1792 if (ret) {
1793 goto error;
1794 }
1795 } else if (ret < 0) {
1796 /* Generic error, disconnect the client. */
1797 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
1798 client->socket);
1799 ret = handle_notification_thread_client_disconnect(
1800 client->socket, state);
1801 if (ret) {
1802 goto error;
1803 }
1804 } else {
1805 /* No error and flushed the queue completely. */
1806 ret = lttng_dynamic_buffer_set_size(
1807 &client->communication.outbound.buffer, 0);
1808 if (ret) {
1809 goto error;
1810 }
1811 ret = lttng_poll_mod(&state->events, client->socket,
1812 CLIENT_POLL_MASK_IN);
1813 if (ret) {
1814 goto error;
1815 }
1816
1817 client->communication.outbound.queued_command_reply = false;
1818 client->communication.outbound.dropped_notification = false;
1819 }
1820
1821 return 0;
1822error:
1823 return -1;
1824}
1825
1826static
1827int client_send_command_reply(struct notification_client *client,
1828 struct notification_thread_state *state,
1829 enum lttng_notification_channel_status status)
1830{
1831 int ret;
1832 struct lttng_notification_channel_command_reply reply = {
1833 .status = (int8_t) status,
1834 };
1835 struct lttng_notification_channel_message msg = {
1836 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
1837 .size = sizeof(reply),
1838 };
1839 char buffer[sizeof(msg) + sizeof(reply)];
1840
1841 if (client->communication.outbound.queued_command_reply) {
1842 /* Protocol error. */
1843 goto error;
1844 }
1845
1846 memcpy(buffer, &msg, sizeof(msg));
1847 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
1848 DBG("[notification-thread] Send command reply (%i)", (int) status);
1849
1850 /* Enqueue buffer to outgoing queue and flush it. */
1851 ret = lttng_dynamic_buffer_append(
1852 &client->communication.outbound.buffer,
1853 buffer, sizeof(buffer));
1854 if (ret) {
1855 goto error;
1856 }
1857
1858 ret = client_flush_outgoing_queue(client, state);
1859 if (ret) {
1860 goto error;
1861 }
1862
1863 if (client->communication.outbound.buffer.size != 0) {
1864 /* Queue could not be emptied. */
1865 client->communication.outbound.queued_command_reply = true;
1866 }
1867
1868 return 0;
1869error:
1870 return -1;
1871}
1872
1873static
1874int client_dispatch_message(struct notification_client *client,
1875 struct notification_thread_state *state)
1876{
1877 int ret = 0;
1878
1879 if (client->communication.inbound.msg_type !=
1880 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
1881 client->communication.inbound.msg_type !=
1882 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
1883 !client->validated) {
1884 WARN("[notification-thread] client attempted a command before handshake");
1885 ret = -1;
1886 goto end;
1887 }
1888
1889 switch (client->communication.inbound.msg_type) {
1890 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
1891 {
1892 /*
1893 * Receiving message header. The function will be called again
1894 * once the rest of the message as been received and can be
1895 * interpreted.
1896 */
1897 const struct lttng_notification_channel_message *msg;
1898
1899 assert(sizeof(*msg) ==
1900 client->communication.inbound.buffer.size);
1901 msg = (const struct lttng_notification_channel_message *)
1902 client->communication.inbound.buffer.data;
1903
1904 if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
1905 ERR("[notification-thread] Invalid notification channel message: length = %u", msg->size);
1906 ret = -1;
1907 goto end;
1908 }
1909
1910 switch (msg->type) {
1911 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
1912 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
1913 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
1914 break;
1915 default:
1916 ret = -1;
1917 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
1918 goto end;
1919 }
1920
1921 client->communication.inbound.bytes_to_receive = msg->size;
1922 client->communication.inbound.msg_type =
1923 (enum lttng_notification_channel_message_type) msg->type;
ab0ee2ca 1924 ret = lttng_dynamic_buffer_set_size(
14fa22f8 1925 &client->communication.inbound.buffer, msg->size);
ab0ee2ca
JG
1926 if (ret) {
1927 goto end;
1928 }
1929 break;
1930 }
1931 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
1932 {
1933 struct lttng_notification_channel_command_handshake *handshake_client;
1934 struct lttng_notification_channel_command_handshake handshake_reply = {
1935 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
1936 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
1937 };
1938 struct lttng_notification_channel_message msg_header = {
1939 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
1940 .size = sizeof(handshake_reply),
1941 };
1942 enum lttng_notification_channel_status status =
1943 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1944 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
1945
1946 memcpy(send_buffer, &msg_header, sizeof(msg_header));
1947 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
1948 sizeof(handshake_reply));
1949
1950 handshake_client =
1951 (struct lttng_notification_channel_command_handshake *)
1952 client->communication.inbound.buffer.data;
47a32869 1953 client->major = handshake_client->major;
ab0ee2ca
JG
1954 client->minor = handshake_client->minor;
1955 if (!client->communication.inbound.creds_received) {
1956 ERR("[notification-thread] No credentials received from client");
1957 ret = -1;
1958 goto end;
1959 }
1960
1961 client->uid = LTTNG_SOCK_GET_UID_CRED(
1962 &client->communication.inbound.creds);
1963 client->gid = LTTNG_SOCK_GET_GID_CRED(
1964 &client->communication.inbound.creds);
1965 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
1966 client->uid, client->gid, (int) client->major,
1967 (int) client->minor);
1968
1969 if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
1970 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
1971 }
1972
1973 ret = lttng_dynamic_buffer_append(&client->communication.outbound.buffer,
1974 send_buffer, sizeof(send_buffer));
1975 if (ret) {
1976 ERR("[notification-thread] Failed to send protocol version to notification channel client");
1977 goto end;
1978 }
1979
1980 ret = client_flush_outgoing_queue(client, state);
1981 if (ret) {
1982 goto end;
1983 }
1984
1985 ret = client_send_command_reply(client, state, status);
1986 if (ret) {
1987 ERR("[notification-thread] Failed to send reply to notification channel client");
1988 goto end;
1989 }
1990
1991 /* Set reception state to receive the next message header. */
14fa22f8
JG
1992 ret = client_reset_inbound_state(client);
1993 if (ret) {
1994 ERR("[notification-thread] Failed to reset client communication's inbound state");
1995 goto end;
1996 }
ab0ee2ca
JG
1997 client->validated = true;
1998 break;
1999 }
2000 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2001 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2002 {
2003 struct lttng_condition *condition;
2004 enum lttng_notification_channel_status status =
2005 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2006 const struct lttng_buffer_view condition_view =
2007 lttng_buffer_view_from_dynamic_buffer(
2008 &client->communication.inbound.buffer,
2009 0, -1);
2010 size_t expected_condition_size =
2011 client->communication.inbound.buffer.size;
2012
2013 ret = lttng_condition_create_from_buffer(&condition_view,
2014 &condition);
2015 if (ret != expected_condition_size) {
2016 ERR("[notification-thread] Malformed condition received from client");
2017 goto end;
2018 }
2019
2020 if (client->communication.inbound.msg_type ==
2021 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
ab0ee2ca
JG
2022 ret = notification_thread_client_subscribe(client,
2023 condition, state, &status);
2024 } else {
2025 ret = notification_thread_client_unsubscribe(client,
2026 condition, state, &status);
2027 }
2028 if (ret) {
2029 goto end;
2030 }
2031
2032 ret = client_send_command_reply(client, state, status);
2033 if (ret) {
2034 ERR("[notification-thread] Failed to send reply to notification channel client");
2035 goto end;
2036 }
2037
2038 /* Set reception state to receive the next message header. */
14fa22f8
JG
2039 ret = client_reset_inbound_state(client);
2040 if (ret) {
2041 ERR("[notification-thread] Failed to reset client communication's inbound state");
2042 goto end;
2043 }
ab0ee2ca
JG
2044 break;
2045 }
2046 default:
2047 abort();
2048 }
2049end:
2050 return ret;
2051}
2052
2053/* Incoming data from client. */
2054int handle_notification_thread_client_in(
2055 struct notification_thread_state *state, int socket)
2056{
14fa22f8 2057 int ret = 0;
ab0ee2ca
JG
2058 struct notification_client *client;
2059 ssize_t recv_ret;
2060 size_t offset;
2061
2062 client = get_client_from_socket(socket, state);
2063 if (!client) {
2064 /* Internal error, abort. */
2065 ret = -1;
2066 goto end;
2067 }
2068
14fa22f8
JG
2069 offset = client->communication.inbound.buffer.size -
2070 client->communication.inbound.bytes_to_receive;
01ea340e 2071 if (client->communication.inbound.expect_creds) {
ab0ee2ca
JG
2072 recv_ret = lttcomm_recv_creds_unix_sock(socket,
2073 client->communication.inbound.buffer.data + offset,
2074 client->communication.inbound.bytes_to_receive,
2075 &client->communication.inbound.creds);
2076 if (recv_ret > 0) {
01ea340e 2077 client->communication.inbound.expect_creds = false;
ab0ee2ca
JG
2078 client->communication.inbound.creds_received = true;
2079 }
2080 } else {
2081 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
2082 client->communication.inbound.buffer.data + offset,
2083 client->communication.inbound.bytes_to_receive);
2084 }
2085 if (recv_ret < 0) {
2086 goto error_disconnect_client;
2087 }
2088
2089 client->communication.inbound.bytes_to_receive -= recv_ret;
ab0ee2ca
JG
2090 if (client->communication.inbound.bytes_to_receive == 0) {
2091 ret = client_dispatch_message(client, state);
2092 if (ret) {
2093 /*
2094 * Only returns an error if this client must be
2095 * disconnected.
2096 */
2097 goto error_disconnect_client;
2098 }
2099 } else {
2100 goto end;
2101 }
2102end:
2103 return ret;
2104error_disconnect_client:
2105 ret = handle_notification_thread_client_disconnect(socket, state);
2106 return ret;
2107}
2108
/*
 * Client ready to receive outgoing data.
 *
 * Retrieves the client for `socket` and flushes its outgoing queue.
 * Returns -1 if the client cannot be found, otherwise the result of
 * client_flush_outgoing_queue().
 */
int handle_notification_thread_client_out(
		struct notification_thread_state *state, int socket)
{
	struct notification_client *client =
			get_client_from_socket(socket, state);

	if (!client) {
		/* Internal error, abort. */
		return -1;
	}

	return client_flush_outgoing_queue(client, state);
}
2130
2131static
2132bool evaluate_buffer_usage_condition(struct lttng_condition *condition,
2133 struct channel_state_sample *sample, uint64_t buffer_capacity)
2134{
2135 bool result = false;
2136 uint64_t threshold;
2137 enum lttng_condition_type condition_type;
2138 struct lttng_condition_buffer_usage *use_condition = container_of(
2139 condition, struct lttng_condition_buffer_usage,
2140 parent);
2141
2142 if (!sample) {
2143 goto end;
2144 }
2145
2146 if (use_condition->threshold_bytes.set) {
2147 threshold = use_condition->threshold_bytes.value;
2148 } else {
2149 /*
2150 * Threshold was expressed as a ratio.
2151 *
2152 * TODO the threshold (in bytes) of conditions expressed
2153 * as a ratio of total buffer size could be cached to
2154 * forego this double-multiplication or it could be performed
2155 * as fixed-point math.
2156 *
2157 * Note that caching should accomodate the case where the
2158 * condition applies to multiple channels (i.e. don't assume
2159 * that all channels matching my_chann* have the same size...)
2160 */
2161 threshold = (uint64_t) (use_condition->threshold_ratio.value *
2162 (double) buffer_capacity);
2163 }
2164
2165 condition_type = lttng_condition_get_type(condition);
2166 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
2167 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2168 threshold, sample->highest_usage);
2169
2170 /*
2171 * The low condition should only be triggered once _all_ of the
2172 * streams in a channel have gone below the "low" threshold.
2173 */
2174 if (sample->highest_usage <= threshold) {
2175 result = true;
2176 }
2177 } else {
2178 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2179 threshold, sample->highest_usage);
2180
2181 /*
2182 * For high buffer usage scenarios, we want to trigger whenever
2183 * _any_ of the streams has reached the "high" threshold.
2184 */
2185 if (sample->highest_usage >= threshold) {
2186 result = true;
2187 }
2188 }
2189end:
2190 return result;
2191}
2192
2193static
2194int evaluate_condition(struct lttng_condition *condition,
2195 struct lttng_evaluation **evaluation,
2196 struct notification_thread_state *state,
2197 struct channel_state_sample *previous_sample,
2198 struct channel_state_sample *latest_sample,
2199 uint64_t buffer_capacity)
2200{
2201 int ret = 0;
2202 enum lttng_condition_type condition_type;
2203 bool previous_sample_result;
2204 bool latest_sample_result;
2205
2206 condition_type = lttng_condition_get_type(condition);
2207 /* No other condition type supported for the moment. */
2208 assert(condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW ||
2209 condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH);
2210
2211 previous_sample_result = evaluate_buffer_usage_condition(condition,
2212 previous_sample, buffer_capacity);
2213 latest_sample_result = evaluate_buffer_usage_condition(condition,
2214 latest_sample, buffer_capacity);
2215
2216 if (!latest_sample_result ||
2217 (previous_sample_result == latest_sample_result)) {
2218 /*
2219 * Only trigger on a condition evaluation transition.
2220 *
2221 * NOTE: This edge-triggered logic may not be appropriate for
2222 * future condition types.
2223 */
2224 goto end;
2225 }
2226
2227 if (evaluation && latest_sample_result) {
2228 *evaluation = lttng_evaluation_buffer_usage_create(
2229 condition_type,
2230 latest_sample->highest_usage,
2231 buffer_capacity);
2232 if (!*evaluation) {
2233 ret = -1;
2234 goto end;
2235 }
2236 }
2237end:
2238 return ret;
2239}
2240
2241static
2242int client_enqueue_dropped_notification(struct notification_client *client,
2243 struct notification_thread_state *state)
2244{
2245 int ret;
2246 struct lttng_notification_channel_message msg = {
2247 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
2248 .size = 0,
2249 };
2250
2251 ret = lttng_dynamic_buffer_append(
2252 &client->communication.outbound.buffer, &msg,
2253 sizeof(msg));
2254 return ret;
2255}
2256
2257static
2258int send_evaluation_to_clients(struct lttng_trigger *trigger,
2259 struct lttng_evaluation *evaluation,
2260 struct notification_client_list* client_list,
2261 struct notification_thread_state *state,
2262 uid_t channel_uid, gid_t channel_gid)
2263{
2264 int ret = 0;
2265 struct lttng_dynamic_buffer msg_buffer;
2266 struct notification_client_list_element *client_list_element, *tmp;
2267 struct lttng_notification *notification;
2268 struct lttng_condition *condition;
2269 ssize_t expected_notification_size, notification_size;
2270 struct lttng_notification_channel_message msg;
2271
2272 lttng_dynamic_buffer_init(&msg_buffer);
2273
2274 condition = lttng_trigger_get_condition(trigger);
2275 assert(condition);
2276
2277 notification = lttng_notification_create(condition, evaluation);
2278 if (!notification) {
2279 ret = -1;
2280 goto end;
2281 }
2282
2283 expected_notification_size = lttng_notification_serialize(notification,
2284 NULL);
2285 if (expected_notification_size < 0) {
2286 ERR("[notification-thread] Failed to get size of serialized notification");
2287 ret = -1;
2288 goto end;
2289 }
2290
2291 msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION;
2292 msg.size = (uint32_t) expected_notification_size;
2293 ret = lttng_dynamic_buffer_append(&msg_buffer, &msg, sizeof(msg));
2294 if (ret) {
2295 goto end;
2296 }
2297
2298 ret = lttng_dynamic_buffer_set_size(&msg_buffer,
2299 msg_buffer.size + expected_notification_size);
2300 if (ret) {
2301 goto end;
2302 }
2303
2304 notification_size = lttng_notification_serialize(notification,
2305 msg_buffer.data + sizeof(msg));
2306 if (notification_size != expected_notification_size) {
2307 ERR("[notification-thread] Failed to serialize notification");
2308 ret = -1;
2309 goto end;
2310 }
2311
2312 cds_list_for_each_entry_safe(client_list_element, tmp,
2313 &client_list->list, node) {
2314 struct notification_client *client =
2315 client_list_element->client;
2316
2317 if (client->uid != channel_uid && client->gid != channel_gid &&
2318 client->uid != 0) {
2319 /* Client is not allowed to monitor this channel. */
2320 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this channel");
2321 continue;
2322 }
2323
2324 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
2325 client->socket, msg_buffer.size);
2326 if (client->communication.outbound.buffer.size) {
2327 /*
2328 * Outgoing data is already buffered for this client;
2329 * drop the notification and enqueue a "dropped
2330 * notification" message if this is the first dropped
2331 * notification since the socket spilled-over to the
2332 * queue.
2333 */
2334 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
2335 client->socket);
2336 if (!client->communication.outbound.dropped_notification) {
2337 client->communication.outbound.dropped_notification = true;
2338 ret = client_enqueue_dropped_notification(
2339 client, state);
2340 if (ret) {
2341 goto end;
2342 }
2343 }
2344 continue;
2345 }
2346
2347 ret = lttng_dynamic_buffer_append_buffer(
2348 &client->communication.outbound.buffer,
2349 &msg_buffer);
2350 if (ret) {
2351 goto end;
2352 }
2353
2354 ret = client_flush_outgoing_queue(client, state);
2355 if (ret) {
2356 goto end;
2357 }
2358 }
2359 ret = 0;
2360end:
2361 lttng_notification_destroy(notification);
2362 lttng_dynamic_buffer_reset(&msg_buffer);
2363 return ret;
2364}
2365
2366int handle_notification_thread_channel_sample(
2367 struct notification_thread_state *state, int pipe,
2368 enum lttng_domain_type domain)
2369{
2370 int ret = 0;
2371 struct lttcomm_consumer_channel_monitor_msg sample_msg;
2372 struct channel_state_sample previous_sample, latest_sample;
2373 struct channel_info *channel_info;
2374 struct cds_lfht_node *node;
2375 struct cds_lfht_iter iter;
2376 struct lttng_channel_trigger_list *trigger_list;
2377 struct lttng_trigger_list_element *trigger_list_element;
2378 bool previous_sample_available = false;
2379
2380 /*
2381 * The monitoring pipe only holds messages smaller than PIPE_BUF,
2382 * ensuring that read/write of sampling messages are atomic.
2383 */
7c2551ef 2384 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
ab0ee2ca
JG
2385 if (ret != sizeof(sample_msg)) {
2386 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
2387 pipe);
2388 ret = -1;
2389 goto end;
2390 }
2391
2392 ret = 0;
2393 latest_sample.key.key = sample_msg.key;
2394 latest_sample.key.domain = domain;
2395 latest_sample.highest_usage = sample_msg.highest;
2396 latest_sample.lowest_usage = sample_msg.lowest;
2397
2398 rcu_read_lock();
2399
2400 /* Retrieve the channel's informations */
2401 cds_lfht_lookup(state->channels_ht,
2402 hash_channel_key(&latest_sample.key),
2403 match_channel_info,
2404 &latest_sample.key,
2405 &iter);
2406 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 2407 if (caa_unlikely(!node)) {
ab0ee2ca
JG
2408 /*
2409 * Not an error since the consumer can push a sample to the pipe
2410 * and the rest of the session daemon could notify us of the
2411 * channel's destruction before we get a chance to process that
2412 * sample.
2413 */
2414 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
2415 latest_sample.key.key,
2416 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
2417 "user space");
2418 goto end_unlock;
2419 }
2420 channel_info = caa_container_of(node, struct channel_info,
2421 channels_ht_node);
2422 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64")",
8abe313a 2423 channel_info->name,
ab0ee2ca 2424 latest_sample.key.key,
8abe313a 2425 channel_info->session_info->name,
ab0ee2ca
JG
2426 latest_sample.highest_usage,
2427 latest_sample.lowest_usage);
2428
2429 /* Retrieve the channel's last sample, if it exists, and update it. */
2430 cds_lfht_lookup(state->channel_state_ht,
2431 hash_channel_key(&latest_sample.key),
2432 match_channel_state_sample,
2433 &latest_sample.key,
2434 &iter);
2435 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 2436 if (caa_likely(node)) {
ab0ee2ca
JG
2437 struct channel_state_sample *stored_sample;
2438
2439 /* Update the sample stored. */
2440 stored_sample = caa_container_of(node,
2441 struct channel_state_sample,
2442 channel_state_ht_node);
2443 memcpy(&previous_sample, stored_sample,
2444 sizeof(previous_sample));
2445 stored_sample->highest_usage = latest_sample.highest_usage;
2446 stored_sample->lowest_usage = latest_sample.lowest_usage;
0f0479d7 2447 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
ab0ee2ca
JG
2448 previous_sample_available = true;
2449 } else {
2450 /*
2451 * This is the channel's first sample, allocate space for and
2452 * store the new sample.
2453 */
2454 struct channel_state_sample *stored_sample;
2455
2456 stored_sample = zmalloc(sizeof(*stored_sample));
2457 if (!stored_sample) {
2458 ret = -1;
2459 goto end_unlock;
2460 }
2461
2462 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
2463 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
2464 cds_lfht_add(state->channel_state_ht,
2465 hash_channel_key(&stored_sample->key),
2466 &stored_sample->channel_state_ht_node);
2467 }
2468
2469 /* Find triggers associated with this channel. */
2470 cds_lfht_lookup(state->channel_triggers_ht,
2471 hash_channel_key(&latest_sample.key),
2472 match_channel_trigger_list,
2473 &latest_sample.key,
2474 &iter);
2475 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 2476 if (caa_likely(!node)) {
ab0ee2ca
JG
2477 goto end_unlock;
2478 }
2479
2480 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
2481 channel_triggers_ht_node);
2482 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
2483 node) {
2484 struct lttng_condition *condition;
2485 struct lttng_action *action;
2486 struct lttng_trigger *trigger;
2487 struct notification_client_list *client_list;
2488 struct lttng_evaluation *evaluation = NULL;
2489
2490 trigger = trigger_list_element->trigger;
2491 condition = lttng_trigger_get_condition(trigger);
2492 assert(condition);
2493 action = lttng_trigger_get_action(trigger);
2494
2495 /* Notify actions are the only type currently supported. */
2496 assert(lttng_action_get_type(action) ==
2497 LTTNG_ACTION_TYPE_NOTIFY);
2498
2499 /*
2500 * Check if any client is subscribed to the result of this
2501 * evaluation.
2502 */
2503 cds_lfht_lookup(state->notification_trigger_clients_ht,
2504 lttng_condition_hash(condition),
2505 match_client_list,
2506 trigger,
2507 &iter);
2508 node = cds_lfht_iter_get_node(&iter);
2509 assert(node);
2510
2511 client_list = caa_container_of(node,
2512 struct notification_client_list,
2513 notification_trigger_ht_node);
2514 if (cds_list_empty(&client_list->list)) {
2515 /*
2516 * No clients interested in the evaluation's result,
2517 * skip it.
2518 */
2519 continue;
2520 }
2521
2522 ret = evaluate_condition(condition, &evaluation, state,
2523 previous_sample_available ? &previous_sample : NULL,
2524 &latest_sample, channel_info->capacity);
2525 if (ret) {
2526 goto end_unlock;
2527 }
2528
2529 if (!evaluation) {
2530 continue;
2531 }
2532
2533 /* Dispatch evaluation result to all clients. */
2534 ret = send_evaluation_to_clients(trigger_list_element->trigger,
2535 evaluation, client_list, state,
8abe313a
JG
2536 channel_info->session_info->uid,
2537 channel_info->session_info->gid);
2538 lttng_evaluation_destroy(evaluation);
e0b5f87b 2539 if (caa_unlikely(ret)) {
ab0ee2ca
JG
2540 goto end_unlock;
2541 }
2542 }
2543end_unlock:
2544 rcu_read_unlock();
2545end:
2546 return ret;
2547}
This page took 0.218507 seconds and 4 git commands to generate.