Session consumed size notification
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
CommitLineData
ab0ee2ca
JG
1/*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _LGPL_SOURCE
19#include <urcu.h>
20#include <urcu/rculfhash.h>
21
ab0ee2ca
JG
22#include <common/defaults.h>
23#include <common/error.h>
24#include <common/futex.h>
25#include <common/unix.h>
26#include <common/dynamic-buffer.h>
27#include <common/hashtable/utils.h>
28#include <common/sessiond-comm/sessiond-comm.h>
29#include <common/macros.h>
30#include <lttng/condition/condition.h>
31#include <lttng/action/action.h>
32#include <lttng/notification/notification-internal.h>
33#include <lttng/condition/condition-internal.h>
34#include <lttng/condition/buffer-usage-internal.h>
e8360425 35#include <lttng/condition/session-consumed-size-internal.h>
ab0ee2ca 36#include <lttng/notification/channel-internal.h>
1da26331 37
ab0ee2ca
JG
38#include <time.h>
39#include <unistd.h>
40#include <assert.h>
41#include <inttypes.h>
d53ea4e4 42#include <fcntl.h>
ab0ee2ca 43
1da26331
JG
44#include "notification-thread.h"
45#include "notification-thread-events.h"
46#include "notification-thread-commands.h"
47#include "lttng-sessiond.h"
48#include "kernel.h"
49
ab0ee2ca
JG
/*
 * Poll event masks. The "IN" mask combines readability with the error and
 * hang-up events; the "IN_OUT" mask additionally includes writability.
 * Presumably applied to notification client sockets -- confirm against the
 * poll-set users.
 */
#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
52
/*
 * Entry of a linked list of triggers (see the `list` member of
 * struct lttng_channel_trigger_list).
 */
struct lttng_trigger_list_element {
	struct lttng_trigger *trigger;
	/* Links the element in its list. */
	struct cds_list_head node;
};
57
/*
 * List of the triggers associated with the channel identified by
 * `channel_key`. Instances are indexed in the notification thread state's
 * channel_triggers_ht.
 */
struct lttng_channel_trigger_list {
	struct channel_key channel_key;
	/* List of struct lttng_trigger_list_element. */
	struct cds_list_head list;
	/* Node in channel_triggers_ht. */
	struct cds_lfht_node channel_triggers_ht_node;
};
63
/*
 * Hash table wrapper around a trigger; looked up by condition through
 * match_condition().
 */
struct lttng_trigger_ht_element {
	struct lttng_trigger *trigger;
	/* Hash table node. */
	struct cds_lfht_node node;
};
68
/*
 * Entry of a client's list of subscribed conditions (see the
 * `condition_list` member of struct notification_client).
 */
struct lttng_condition_list_element {
	struct lttng_condition *condition;
	/* Links the element in the client's condition list. */
	struct cds_list_head node;
};
73
/*
 * Entry of a list of clients (see the `list` member of
 * struct notification_client_list).
 */
struct notification_client_list_element {
	struct notification_client *client;
	/* Links the element in its client list. */
	struct cds_list_head node;
};
78
/*
 * List of the clients subscribed to the condition of `trigger`. Instances
 * are indexed in the notification thread state's
 * notification_trigger_clients_ht.
 */
struct notification_client_list {
	struct lttng_trigger *trigger;
	/* List of struct notification_client_list_element. */
	struct cds_list_head list;
	/* Node in notification_trigger_clients_ht. */
	struct cds_lfht_node notification_trigger_ht_node;
};
84
/*
 * State of a client of the notification channel: socket, credentials,
 * subscribed conditions and inbound/outbound communication buffers.
 */
struct notification_client {
	int socket;
	/* Client protocol version. */
	uint8_t major, minor;
	uid_t uid;
	gid_t gid;
	/*
	 * Indicates if the credentials and versions of the client have been
	 * checked.
	 */
	bool validated;
	/*
	 * Conditions to which the client's notification channel is subscribed.
	 * List of struct lttng_condition_list_element. The condition member is
	 * owned by the client.
	 */
	struct cds_list_head condition_list;
	/* Node in the notification thread state's client_socket_ht. */
	struct cds_lfht_node client_socket_ht_node;
	struct {
		struct {
			/*
			 * During the reception of a message, the reception
			 * buffers' "size" is set to contain the current
			 * message's complete payload.
			 */
			struct lttng_dynamic_buffer buffer;
			/* Bytes left to receive for the current message. */
			size_t bytes_to_receive;
			/* Type of the message being received. */
			enum lttng_notification_channel_message_type msg_type;
			/*
			 * Indicates whether or not credentials are expected
			 * from the client.
			 */
			bool expect_creds;
			/*
			 * Indicates whether or not credentials were received
			 * from the client.
			 */
			bool creds_received;
			/* Only used during credentials reception. */
			lttng_sock_cred creds;
		} inbound;
		struct {
			/*
			 * Indicates whether or not a notification addressed to
			 * this client was dropped because a command reply was
			 * already buffered.
			 *
			 * A notification is dropped whenever the buffer is not
			 * empty.
			 */
			bool dropped_notification;
			/*
			 * Indicates whether or not a command reply is already
			 * buffered. In this case, it means that the client is
			 * not consuming command replies before emitting a new
			 * one. This could be caused by a protocol error or a
			 * misbehaving/malicious client.
			 */
			bool queued_command_reply;
			/* Data queued for transmission to the client. */
			struct lttng_dynamic_buffer buffer;
		} outbound;
	} communication;
};
150
/*
 * Sample of a channel's state, identified by `key` and indexed in the
 * notification thread state's channel_state_ht.
 */
struct channel_state_sample {
	struct channel_key key;
	/* Node in channel_state_ht. */
	struct cds_lfht_node channel_state_ht_node;
	/* Presumably buffer usage extrema, in bytes -- TODO confirm units. */
	uint64_t highest_usage;
	uint64_t lowest_usage;
	/* Presumably the amount of data consumed for this channel -- TODO confirm. */
	uint64_t channel_total_consumed;
};
158
/*
 * Forward declarations of static helpers that are used before their
 * definition in this file.
 */
static unsigned long hash_channel_key(struct channel_key *key);
static int evaluate_condition(const struct lttng_condition *condition,
		struct lttng_evaluation **evaluation,
		const struct notification_thread_state *state,
		const struct channel_state_sample *previous_sample,
		const struct channel_state_sample *latest_sample,
		uint64_t previous_session_consumed_total,
		uint64_t latest_session_consumed_total,
		struct channel_info *channel_info);
static
int send_evaluation_to_clients(struct lttng_trigger *trigger,
		struct lttng_evaluation *evaluation,
		struct notification_client_list *client_list,
		struct notification_thread_state *state,
		uid_t channel_uid, gid_t channel_gid);


/* session_info life-cycle (reference counted) and channel bookkeeping. */
static
void session_info_destroy(void *_data);
static
void session_info_get(struct session_info *session_info);
static
void session_info_put(struct session_info *session_info);
static
struct session_info *session_info_create(const char *name,
		uid_t uid, gid_t gid);
static
void session_info_add_channel(struct session_info *session_info,
		struct channel_info *channel_info);
static
void session_info_remove_channel(struct session_info *session_info,
		struct channel_info *channel_info);
191
ab0ee2ca
JG
192static
193int match_client(struct cds_lfht_node *node, const void *key)
194{
195 /* This double-cast is intended to supress pointer-to-cast warning. */
196 int socket = (int) (intptr_t) key;
197 struct notification_client *client;
198
199 client = caa_container_of(node, struct notification_client,
200 client_socket_ht_node);
201
202 return !!(client->socket == socket);
203}
204
205static
206int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
207{
208 struct channel_key *channel_key = (struct channel_key *) key;
209 struct lttng_channel_trigger_list *trigger_list;
210
211 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
212 channel_triggers_ht_node);
213
214 return !!((channel_key->key == trigger_list->channel_key.key) &&
215 (channel_key->domain == trigger_list->channel_key.domain));
216}
217
218static
219int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
220{
221 struct channel_key *channel_key = (struct channel_key *) key;
222 struct channel_state_sample *sample;
223
224 sample = caa_container_of(node, struct channel_state_sample,
225 channel_state_ht_node);
226
227 return !!((channel_key->key == sample->key.key) &&
228 (channel_key->domain == sample->key.domain));
229}
230
231static
232int match_channel_info(struct cds_lfht_node *node, const void *key)
233{
234 struct channel_key *channel_key = (struct channel_key *) key;
235 struct channel_info *channel_info;
236
237 channel_info = caa_container_of(node, struct channel_info,
238 channels_ht_node);
239
240 return !!((channel_key->key == channel_info->key.key) &&
241 (channel_key->domain == channel_info->key.domain));
242}
243
244static
245int match_condition(struct cds_lfht_node *node, const void *key)
246{
247 struct lttng_condition *condition_key = (struct lttng_condition *) key;
248 struct lttng_trigger_ht_element *trigger;
249 struct lttng_condition *condition;
250
251 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
252 node);
253 condition = lttng_trigger_get_condition(trigger->trigger);
254 assert(condition);
255
256 return !!lttng_condition_is_equal(condition_key, condition);
257}
258
259static
260int match_client_list(struct cds_lfht_node *node, const void *key)
261{
262 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
263 struct notification_client_list *client_list;
264 struct lttng_condition *condition;
265 struct lttng_condition *condition_key = lttng_trigger_get_condition(
266 trigger_key);
267
268 assert(condition_key);
269
270 client_list = caa_container_of(node, struct notification_client_list,
271 notification_trigger_ht_node);
272 condition = lttng_trigger_get_condition(client_list->trigger);
273
274 return !!lttng_condition_is_equal(condition_key, condition);
275}
276
277static
278int match_client_list_condition(struct cds_lfht_node *node, const void *key)
279{
280 struct lttng_condition *condition_key = (struct lttng_condition *) key;
281 struct notification_client_list *client_list;
282 struct lttng_condition *condition;
283
284 assert(condition_key);
285
286 client_list = caa_container_of(node, struct notification_client_list,
287 notification_trigger_ht_node);
288 condition = lttng_trigger_get_condition(client_list->trigger);
289
290 return !!lttng_condition_is_equal(condition_key, condition);
291}
292
293static
294unsigned long lttng_condition_buffer_usage_hash(
295 struct lttng_condition *_condition)
296{
297 unsigned long hash = 0;
298 struct lttng_condition_buffer_usage *condition;
299
300 condition = container_of(_condition,
301 struct lttng_condition_buffer_usage, parent);
302
303 if (condition->session_name) {
304 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
305 }
306 if (condition->channel_name) {
8f56701f 307 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
ab0ee2ca
JG
308 }
309 if (condition->domain.set) {
310 hash ^= hash_key_ulong(
311 (void *) condition->domain.type,
312 lttng_ht_seed);
313 }
314 if (condition->threshold_ratio.set) {
315 uint64_t val;
316
317 val = condition->threshold_ratio.value * (double) UINT32_MAX;
318 hash ^= hash_key_u64(&val, lttng_ht_seed);
6633b0dd 319 } else if (condition->threshold_bytes.set) {
ab0ee2ca
JG
320 uint64_t val;
321
322 val = condition->threshold_bytes.value;
323 hash ^= hash_key_u64(&val, lttng_ht_seed);
324 }
325 return hash;
326}
327
e8360425
JD
328static
329unsigned long lttng_condition_session_consumed_size_hash(
330 struct lttng_condition *_condition)
331{
332 unsigned long hash = 0;
333 struct lttng_condition_session_consumed_size *condition;
334 uint64_t val;
335
336 condition = container_of(_condition,
337 struct lttng_condition_session_consumed_size, parent);
338
339 if (condition->session_name) {
340 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
341 }
342 val = condition->consumed_threshold_bytes.value;
343 hash ^= hash_key_u64(&val, lttng_ht_seed);
344 return hash;
345}
346
ab0ee2ca
JG
347/*
348 * The lttng_condition hashing code is kept in this file (rather than
349 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
350 * don't want to link in liblttng-ctl.
351 */
352static
353unsigned long lttng_condition_hash(struct lttng_condition *condition)
354{
355 switch (condition->type) {
356 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
357 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
358 return lttng_condition_buffer_usage_hash(condition);
e8360425
JD
359 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
360 return lttng_condition_session_consumed_size_hash(condition);
ab0ee2ca
JG
361 default:
362 ERR("[notification-thread] Unexpected condition type caught");
363 abort();
364 }
365}
366
e4db5ace
JR
367static
368unsigned long hash_channel_key(struct channel_key *key)
369{
370 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
371 unsigned long domain_hash = hash_key_ulong(
372 (void *) (unsigned long) key->domain, lttng_ht_seed);
373
374 return key_hash ^ domain_hash;
375}
376
ab0ee2ca
JG
377static
378void channel_info_destroy(struct channel_info *channel_info)
379{
380 if (!channel_info) {
381 return;
382 }
383
8abe313a
JG
384 if (channel_info->session_info) {
385 session_info_remove_channel(channel_info->session_info,
386 channel_info);
387 session_info_put(channel_info->session_info);
ab0ee2ca 388 }
8abe313a
JG
389 if (channel_info->name) {
390 free(channel_info->name);
ab0ee2ca
JG
391 }
392 free(channel_info);
393}
394
8abe313a 395/* Don't call directly, use the ref-counting mechanism. */
ab0ee2ca 396static
8abe313a 397void session_info_destroy(void *_data)
ab0ee2ca 398{
8abe313a 399 struct session_info *session_info = _data;
ab0ee2ca 400
8abe313a
JG
401 assert(session_info);
402 if (session_info->channel_infos_ht) {
403 cds_lfht_destroy(session_info->channel_infos_ht, NULL);
404 }
405 free(session_info->name);
406 free(session_info);
407}
ab0ee2ca 408
8abe313a
JG
409static
410void session_info_get(struct session_info *session_info)
411{
412 if (!session_info) {
413 return;
414 }
415 lttng_ref_get(&session_info->ref);
416}
417
418static
419void session_info_put(struct session_info *session_info)
420{
421 if (!session_info) {
422 return;
ab0ee2ca 423 }
8abe313a
JG
424 lttng_ref_put(&session_info->ref);
425}
426
427static
428struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid)
429{
430 struct session_info *session_info;
ab0ee2ca 431
8abe313a 432 assert(name);
ab0ee2ca 433
8abe313a
JG
434 session_info = zmalloc(sizeof(*session_info));
435 if (!session_info) {
436 goto end;
437 }
438 lttng_ref_init(&session_info->ref, session_info_destroy);
439
440 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
441 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
442 if (!session_info->channel_infos_ht) {
ab0ee2ca
JG
443 goto error;
444 }
8abe313a
JG
445
446 cds_lfht_node_init(&session_info->sessions_ht_node);
447 session_info->name = strdup(name);
448 if (!session_info->name) {
ab0ee2ca
JG
449 goto error;
450 }
8abe313a
JG
451 session_info->uid = uid;
452 session_info->gid = gid;
453end:
454 return session_info;
455error:
456 session_info_put(session_info);
457 return NULL;
458}
459
460static
461void session_info_add_channel(struct session_info *session_info,
462 struct channel_info *channel_info)
463{
464 rcu_read_lock();
465 cds_lfht_add(session_info->channel_infos_ht,
466 hash_channel_key(&channel_info->key),
467 &channel_info->session_info_channels_ht_node);
468 rcu_read_unlock();
469}
470
471static
472void session_info_remove_channel(struct session_info *session_info,
473 struct channel_info *channel_info)
474{
475 rcu_read_lock();
476 cds_lfht_del(session_info->channel_infos_ht,
477 &channel_info->session_info_channels_ht_node);
478 rcu_read_unlock();
479}
480
481static
482struct channel_info *channel_info_create(const char *channel_name,
483 struct channel_key *channel_key, uint64_t channel_capacity,
484 struct session_info *session_info)
485{
486 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
487
488 if (!channel_info) {
489 goto end;
490 }
491
ab0ee2ca 492 cds_lfht_node_init(&channel_info->channels_ht_node);
8abe313a
JG
493 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
494 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
495 channel_info->capacity = channel_capacity;
496
497 channel_info->name = strdup(channel_name);
498 if (!channel_info->name) {
499 goto error;
500 }
501
502 /*
503 * Set the references between session and channel infos:
504 * - channel_info holds a strong reference to session_info
505 * - session_info holds a weak reference to channel_info
506 */
507 session_info_get(session_info);
508 session_info_add_channel(session_info, channel_info);
509 channel_info->session_info = session_info;
ab0ee2ca 510end:
8abe313a 511 return channel_info;
ab0ee2ca 512error:
8abe313a 513 channel_info_destroy(channel_info);
ab0ee2ca
JG
514 return NULL;
515}
516
e4db5ace
JR
517/* This function must be called with the RCU read lock held. */
518static
519int evaluate_condition_for_client(struct lttng_trigger *trigger,
520 struct lttng_condition *condition,
521 struct notification_client *client,
522 struct notification_thread_state *state)
523{
524 int ret;
525 struct cds_lfht_iter iter;
526 struct cds_lfht_node *node;
527 struct channel_info *channel_info = NULL;
528 struct channel_key *channel_key = NULL;
529 struct channel_state_sample *last_sample = NULL;
530 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
531 struct lttng_evaluation *evaluation = NULL;
532 struct notification_client_list client_list = { 0 };
533 struct notification_client_list_element client_list_element = { 0 };
534
535 assert(trigger);
536 assert(condition);
537 assert(client);
538 assert(state);
539
540 /* Find the channel associated with the trigger. */
541 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
542 channel_trigger_list , channel_triggers_ht_node) {
543 struct lttng_trigger_list_element *element;
544
545 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
546 struct lttng_condition *current_condition =
547 lttng_trigger_get_condition(
548 element->trigger);
549
550 assert(current_condition);
551 if (!lttng_condition_is_equal(condition,
2ae99f0b 552 current_condition)) {
e4db5ace
JR
553 continue;
554 }
555
556 /* Found the trigger, save the channel key. */
557 channel_key = &channel_trigger_list->channel_key;
558 break;
559 }
560 if (channel_key) {
561 /* The channel key was found stop iteration. */
562 break;
563 }
564 }
565
566 if (!channel_key){
567 /* No channel found; normal exit. */
568 DBG("[notification-thread] No channel associated with newly subscribed-to condition");
569 ret = 0;
570 goto end;
571 }
572
573 /* Fetch channel info for the matching channel. */
574 cds_lfht_lookup(state->channels_ht,
575 hash_channel_key(channel_key),
576 match_channel_info,
577 channel_key,
578 &iter);
579 node = cds_lfht_iter_get_node(&iter);
580 assert(node);
581 channel_info = caa_container_of(node, struct channel_info,
582 channels_ht_node);
583
584 /* Retrieve the channel's last sample, if it exists. */
585 cds_lfht_lookup(state->channel_state_ht,
586 hash_channel_key(channel_key),
587 match_channel_state_sample,
588 channel_key,
589 &iter);
590 node = cds_lfht_iter_get_node(&iter);
591 if (node) {
592 last_sample = caa_container_of(node,
593 struct channel_state_sample,
594 channel_state_ht_node);
595 } else {
596 /* Nothing to evaluate, no sample was ever taken. Normal exit */
597 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
598 ret = 0;
599 goto end;
600 }
601
e8360425
JD
602 ret = evaluate_condition(condition, &evaluation, state,
603 NULL, last_sample,
604 0, channel_info->session_info->consumed_data_size,
605 channel_info);
e4db5ace 606 if (ret) {
066a3c82 607 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
e4db5ace
JR
608 goto end;
609 }
610
611 if (!evaluation) {
612 /* Evaluation yielded nothing. Normal exit. */
613 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
614 ret = 0;
615 goto end;
616 }
617
618 /*
619 * Create a temporary client list with the client currently
620 * subscribing.
621 */
622 cds_lfht_node_init(&client_list.notification_trigger_ht_node);
623 CDS_INIT_LIST_HEAD(&client_list.list);
624 client_list.trigger = trigger;
625
626 CDS_INIT_LIST_HEAD(&client_list_element.node);
627 client_list_element.client = client;
628 cds_list_add(&client_list_element.node, &client_list.list);
629
630 /* Send evaluation result to the newly-subscribed client. */
631 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
632 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
8abe313a
JG
633 state, channel_info->session_info->uid,
634 channel_info->session_info->gid);
e4db5ace
JR
635
636end:
637 return ret;
638}
639
ab0ee2ca
JG
640static
641int notification_thread_client_subscribe(struct notification_client *client,
642 struct lttng_condition *condition,
643 struct notification_thread_state *state,
644 enum lttng_notification_channel_status *_status)
645{
646 int ret = 0;
647 struct cds_lfht_iter iter;
648 struct cds_lfht_node *node;
649 struct notification_client_list *client_list;
650 struct lttng_condition_list_element *condition_list_element = NULL;
651 struct notification_client_list_element *client_list_element = NULL;
652 enum lttng_notification_channel_status status =
653 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
654
655 /*
656 * Ensure that the client has not already subscribed to this condition
657 * before.
658 */
659 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
660 if (lttng_condition_is_equal(condition_list_element->condition,
661 condition)) {
662 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
663 goto end;
664 }
665 }
666
667 condition_list_element = zmalloc(sizeof(*condition_list_element));
668 if (!condition_list_element) {
669 ret = -1;
670 goto error;
671 }
672 client_list_element = zmalloc(sizeof(*client_list_element));
673 if (!client_list_element) {
674 ret = -1;
675 goto error;
676 }
677
678 rcu_read_lock();
679
680 /*
681 * Add the newly-subscribed condition to the client's subscription list.
682 */
683 CDS_INIT_LIST_HEAD(&condition_list_element->node);
684 condition_list_element->condition = condition;
685 cds_list_add(&condition_list_element->node, &client->condition_list);
686
ab0ee2ca
JG
687 cds_lfht_lookup(state->notification_trigger_clients_ht,
688 lttng_condition_hash(condition),
689 match_client_list_condition,
690 condition,
691 &iter);
692 node = cds_lfht_iter_get_node(&iter);
693 if (!node) {
2ae99f0b
JG
694 /*
695 * No notification-emiting trigger registered with this
696 * condition. We don't evaluate the condition right away
697 * since this trigger is not registered yet.
698 */
4fb43b68 699 free(client_list_element);
ab0ee2ca
JG
700 goto end_unlock;
701 }
702
703 client_list = caa_container_of(node, struct notification_client_list,
704 notification_trigger_ht_node);
2ae99f0b
JG
705 /*
706 * The condition to which the client just subscribed is evaluated
707 * at this point so that conditions that are already TRUE result
708 * in a notification being sent out.
709 */
e4db5ace
JR
710 if (evaluate_condition_for_client(client_list->trigger, condition,
711 client, state)) {
712 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
713 ret = -1;
714 goto end_unlock;
715 }
716
717 /*
718 * Add the client to the list of clients interested in a given trigger
719 * if a "notification" trigger with a corresponding condition was
720 * added prior.
721 */
ab0ee2ca
JG
722 client_list_element->client = client;
723 CDS_INIT_LIST_HEAD(&client_list_element->node);
724 cds_list_add(&client_list_element->node, &client_list->list);
725end_unlock:
726 rcu_read_unlock();
727end:
728 if (_status) {
729 *_status = status;
730 }
731 return ret;
732error:
733 free(condition_list_element);
734 free(client_list_element);
735 return ret;
736}
737
738static
739int notification_thread_client_unsubscribe(
740 struct notification_client *client,
741 struct lttng_condition *condition,
742 struct notification_thread_state *state,
743 enum lttng_notification_channel_status *_status)
744{
745 struct cds_lfht_iter iter;
746 struct cds_lfht_node *node;
747 struct notification_client_list *client_list;
748 struct lttng_condition_list_element *condition_list_element,
749 *condition_tmp;
750 struct notification_client_list_element *client_list_element,
751 *client_tmp;
752 bool condition_found = false;
753 enum lttng_notification_channel_status status =
754 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
755
756 /* Remove the condition from the client's condition list. */
757 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
758 &client->condition_list, node) {
759 if (!lttng_condition_is_equal(condition_list_element->condition,
760 condition)) {
761 continue;
762 }
763
764 cds_list_del(&condition_list_element->node);
765 /*
766 * The caller may be iterating on the client's conditions to
767 * tear down a client's connection. In this case, the condition
768 * will be destroyed at the end.
769 */
770 if (condition != condition_list_element->condition) {
771 lttng_condition_destroy(
772 condition_list_element->condition);
773 }
774 free(condition_list_element);
775 condition_found = true;
776 break;
777 }
778
779 if (!condition_found) {
780 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
781 goto end;
782 }
783
784 /*
785 * Remove the client from the list of clients interested the trigger
786 * matching the condition.
787 */
788 rcu_read_lock();
789 cds_lfht_lookup(state->notification_trigger_clients_ht,
790 lttng_condition_hash(condition),
791 match_client_list_condition,
792 condition,
793 &iter);
794 node = cds_lfht_iter_get_node(&iter);
795 if (!node) {
796 goto end_unlock;
797 }
798
799 client_list = caa_container_of(node, struct notification_client_list,
800 notification_trigger_ht_node);
801 cds_list_for_each_entry_safe(client_list_element, client_tmp,
802 &client_list->list, node) {
803 if (client_list_element->client->socket != client->socket) {
804 continue;
805 }
806 cds_list_del(&client_list_element->node);
807 free(client_list_element);
808 break;
809 }
810end_unlock:
811 rcu_read_unlock();
812end:
813 lttng_condition_destroy(condition);
814 if (_status) {
815 *_status = status;
816 }
817 return 0;
818}
819
820static
821void notification_client_destroy(struct notification_client *client,
822 struct notification_thread_state *state)
823{
824 struct lttng_condition_list_element *condition_list_element, *tmp;
825
826 if (!client) {
827 return;
828 }
829
830 /* Release all conditions to which the client was subscribed. */
831 cds_list_for_each_entry_safe(condition_list_element, tmp,
832 &client->condition_list, node) {
833 (void) notification_thread_client_unsubscribe(client,
834 condition_list_element->condition, state, NULL);
835 }
836
837 if (client->socket >= 0) {
838 (void) lttcomm_close_unix_sock(client->socket);
839 }
840 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
841 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
842 free(client);
843}
844
845/*
846 * Call with rcu_read_lock held (and hold for the lifetime of the returned
847 * client pointer).
848 */
849static
850struct notification_client *get_client_from_socket(int socket,
851 struct notification_thread_state *state)
852{
853 struct cds_lfht_iter iter;
854 struct cds_lfht_node *node;
855 struct notification_client *client = NULL;
856
857 cds_lfht_lookup(state->client_socket_ht,
858 hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed),
859 match_client,
860 (void *) (unsigned long) socket,
861 &iter);
862 node = cds_lfht_iter_get_node(&iter);
863 if (!node) {
864 goto end;
865 }
866
867 client = caa_container_of(node, struct notification_client,
868 client_socket_ht_node);
869end:
870 return client;
871}
872
873static
e8360425
JD
874bool buffer_usage_condition_applies_to_channel(
875 struct lttng_condition *condition,
8abe313a 876 struct channel_info *channel_info)
ab0ee2ca
JG
877{
878 enum lttng_condition_status status;
e8360425
JD
879 enum lttng_domain_type condition_domain;
880 const char *condition_session_name = NULL;
881 const char *condition_channel_name = NULL;
ab0ee2ca 882
e8360425
JD
883 status = lttng_condition_buffer_usage_get_domain_type(condition,
884 &condition_domain);
885 assert(status == LTTNG_CONDITION_STATUS_OK);
886 if (channel_info->key.domain != condition_domain) {
ab0ee2ca
JG
887 goto fail;
888 }
889
e8360425
JD
890 status = lttng_condition_buffer_usage_get_session_name(
891 condition, &condition_session_name);
892 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
893
894 status = lttng_condition_buffer_usage_get_channel_name(
895 condition, &condition_channel_name);
896 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
897
898 if (strcmp(channel_info->session_info->name, condition_session_name)) {
899 goto fail;
900 }
901 if (strcmp(channel_info->name, condition_channel_name)) {
ab0ee2ca
JG
902 goto fail;
903 }
904
e8360425
JD
905 return true;
906fail:
907 return false;
908}
909
910static
911bool session_consumed_size_condition_applies_to_channel(
912 struct lttng_condition *condition,
913 struct channel_info *channel_info)
914{
915 enum lttng_condition_status status;
916 const char *condition_session_name = NULL;
917
918 status = lttng_condition_session_consumed_size_get_session_name(
919 condition, &condition_session_name);
920 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
921
922 if (strcmp(channel_info->session_info->name, condition_session_name)) {
ab0ee2ca
JG
923 goto fail;
924 }
925
e8360425
JD
926 return true;
927fail:
928 return false;
929}
ab0ee2ca 930
e8360425
JD
931static
932bool trigger_applies_to_channel(struct lttng_trigger *trigger,
933 struct channel_info *channel_info)
934{
935 struct lttng_condition *condition;
936 bool trigger_applies;
ab0ee2ca 937
e8360425
JD
938 condition = lttng_trigger_get_condition(trigger);
939 if (!condition) {
ab0ee2ca
JG
940 goto fail;
941 }
e8360425
JD
942
943 switch (lttng_condition_get_type(condition)) {
944 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
945 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
946 trigger_applies = buffer_usage_condition_applies_to_channel(
947 condition, channel_info);
948 break;
949 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
950 trigger_applies = session_consumed_size_condition_applies_to_channel(
951 condition, channel_info);
952 break;
953 default:
ab0ee2ca
JG
954 goto fail;
955 }
956
e8360425 957 return trigger_applies;
ab0ee2ca
JG
958fail:
959 return false;
960}
961
962static
963bool trigger_applies_to_client(struct lttng_trigger *trigger,
964 struct notification_client *client)
965{
966 bool applies = false;
967 struct lttng_condition_list_element *condition_list_element;
968
969 cds_list_for_each_entry(condition_list_element, &client->condition_list,
970 node) {
971 applies = lttng_condition_is_equal(
972 condition_list_element->condition,
973 lttng_trigger_get_condition(trigger));
974 if (applies) {
975 break;
976 }
977 }
978 return applies;
979}
980
8abe313a
JG
981static
982int match_session(struct cds_lfht_node *node, const void *key)
983{
984 const char *name = key;
985 struct session_info *session_info = caa_container_of(
986 node, struct session_info, sessions_ht_node);
987
988 return !strcmp(session_info->name, name);
989}
990
991static
992struct session_info *find_or_create_session_info(
993 struct notification_thread_state *state,
994 const char *name, uid_t uid, gid_t gid)
995{
996 struct session_info *session = NULL;
997 struct cds_lfht_node *node;
998 struct cds_lfht_iter iter;
999
1000 rcu_read_lock();
1001 cds_lfht_lookup(state->sessions_ht,
1002 hash_key_str(name, lttng_ht_seed),
1003 match_session,
1004 name,
1005 &iter);
1006 node = cds_lfht_iter_get_node(&iter);
1007 if (node) {
1008 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1009 name, uid, gid);
1010 session = caa_container_of(node, struct session_info,
1011 sessions_ht_node);
1012 assert(session->uid == uid);
1013 assert(session->gid == gid);
1014 goto end;
1015 }
1016
1017 session = session_info_create(name, uid, gid);
1018 if (!session) {
1019 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1020 name, uid, gid);
1021 goto end;
1022 }
1023end:
1024 rcu_read_unlock();
1025 return session;
1026}
1027
ab0ee2ca
JG
1028static
1029int handle_notification_thread_command_add_channel(
8abe313a
JG
1030 struct notification_thread_state *state,
1031 const char *session_name, uid_t session_uid, gid_t session_gid,
1032 const char *channel_name, enum lttng_domain_type channel_domain,
1033 uint64_t channel_key_int, uint64_t channel_capacity,
1034 enum lttng_error_code *cmd_result)
ab0ee2ca
JG
1035{
1036 struct cds_list_head trigger_list;
8abe313a
JG
1037 struct channel_info *new_channel_info = NULL;
1038 struct channel_key channel_key = {
1039 .key = channel_key_int,
1040 .domain = channel_domain,
1041 };
ab0ee2ca
JG
1042 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1043 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1044 int trigger_count = 0;
1045 struct cds_lfht_iter iter;
8abe313a 1046 struct session_info *session_info = NULL;
ab0ee2ca
JG
1047
1048 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
8abe313a
JG
1049 channel_name, session_name, channel_key_int,
1050 channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
ab0ee2ca
JG
1051
1052 CDS_INIT_LIST_HEAD(&trigger_list);
1053
8abe313a
JG
1054 session_info = find_or_create_session_info(state, session_name,
1055 session_uid, session_gid);
1056 if (!session_info) {
1057 /* Allocation error or an internal error occured. */
ab0ee2ca
JG
1058 goto error;
1059 }
1060
8abe313a
JG
1061 new_channel_info = channel_info_create(channel_name, &channel_key,
1062 channel_capacity, session_info);
1063 if (!new_channel_info) {
1064 goto error;
1065 }
ab0ee2ca
JG
1066
1067 /* Build a list of all triggers applying to the new channel. */
1068 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1069 node) {
1070 struct lttng_trigger_list_element *new_element;
1071
1072 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
8abe313a 1073 new_channel_info)) {
ab0ee2ca
JG
1074 continue;
1075 }
1076
1077 new_element = zmalloc(sizeof(*new_element));
1078 if (!new_element) {
1079 goto error;
1080 }
1081 CDS_INIT_LIST_HEAD(&new_element->node);
1082 new_element->trigger = trigger_ht_element->trigger;
1083 cds_list_add(&new_element->node, &trigger_list);
1084 trigger_count++;
1085 }
1086
1087 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1088 trigger_count);
1089 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1090 if (!channel_trigger_list) {
1091 goto error;
1092 }
8abe313a 1093 channel_trigger_list->channel_key = new_channel_info->key;
ab0ee2ca
JG
1094 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1095 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1096 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1097
1098 rcu_read_lock();
1099 /* Add channel to the channel_ht which owns the channel_infos. */
1100 cds_lfht_add(state->channels_ht,
8abe313a 1101 hash_channel_key(&new_channel_info->key),
ab0ee2ca
JG
1102 &new_channel_info->channels_ht_node);
1103 /*
1104 * Add the list of triggers associated with this channel to the
1105 * channel_triggers_ht.
1106 */
1107 cds_lfht_add(state->channel_triggers_ht,
8abe313a 1108 hash_channel_key(&new_channel_info->key),
ab0ee2ca
JG
1109 &channel_trigger_list->channel_triggers_ht_node);
1110 rcu_read_unlock();
1111 *cmd_result = LTTNG_OK;
1112 return 0;
1113error:
ab0ee2ca 1114 channel_info_destroy(new_channel_info);
8abe313a 1115 session_info_put(session_info);
ab0ee2ca
JG
1116 return 1;
1117}
1118
1119static
1120int handle_notification_thread_command_remove_channel(
1121 struct notification_thread_state *state,
1122 uint64_t channel_key, enum lttng_domain_type domain,
1123 enum lttng_error_code *cmd_result)
1124{
1125 struct cds_lfht_node *node;
1126 struct cds_lfht_iter iter;
1127 struct lttng_channel_trigger_list *trigger_list;
1128 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1129 struct channel_key key = { .key = channel_key, .domain = domain };
1130 struct channel_info *channel_info;
1131
1132 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
1133 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1134
1135 rcu_read_lock();
1136
1137 cds_lfht_lookup(state->channel_triggers_ht,
1138 hash_channel_key(&key),
1139 match_channel_trigger_list,
1140 &key,
1141 &iter);
1142 node = cds_lfht_iter_get_node(&iter);
1143 /*
1144 * There is a severe internal error if we are being asked to remove a
1145 * channel that doesn't exist.
1146 */
1147 if (!node) {
1148 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1149 goto end;
1150 }
1151
1152 /* Free the list of triggers associated with this channel. */
1153 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1154 channel_triggers_ht_node);
1155 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1156 &trigger_list->list, node) {
1157 cds_list_del(&trigger_list_element->node);
1158 free(trigger_list_element);
1159 }
1160 cds_lfht_del(state->channel_triggers_ht, node);
1161 free(trigger_list);
1162
1163 /* Free sampled channel state. */
1164 cds_lfht_lookup(state->channel_state_ht,
1165 hash_channel_key(&key),
1166 match_channel_state_sample,
1167 &key,
1168 &iter);
1169 node = cds_lfht_iter_get_node(&iter);
1170 /*
1171 * This is expected to be NULL if the channel is destroyed before we
1172 * received a sample.
1173 */
1174 if (node) {
1175 struct channel_state_sample *sample = caa_container_of(node,
1176 struct channel_state_sample,
1177 channel_state_ht_node);
1178
1179 cds_lfht_del(state->channel_state_ht, node);
1180 free(sample);
1181 }
1182
1183 /* Remove the channel from the channels_ht and free it. */
1184 cds_lfht_lookup(state->channels_ht,
1185 hash_channel_key(&key),
1186 match_channel_info,
1187 &key,
1188 &iter);
1189 node = cds_lfht_iter_get_node(&iter);
1190 assert(node);
1191 channel_info = caa_container_of(node, struct channel_info,
1192 channels_ht_node);
1193 cds_lfht_del(state->channels_ht, node);
1194 channel_info_destroy(channel_info);
1195end:
1196 rcu_read_unlock();
1197 *cmd_result = LTTNG_OK;
1198 return 0;
1199}
1200
1da26331
JG
1201static
1202int condition_is_supported(struct lttng_condition *condition)
1203{
1204 int ret;
1205
1206 switch (lttng_condition_get_type(condition)) {
1207 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1208 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1209 {
1210 enum lttng_domain_type domain;
1211
1212 ret = lttng_condition_buffer_usage_get_domain_type(condition,
1213 &domain);
1214 if (ret) {
1215 ret = -1;
1216 goto end;
1217 }
1218
1219 if (domain != LTTNG_DOMAIN_KERNEL) {
1220 ret = 1;
1221 goto end;
1222 }
1223
1224 /*
1225 * Older kernel tracers don't expose the API to monitor their
1226 * buffers. Therefore, we reject triggers that require that
1227 * mechanism to be available to be evaluated.
1228 */
1229 ret = kernel_supports_ring_buffer_snapshot_sample_positions(
1230 kernel_tracer_fd);
1231 break;
1232 }
1233 default:
1234 ret = 1;
1235 }
1236end:
1237 return ret;
1238}
1239
ab0ee2ca
JG
1240/*
1241 * FIXME A client's credentials are not checked when registering a trigger, nor
1242 * are they stored alongside with the trigger.
1243 *
1da26331 1244 * The effects of this are benign since:
ab0ee2ca
JG
1245 * - The client will succeed in registering the trigger, as it is valid,
1246 * - The trigger will, internally, be bound to the channel,
1247 * - The notifications will not be sent since the client's credentials
1248 * are checked against the channel at that moment.
1da26331
JG
1249 *
1250 * If this function returns a non-zero value, it means something is
50ca7858 1251 * fundamentally broken and the whole subsystem/thread will be torn down.
1da26331
JG
1252 *
1253 * If a non-fatal error occurs, just set the cmd_result to the appropriate
1254 * error code.
ab0ee2ca
JG
1255 */
1256static
1257int handle_notification_thread_command_register_trigger(
8abe313a
JG
1258 struct notification_thread_state *state,
1259 struct lttng_trigger *trigger,
1260 enum lttng_error_code *cmd_result)
ab0ee2ca
JG
1261{
1262 int ret = 0;
1263 struct lttng_condition *condition;
1264 struct notification_client *client;
1265 struct notification_client_list *client_list = NULL;
1266 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1267 struct notification_client_list_element *client_list_element, *tmp;
1268 struct cds_lfht_node *node;
1269 struct cds_lfht_iter iter;
1270 struct channel_info *channel;
1271 bool free_trigger = true;
1272
1273 rcu_read_lock();
1274
1275 condition = lttng_trigger_get_condition(trigger);
1da26331
JG
1276 assert(condition);
1277
1278 ret = condition_is_supported(condition);
1279 if (ret < 0) {
1280 goto error;
1281 } else if (ret == 0) {
1282 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
1283 goto error;
1284 } else {
1285 /* Feature is supported, continue. */
1286 ret = 0;
1287 }
1288
ab0ee2ca
JG
1289 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
1290 if (!trigger_ht_element) {
1291 ret = -1;
1292 goto error;
1293 }
1294
1295 /* Add trigger to the trigger_ht. */
1296 cds_lfht_node_init(&trigger_ht_element->node);
1297 trigger_ht_element->trigger = trigger;
1298
1299 node = cds_lfht_add_unique(state->triggers_ht,
1300 lttng_condition_hash(condition),
1301 match_condition,
1302 condition,
1303 &trigger_ht_element->node);
1304 if (node != &trigger_ht_element->node) {
1305 /* Not a fatal error, simply report it to the client. */
1306 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
1307 goto error_free_ht_element;
1308 }
1309
1310 /*
1311 * Ownership of the trigger and of its wrapper was transfered to
1312 * the triggers_ht.
1313 */
1314 trigger_ht_element = NULL;
1315 free_trigger = false;
1316
1317 /*
1318 * The rest only applies to triggers that have a "notify" action.
1319 * It is not skipped as this is the only action type currently
1320 * supported.
1321 */
1322 client_list = zmalloc(sizeof(*client_list));
1323 if (!client_list) {
1324 ret = -1;
1325 goto error_free_ht_element;
1326 }
1327 cds_lfht_node_init(&client_list->notification_trigger_ht_node);
1328 CDS_INIT_LIST_HEAD(&client_list->list);
1329 client_list->trigger = trigger;
1330
1331 /* Build a list of clients to which this new trigger applies. */
1332 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1333 client_socket_ht_node) {
1334 if (!trigger_applies_to_client(trigger, client)) {
1335 continue;
1336 }
1337
1338 client_list_element = zmalloc(sizeof(*client_list_element));
1339 if (!client_list_element) {
1340 ret = -1;
1341 goto error_free_client_list;
1342 }
1343 CDS_INIT_LIST_HEAD(&client_list_element->node);
1344 client_list_element->client = client;
1345 cds_list_add(&client_list_element->node, &client_list->list);
1346 }
1347
1348 cds_lfht_add(state->notification_trigger_clients_ht,
1349 lttng_condition_hash(condition),
1350 &client_list->notification_trigger_ht_node);
ab0ee2ca
JG
1351
1352 /*
1353 * Add the trigger to list of triggers bound to the channels currently
1354 * known.
1355 */
1356 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
1357 channels_ht_node) {
1358 struct lttng_trigger_list_element *trigger_list_element;
1359 struct lttng_channel_trigger_list *trigger_list;
1360
1361 if (!trigger_applies_to_channel(trigger, channel)) {
1362 continue;
1363 }
1364
1365 cds_lfht_lookup(state->channel_triggers_ht,
1366 hash_channel_key(&channel->key),
1367 match_channel_trigger_list,
1368 &channel->key,
1369 &iter);
1370 node = cds_lfht_iter_get_node(&iter);
1371 assert(node);
ab0ee2ca
JG
1372 trigger_list = caa_container_of(node,
1373 struct lttng_channel_trigger_list,
1374 channel_triggers_ht_node);
1375
1376 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
1377 if (!trigger_list_element) {
1378 ret = -1;
1379 goto error_free_client_list;
1380 }
1381 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
1382 trigger_list_element->trigger = trigger;
1383 cds_list_add(&trigger_list_element->node, &trigger_list->list);
ab0ee2ca
JG
1384 }
1385
2ae99f0b
JG
1386 /*
1387 * Since there is nothing preventing clients from subscribing to a
1388 * condition before the corresponding trigger is registered, we have
1389 * to evaluate this new condition right away.
1390 *
1391 * At some point, we were waiting for the next "evaluation" (e.g. on
1392 * reception of a channel sample) to evaluate this new condition, but
1393 * that was broken.
1394 *
1395 * The reason it was broken is that waiting for the next sample
1396 * does not allow us to properly handle transitions for edge-triggered
1397 * conditions.
1398 *
1399 * Consider this example: when we handle a new channel sample, we
1400 * evaluate each conditions twice: once with the previous state, and
1401 * again with the newest state. We then use those two results to
1402 * determine whether a state change happened: a condition was false and
1403 * became true. If a state change happened, we have to notify clients.
1404 *
1405 * Now, if a client subscribes to a given notification and registers
1406 * a trigger *after* that subscription, we have to make sure the
1407 * condition is evaluated at this point while considering only the
1408 * current state. Otherwise, the next evaluation cycle may only see
1409 * that the evaluations remain the same (true for samples n-1 and n) and
1410 * the client will never know that the condition has been met.
1411 */
1412 cds_list_for_each_entry_safe(client_list_element, tmp,
1413 &client_list->list, node) {
1414 ret = evaluate_condition_for_client(trigger, condition,
1415 client_list_element->client, state);
1416 if (ret) {
1417 goto error_free_client_list;
1418 }
1419 }
1420
1421 /*
1422 * Client list ownership transferred to the
1423 * notification_trigger_clients_ht.
1424 */
1425 client_list = NULL;
1426
ab0ee2ca
JG
1427 *cmd_result = LTTNG_OK;
1428error_free_client_list:
1429 if (client_list) {
1430 cds_list_for_each_entry_safe(client_list_element, tmp,
1431 &client_list->list, node) {
1432 free(client_list_element);
1433 }
1434 free(client_list);
1435 }
1436error_free_ht_element:
1437 free(trigger_ht_element);
1438error:
1439 if (free_trigger) {
1440 struct lttng_action *action = lttng_trigger_get_action(trigger);
1441
1442 lttng_condition_destroy(condition);
1443 lttng_action_destroy(action);
1444 lttng_trigger_destroy(trigger);
1445 }
1446 rcu_read_unlock();
1447 return ret;
1448}
1449
cc2295b5 1450static
ab0ee2ca
JG
1451int handle_notification_thread_command_unregister_trigger(
1452 struct notification_thread_state *state,
1453 struct lttng_trigger *trigger,
1454 enum lttng_error_code *_cmd_reply)
1455{
1456 struct cds_lfht_iter iter;
1457 struct cds_lfht_node *node, *triggers_ht_node;
1458 struct lttng_channel_trigger_list *trigger_list;
1459 struct notification_client_list *client_list;
1460 struct notification_client_list_element *client_list_element, *tmp;
1461 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1462 struct lttng_condition *condition = lttng_trigger_get_condition(
1463 trigger);
1464 struct lttng_action *action;
1465 enum lttng_error_code cmd_reply;
1466
1467 rcu_read_lock();
1468
1469 cds_lfht_lookup(state->triggers_ht,
1470 lttng_condition_hash(condition),
1471 match_condition,
1472 condition,
1473 &iter);
1474 triggers_ht_node = cds_lfht_iter_get_node(&iter);
1475 if (!triggers_ht_node) {
1476 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
1477 goto end;
1478 } else {
1479 cmd_reply = LTTNG_OK;
1480 }
1481
1482 /* Remove trigger from channel_triggers_ht. */
1483 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
1484 channel_triggers_ht_node) {
1485 struct lttng_trigger_list_element *trigger_element, *tmp;
1486
1487 cds_list_for_each_entry_safe(trigger_element, tmp,
1488 &trigger_list->list, node) {
1489 struct lttng_condition *current_condition =
1490 lttng_trigger_get_condition(
1491 trigger_element->trigger);
1492
1493 assert(current_condition);
1494 if (!lttng_condition_is_equal(condition,
1495 current_condition)) {
1496 continue;
1497 }
1498
1499 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
1500 cds_list_del(&trigger_element->node);
e4db5ace
JR
1501 /* A trigger can only appear once per channel */
1502 break;
ab0ee2ca
JG
1503 }
1504 }
1505
1506 /*
1507 * Remove and release the client list from
1508 * notification_trigger_clients_ht.
1509 */
1510 cds_lfht_lookup(state->notification_trigger_clients_ht,
1511 lttng_condition_hash(condition),
1512 match_client_list,
1513 trigger,
1514 &iter);
1515 node = cds_lfht_iter_get_node(&iter);
1516 assert(node);
1517 client_list = caa_container_of(node, struct notification_client_list,
1518 notification_trigger_ht_node);
1519 cds_list_for_each_entry_safe(client_list_element, tmp,
1520 &client_list->list, node) {
1521 free(client_list_element);
1522 }
1523 cds_lfht_del(state->notification_trigger_clients_ht, node);
1524 free(client_list);
1525
1526 /* Remove trigger from triggers_ht. */
1527 trigger_ht_element = caa_container_of(triggers_ht_node,
1528 struct lttng_trigger_ht_element, node);
1529 cds_lfht_del(state->triggers_ht, triggers_ht_node);
1530
1531 condition = lttng_trigger_get_condition(trigger_ht_element->trigger);
1532 lttng_condition_destroy(condition);
1533 action = lttng_trigger_get_action(trigger_ht_element->trigger);
1534 lttng_action_destroy(action);
1535 lttng_trigger_destroy(trigger_ht_element->trigger);
1536 free(trigger_ht_element);
1537end:
1538 rcu_read_unlock();
1539 if (_cmd_reply) {
1540 *_cmd_reply = cmd_reply;
1541 }
1542 return 0;
1543}
1544
1545/* Returns 0 on success, 1 on exit requested, negative value on error. */
1546int handle_notification_thread_command(
1547 struct notification_thread_handle *handle,
1548 struct notification_thread_state *state)
1549{
1550 int ret;
1551 uint64_t counter;
1552 struct notification_thread_command *cmd;
1553
8abe313a
JG
1554 /* Read the event pipe to put it back into a quiescent state. */
1555 ret = read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
1556 sizeof(counter));
ab0ee2ca
JG
1557 if (ret == -1) {
1558 goto error;
1559 }
1560
1561 pthread_mutex_lock(&handle->cmd_queue.lock);
1562 cmd = cds_list_first_entry(&handle->cmd_queue.list,
1563 struct notification_thread_command, cmd_list_node);
1564 switch (cmd->type) {
1565 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
1566 DBG("[notification-thread] Received register trigger command");
1567 ret = handle_notification_thread_command_register_trigger(
1568 state, cmd->parameters.trigger,
1569 &cmd->reply_code);
1570 break;
1571 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
1572 DBG("[notification-thread] Received unregister trigger command");
1573 ret = handle_notification_thread_command_unregister_trigger(
1574 state, cmd->parameters.trigger,
1575 &cmd->reply_code);
1576 break;
1577 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
1578 DBG("[notification-thread] Received add channel command");
1579 ret = handle_notification_thread_command_add_channel(
8abe313a
JG
1580 state,
1581 cmd->parameters.add_channel.session.name,
1582 cmd->parameters.add_channel.session.uid,
1583 cmd->parameters.add_channel.session.gid,
1584 cmd->parameters.add_channel.channel.name,
1585 cmd->parameters.add_channel.channel.domain,
1586 cmd->parameters.add_channel.channel.key,
1587 cmd->parameters.add_channel.channel.capacity,
ab0ee2ca
JG
1588 &cmd->reply_code);
1589 break;
1590 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
1591 DBG("[notification-thread] Received remove channel command");
1592 ret = handle_notification_thread_command_remove_channel(
1593 state, cmd->parameters.remove_channel.key,
1594 cmd->parameters.remove_channel.domain,
1595 &cmd->reply_code);
1596 break;
1597 case NOTIFICATION_COMMAND_TYPE_QUIT:
1598 DBG("[notification-thread] Received quit command");
1599 cmd->reply_code = LTTNG_OK;
1600 ret = 1;
1601 goto end;
1602 default:
1603 ERR("[notification-thread] Unknown internal command received");
1604 goto error_unlock;
1605 }
1606
1607 if (ret) {
1608 goto error_unlock;
1609 }
1610end:
1611 cds_list_del(&cmd->cmd_list_node);
8ada111f 1612 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
1613 pthread_mutex_unlock(&handle->cmd_queue.lock);
1614 return ret;
1615error_unlock:
1616 /* Wake-up and return a fatal error to the calling thread. */
8ada111f 1617 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
1618 pthread_mutex_unlock(&handle->cmd_queue.lock);
1619 cmd->reply_code = LTTNG_ERR_FATAL;
1620error:
1621 /* Indicate a fatal error to the caller. */
1622 return -1;
1623}
1624
1625static
1626unsigned long hash_client_socket(int socket)
1627{
1628 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
1629}
1630
/*
 * Add O_NONBLOCK to the file status flags of `socket`.
 *
 * Returns the result of the last fcntl() call performed: -1 if either the
 * retrieval or the update of the flags failed, fcntl(F_SETFL)'s return value
 * otherwise.
 */
static
int socket_set_non_blocking(int socket)
{
	int ret;
	int current_flags;

	/* Fetch the current flags so that only O_NONBLOCK is added. */
	current_flags = fcntl(socket, F_GETFL, 0);
	if (current_flags == -1) {
		PERROR("fcntl get socket flags");
		return -1;
	}

	ret = fcntl(socket, F_SETFL, current_flags | O_NONBLOCK);
	if (ret == -1) {
		PERROR("fcntl set O_NONBLOCK socket flag");
		return ret;
	}

	DBG("Client socket (fd = %i) set as non-blocking", socket);
	return ret;
}
1653
1654static
14fa22f8 1655int client_reset_inbound_state(struct notification_client *client)
ab0ee2ca
JG
1656{
1657 int ret;
1658
1659 ret = lttng_dynamic_buffer_set_size(
1660 &client->communication.inbound.buffer, 0);
1661 assert(!ret);
1662
1663 client->communication.inbound.bytes_to_receive =
1664 sizeof(struct lttng_notification_channel_message);
1665 client->communication.inbound.msg_type =
1666 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
ab0ee2ca
JG
1667 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
1668 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
14fa22f8
JG
1669 ret = lttng_dynamic_buffer_set_size(
1670 &client->communication.inbound.buffer,
1671 client->communication.inbound.bytes_to_receive);
1672 return ret;
ab0ee2ca
JG
1673}
1674
1675int handle_notification_thread_client_connect(
1676 struct notification_thread_state *state)
1677{
1678 int ret;
1679 struct notification_client *client;
1680
1681 DBG("[notification-thread] Handling new notification channel client connection");
1682
1683 client = zmalloc(sizeof(*client));
1684 if (!client) {
1685 /* Fatal error. */
1686 ret = -1;
1687 goto error;
1688 }
1689 CDS_INIT_LIST_HEAD(&client->condition_list);
1690 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
1691 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
01ea340e 1692 client->communication.inbound.expect_creds = true;
14fa22f8
JG
1693 ret = client_reset_inbound_state(client);
1694 if (ret) {
1695 ERR("[notification-thread] Failed to reset client communication's inbound state");
1696 ret = 0;
1697 goto error;
1698 }
ab0ee2ca
JG
1699
1700 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
1701 if (ret < 0) {
1702 ERR("[notification-thread] Failed to accept new notification channel client connection");
1703 ret = 0;
1704 goto error;
1705 }
1706
1707 client->socket = ret;
1708
1709 ret = socket_set_non_blocking(client->socket);
1710 if (ret) {
1711 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
1712 goto error;
1713 }
1714
1715 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
1716 if (ret < 0) {
1717 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
1718 ret = 0;
1719 goto error;
1720 }
1721
1722 ret = lttng_poll_add(&state->events, client->socket,
1723 LPOLLIN | LPOLLERR |
1724 LPOLLHUP | LPOLLRDHUP);
1725 if (ret < 0) {
1726 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
1727 ret = 0;
1728 goto error;
1729 }
1730 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
1731 client->socket);
1732
ab0ee2ca
JG
1733 rcu_read_lock();
1734 cds_lfht_add(state->client_socket_ht,
1735 hash_client_socket(client->socket),
1736 &client->client_socket_ht_node);
1737 rcu_read_unlock();
1738
1739 return ret;
1740error:
1741 notification_client_destroy(client, state);
1742 return ret;
1743}
1744
1745int handle_notification_thread_client_disconnect(
1746 int client_socket,
1747 struct notification_thread_state *state)
1748{
1749 int ret = 0;
1750 struct notification_client *client;
1751
1752 rcu_read_lock();
1753 DBG("[notification-thread] Closing client connection (socket fd = %i)",
1754 client_socket);
1755 client = get_client_from_socket(client_socket, state);
1756 if (!client) {
1757 /* Internal state corruption, fatal error. */
1758 ERR("[notification-thread] Unable to find client (socket fd = %i)",
1759 client_socket);
1760 ret = -1;
1761 goto end;
1762 }
1763
1764 ret = lttng_poll_del(&state->events, client_socket);
1765 if (ret) {
1766 ERR("[notification-thread] Failed to remove client socket from poll set");
1767 }
1768 cds_lfht_del(state->client_socket_ht,
1769 &client->client_socket_ht_node);
1770 notification_client_destroy(client, state);
1771end:
1772 rcu_read_unlock();
1773 return ret;
1774}
1775
1776int handle_notification_thread_client_disconnect_all(
1777 struct notification_thread_state *state)
1778{
1779 struct cds_lfht_iter iter;
1780 struct notification_client *client;
1781 bool error_encoutered = false;
1782
1783 rcu_read_lock();
1784 DBG("[notification-thread] Closing all client connections");
1785 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1786 client_socket_ht_node) {
1787 int ret;
1788
1789 ret = handle_notification_thread_client_disconnect(
1790 client->socket, state);
1791 if (ret) {
1792 error_encoutered = true;
1793 }
1794 }
1795 rcu_read_unlock();
1796 return error_encoutered ? 1 : 0;
1797}
1798
1799int handle_notification_thread_trigger_unregister_all(
1800 struct notification_thread_state *state)
1801{
4149ace8 1802 bool error_occurred = false;
ab0ee2ca
JG
1803 struct cds_lfht_iter iter;
1804 struct lttng_trigger_ht_element *trigger_ht_element;
1805
1806 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1807 node) {
1808 int ret = handle_notification_thread_command_unregister_trigger(
1809 state, trigger_ht_element->trigger, NULL);
1810 if (ret) {
4149ace8 1811 error_occurred = true;
ab0ee2ca
JG
1812 }
1813 }
4149ace8 1814 return error_occurred ? -1 : 0;
ab0ee2ca
JG
1815}
1816
1817static
1818int client_flush_outgoing_queue(struct notification_client *client,
1819 struct notification_thread_state *state)
1820{
1821 ssize_t ret;
1822 size_t to_send_count;
1823
1824 assert(client->communication.outbound.buffer.size != 0);
1825 to_send_count = client->communication.outbound.buffer.size;
1826 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
1827 client->socket);
1828
1829 ret = lttcomm_send_unix_sock_non_block(client->socket,
1830 client->communication.outbound.buffer.data,
1831 to_send_count);
1832 if ((ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) ||
1833 (ret > 0 && ret < to_send_count)) {
1834 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
1835 client->socket);
1836 to_send_count -= max(ret, 0);
1837
1838 memcpy(client->communication.outbound.buffer.data,
1839 client->communication.outbound.buffer.data +
1840 client->communication.outbound.buffer.size - to_send_count,
1841 to_send_count);
1842 ret = lttng_dynamic_buffer_set_size(
1843 &client->communication.outbound.buffer,
1844 to_send_count);
1845 if (ret) {
1846 goto error;
1847 }
1848
1849 /*
1850 * We want to be notified whenever there is buffer space
1851 * available to send the rest of the payload.
1852 */
1853 ret = lttng_poll_mod(&state->events, client->socket,
1854 CLIENT_POLL_MASK_IN_OUT);
1855 if (ret) {
1856 goto error;
1857 }
1858 } else if (ret < 0) {
1859 /* Generic error, disconnect the client. */
1860 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
1861 client->socket);
1862 ret = handle_notification_thread_client_disconnect(
1863 client->socket, state);
1864 if (ret) {
1865 goto error;
1866 }
1867 } else {
1868 /* No error and flushed the queue completely. */
1869 ret = lttng_dynamic_buffer_set_size(
1870 &client->communication.outbound.buffer, 0);
1871 if (ret) {
1872 goto error;
1873 }
1874 ret = lttng_poll_mod(&state->events, client->socket,
1875 CLIENT_POLL_MASK_IN);
1876 if (ret) {
1877 goto error;
1878 }
1879
1880 client->communication.outbound.queued_command_reply = false;
1881 client->communication.outbound.dropped_notification = false;
1882 }
1883
1884 return 0;
1885error:
1886 return -1;
1887}
1888
1889static
1890int client_send_command_reply(struct notification_client *client,
1891 struct notification_thread_state *state,
1892 enum lttng_notification_channel_status status)
1893{
1894 int ret;
1895 struct lttng_notification_channel_command_reply reply = {
1896 .status = (int8_t) status,
1897 };
1898 struct lttng_notification_channel_message msg = {
1899 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
1900 .size = sizeof(reply),
1901 };
1902 char buffer[sizeof(msg) + sizeof(reply)];
1903
1904 if (client->communication.outbound.queued_command_reply) {
1905 /* Protocol error. */
1906 goto error;
1907 }
1908
1909 memcpy(buffer, &msg, sizeof(msg));
1910 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
1911 DBG("[notification-thread] Send command reply (%i)", (int) status);
1912
1913 /* Enqueue buffer to outgoing queue and flush it. */
1914 ret = lttng_dynamic_buffer_append(
1915 &client->communication.outbound.buffer,
1916 buffer, sizeof(buffer));
1917 if (ret) {
1918 goto error;
1919 }
1920
1921 ret = client_flush_outgoing_queue(client, state);
1922 if (ret) {
1923 goto error;
1924 }
1925
1926 if (client->communication.outbound.buffer.size != 0) {
1927 /* Queue could not be emptied. */
1928 client->communication.outbound.queued_command_reply = true;
1929 }
1930
1931 return 0;
1932error:
1933 return -1;
1934}
1935
1936static
1937int client_dispatch_message(struct notification_client *client,
1938 struct notification_thread_state *state)
1939{
1940 int ret = 0;
1941
1942 if (client->communication.inbound.msg_type !=
1943 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
1944 client->communication.inbound.msg_type !=
1945 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
1946 !client->validated) {
1947 WARN("[notification-thread] client attempted a command before handshake");
1948 ret = -1;
1949 goto end;
1950 }
1951
1952 switch (client->communication.inbound.msg_type) {
1953 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
1954 {
1955 /*
1956 * Receiving message header. The function will be called again
1957 * once the rest of the message as been received and can be
1958 * interpreted.
1959 */
1960 const struct lttng_notification_channel_message *msg;
1961
1962 assert(sizeof(*msg) ==
1963 client->communication.inbound.buffer.size);
1964 msg = (const struct lttng_notification_channel_message *)
1965 client->communication.inbound.buffer.data;
1966
1967 if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
1968 ERR("[notification-thread] Invalid notification channel message: length = %u", msg->size);
1969 ret = -1;
1970 goto end;
1971 }
1972
1973 switch (msg->type) {
1974 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
1975 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
1976 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
1977 break;
1978 default:
1979 ret = -1;
1980 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
1981 goto end;
1982 }
1983
1984 client->communication.inbound.bytes_to_receive = msg->size;
1985 client->communication.inbound.msg_type =
1986 (enum lttng_notification_channel_message_type) msg->type;
ab0ee2ca 1987 ret = lttng_dynamic_buffer_set_size(
14fa22f8 1988 &client->communication.inbound.buffer, msg->size);
ab0ee2ca
JG
1989 if (ret) {
1990 goto end;
1991 }
1992 break;
1993 }
1994 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
1995 {
1996 struct lttng_notification_channel_command_handshake *handshake_client;
1997 struct lttng_notification_channel_command_handshake handshake_reply = {
1998 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
1999 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
2000 };
2001 struct lttng_notification_channel_message msg_header = {
2002 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
2003 .size = sizeof(handshake_reply),
2004 };
2005 enum lttng_notification_channel_status status =
2006 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2007 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
2008
2009 memcpy(send_buffer, &msg_header, sizeof(msg_header));
2010 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
2011 sizeof(handshake_reply));
2012
2013 handshake_client =
2014 (struct lttng_notification_channel_command_handshake *)
2015 client->communication.inbound.buffer.data;
47a32869 2016 client->major = handshake_client->major;
ab0ee2ca
JG
2017 client->minor = handshake_client->minor;
2018 if (!client->communication.inbound.creds_received) {
2019 ERR("[notification-thread] No credentials received from client");
2020 ret = -1;
2021 goto end;
2022 }
2023
2024 client->uid = LTTNG_SOCK_GET_UID_CRED(
2025 &client->communication.inbound.creds);
2026 client->gid = LTTNG_SOCK_GET_GID_CRED(
2027 &client->communication.inbound.creds);
2028 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
2029 client->uid, client->gid, (int) client->major,
2030 (int) client->minor);
2031
2032 if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
2033 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
2034 }
2035
2036 ret = lttng_dynamic_buffer_append(&client->communication.outbound.buffer,
2037 send_buffer, sizeof(send_buffer));
2038 if (ret) {
2039 ERR("[notification-thread] Failed to send protocol version to notification channel client");
2040 goto end;
2041 }
2042
2043 ret = client_flush_outgoing_queue(client, state);
2044 if (ret) {
2045 goto end;
2046 }
2047
2048 ret = client_send_command_reply(client, state, status);
2049 if (ret) {
2050 ERR("[notification-thread] Failed to send reply to notification channel client");
2051 goto end;
2052 }
2053
2054 /* Set reception state to receive the next message header. */
14fa22f8
JG
2055 ret = client_reset_inbound_state(client);
2056 if (ret) {
2057 ERR("[notification-thread] Failed to reset client communication's inbound state");
2058 goto end;
2059 }
ab0ee2ca
JG
2060 client->validated = true;
2061 break;
2062 }
2063 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2064 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2065 {
2066 struct lttng_condition *condition;
2067 enum lttng_notification_channel_status status =
2068 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2069 const struct lttng_buffer_view condition_view =
2070 lttng_buffer_view_from_dynamic_buffer(
2071 &client->communication.inbound.buffer,
2072 0, -1);
2073 size_t expected_condition_size =
2074 client->communication.inbound.buffer.size;
2075
2076 ret = lttng_condition_create_from_buffer(&condition_view,
2077 &condition);
2078 if (ret != expected_condition_size) {
2079 ERR("[notification-thread] Malformed condition received from client");
2080 goto end;
2081 }
2082
2083 if (client->communication.inbound.msg_type ==
2084 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
ab0ee2ca
JG
2085 ret = notification_thread_client_subscribe(client,
2086 condition, state, &status);
2087 } else {
2088 ret = notification_thread_client_unsubscribe(client,
2089 condition, state, &status);
2090 }
2091 if (ret) {
2092 goto end;
2093 }
2094
2095 ret = client_send_command_reply(client, state, status);
2096 if (ret) {
2097 ERR("[notification-thread] Failed to send reply to notification channel client");
2098 goto end;
2099 }
2100
2101 /* Set reception state to receive the next message header. */
14fa22f8
JG
2102 ret = client_reset_inbound_state(client);
2103 if (ret) {
2104 ERR("[notification-thread] Failed to reset client communication's inbound state");
2105 goto end;
2106 }
ab0ee2ca
JG
2107 break;
2108 }
2109 default:
2110 abort();
2111 }
2112end:
2113 return ret;
2114}
2115
2116/* Incoming data from client. */
2117int handle_notification_thread_client_in(
2118 struct notification_thread_state *state, int socket)
2119{
14fa22f8 2120 int ret = 0;
ab0ee2ca
JG
2121 struct notification_client *client;
2122 ssize_t recv_ret;
2123 size_t offset;
2124
2125 client = get_client_from_socket(socket, state);
2126 if (!client) {
2127 /* Internal error, abort. */
2128 ret = -1;
2129 goto end;
2130 }
2131
14fa22f8
JG
2132 offset = client->communication.inbound.buffer.size -
2133 client->communication.inbound.bytes_to_receive;
01ea340e 2134 if (client->communication.inbound.expect_creds) {
ab0ee2ca
JG
2135 recv_ret = lttcomm_recv_creds_unix_sock(socket,
2136 client->communication.inbound.buffer.data + offset,
2137 client->communication.inbound.bytes_to_receive,
2138 &client->communication.inbound.creds);
2139 if (recv_ret > 0) {
01ea340e 2140 client->communication.inbound.expect_creds = false;
ab0ee2ca
JG
2141 client->communication.inbound.creds_received = true;
2142 }
2143 } else {
2144 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
2145 client->communication.inbound.buffer.data + offset,
2146 client->communication.inbound.bytes_to_receive);
2147 }
2148 if (recv_ret < 0) {
2149 goto error_disconnect_client;
2150 }
2151
2152 client->communication.inbound.bytes_to_receive -= recv_ret;
ab0ee2ca
JG
2153 if (client->communication.inbound.bytes_to_receive == 0) {
2154 ret = client_dispatch_message(client, state);
2155 if (ret) {
2156 /*
2157 * Only returns an error if this client must be
2158 * disconnected.
2159 */
2160 goto error_disconnect_client;
2161 }
2162 } else {
2163 goto end;
2164 }
2165end:
2166 return ret;
2167error_disconnect_client:
2168 ret = handle_notification_thread_client_disconnect(socket, state);
2169 return ret;
2170}
2171
/*
 * Client ready to receive outgoing data.
 *
 * Flushes the outgoing queue of the client associated with the socket.
 * Returns -1 if no such client exists, otherwise the result of the flush.
 */
int handle_notification_thread_client_out(
		struct notification_thread_state *state, int socket)
{
	struct notification_client *client =
			get_client_from_socket(socket, state);

	if (!client) {
		/* Internal error, abort. */
		return -1;
	}

	return client_flush_outgoing_queue(client, state);
}
2193
2194static
e8360425
JD
2195bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
2196 const struct channel_state_sample *sample,
2197 uint64_t buffer_capacity)
ab0ee2ca
JG
2198{
2199 bool result = false;
2200 uint64_t threshold;
2201 enum lttng_condition_type condition_type;
e8360425 2202 const struct lttng_condition_buffer_usage *use_condition = container_of(
ab0ee2ca
JG
2203 condition, struct lttng_condition_buffer_usage,
2204 parent);
2205
ab0ee2ca
JG
2206 if (use_condition->threshold_bytes.set) {
2207 threshold = use_condition->threshold_bytes.value;
2208 } else {
2209 /*
2210 * Threshold was expressed as a ratio.
2211 *
2212 * TODO the threshold (in bytes) of conditions expressed
2213 * as a ratio of total buffer size could be cached to
2214 * forego this double-multiplication or it could be performed
2215 * as fixed-point math.
2216 *
2217 * Note that caching should accomodate the case where the
2218 * condition applies to multiple channels (i.e. don't assume
2219 * that all channels matching my_chann* have the same size...)
2220 */
2221 threshold = (uint64_t) (use_condition->threshold_ratio.value *
2222 (double) buffer_capacity);
2223 }
2224
2225 condition_type = lttng_condition_get_type(condition);
2226 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
2227 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2228 threshold, sample->highest_usage);
2229
2230 /*
2231 * The low condition should only be triggered once _all_ of the
2232 * streams in a channel have gone below the "low" threshold.
2233 */
2234 if (sample->highest_usage <= threshold) {
2235 result = true;
2236 }
2237 } else {
2238 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2239 threshold, sample->highest_usage);
2240
2241 /*
2242 * For high buffer usage scenarios, we want to trigger whenever
2243 * _any_ of the streams has reached the "high" threshold.
2244 */
2245 if (sample->highest_usage >= threshold) {
2246 result = true;
2247 }
2248 }
e8360425 2249
ab0ee2ca
JG
2250 return result;
2251}
2252
2253static
e8360425
JD
2254bool evaluate_session_consumed_size_condition(
2255 const struct lttng_condition *condition,
2256 uint64_t session_consumed_size)
2257{
2258 uint64_t threshold;
2259 const struct lttng_condition_session_consumed_size *size_condition =
2260 container_of(condition,
2261 struct lttng_condition_session_consumed_size,
2262 parent);
2263
2264 threshold = size_condition->consumed_threshold_bytes.value;
2265 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
2266 threshold, session_consumed_size);
2267 return session_consumed_size >= threshold;
2268}
2269
2270static
2271int evaluate_condition(const struct lttng_condition *condition,
ab0ee2ca 2272 struct lttng_evaluation **evaluation,
e8360425
JD
2273 const struct notification_thread_state *state,
2274 const struct channel_state_sample *previous_sample,
2275 const struct channel_state_sample *latest_sample,
2276 uint64_t previous_session_consumed_total,
2277 uint64_t latest_session_consumed_total,
2278 struct channel_info *channel_info)
ab0ee2ca
JG
2279{
2280 int ret = 0;
2281 enum lttng_condition_type condition_type;
e8360425
JD
2282 const bool previous_sample_available = !!previous_sample;
2283 bool previous_sample_result = false;
ab0ee2ca
JG
2284 bool latest_sample_result;
2285
2286 condition_type = lttng_condition_get_type(condition);
ab0ee2ca 2287
e8360425
JD
2288 switch (condition_type) {
2289 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
2290 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
2291 if (caa_likely(previous_sample_available)) {
2292 previous_sample_result =
2293 evaluate_buffer_usage_condition(condition,
2294 previous_sample, channel_info->capacity);
2295 }
2296 latest_sample_result = evaluate_buffer_usage_condition(
2297 condition, latest_sample,
2298 channel_info->capacity);
2299 break;
2300 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
2301 if (caa_likely(previous_sample_available)) {
2302 previous_sample_result =
2303 evaluate_session_consumed_size_condition(
2304 condition,
2305 previous_session_consumed_total);
2306 }
2307 latest_sample_result =
2308 evaluate_session_consumed_size_condition(
2309 condition,
2310 latest_session_consumed_total);
2311 break;
2312 default:
2313 /* Unknown condition type; internal error. */
2314 abort();
2315 }
ab0ee2ca
JG
2316
2317 if (!latest_sample_result ||
2318 (previous_sample_result == latest_sample_result)) {
2319 /*
2320 * Only trigger on a condition evaluation transition.
2321 *
2322 * NOTE: This edge-triggered logic may not be appropriate for
2323 * future condition types.
2324 */
2325 goto end;
2326 }
2327
e8360425
JD
2328 if (!evaluation || !latest_sample_result) {
2329 goto end;
2330 }
2331
2332 switch (condition_type) {
2333 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
2334 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
ab0ee2ca
JG
2335 *evaluation = lttng_evaluation_buffer_usage_create(
2336 condition_type,
2337 latest_sample->highest_usage,
e8360425
JD
2338 channel_info->capacity);
2339 break;
2340 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
2341 *evaluation = lttng_evaluation_session_consumed_size_create(
2342 condition_type,
2343 latest_session_consumed_total);
2344 break;
2345 default:
2346 abort();
2347 }
2348
2349 if (!*evaluation) {
2350 ret = -1;
2351 goto end;
ab0ee2ca
JG
2352 }
2353end:
2354 return ret;
2355}
2356
2357static
2358int client_enqueue_dropped_notification(struct notification_client *client,
2359 struct notification_thread_state *state)
2360{
2361 int ret;
2362 struct lttng_notification_channel_message msg = {
2363 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
2364 .size = 0,
2365 };
2366
2367 ret = lttng_dynamic_buffer_append(
2368 &client->communication.outbound.buffer, &msg,
2369 sizeof(msg));
2370 return ret;
2371}
2372
2373static
2374int send_evaluation_to_clients(struct lttng_trigger *trigger,
2375 struct lttng_evaluation *evaluation,
2376 struct notification_client_list* client_list,
2377 struct notification_thread_state *state,
2378 uid_t channel_uid, gid_t channel_gid)
2379{
2380 int ret = 0;
2381 struct lttng_dynamic_buffer msg_buffer;
2382 struct notification_client_list_element *client_list_element, *tmp;
2383 struct lttng_notification *notification;
2384 struct lttng_condition *condition;
2385 ssize_t expected_notification_size, notification_size;
2386 struct lttng_notification_channel_message msg;
2387
2388 lttng_dynamic_buffer_init(&msg_buffer);
2389
2390 condition = lttng_trigger_get_condition(trigger);
2391 assert(condition);
2392
2393 notification = lttng_notification_create(condition, evaluation);
2394 if (!notification) {
2395 ret = -1;
2396 goto end;
2397 }
2398
2399 expected_notification_size = lttng_notification_serialize(notification,
2400 NULL);
2401 if (expected_notification_size < 0) {
2402 ERR("[notification-thread] Failed to get size of serialized notification");
2403 ret = -1;
2404 goto end;
2405 }
2406
2407 msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION;
2408 msg.size = (uint32_t) expected_notification_size;
2409 ret = lttng_dynamic_buffer_append(&msg_buffer, &msg, sizeof(msg));
2410 if (ret) {
2411 goto end;
2412 }
2413
2414 ret = lttng_dynamic_buffer_set_size(&msg_buffer,
2415 msg_buffer.size + expected_notification_size);
2416 if (ret) {
2417 goto end;
2418 }
2419
2420 notification_size = lttng_notification_serialize(notification,
2421 msg_buffer.data + sizeof(msg));
2422 if (notification_size != expected_notification_size) {
2423 ERR("[notification-thread] Failed to serialize notification");
2424 ret = -1;
2425 goto end;
2426 }
2427
2428 cds_list_for_each_entry_safe(client_list_element, tmp,
2429 &client_list->list, node) {
2430 struct notification_client *client =
2431 client_list_element->client;
2432
2433 if (client->uid != channel_uid && client->gid != channel_gid &&
2434 client->uid != 0) {
2435 /* Client is not allowed to monitor this channel. */
2436 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this channel");
2437 continue;
2438 }
2439
2440 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
2441 client->socket, msg_buffer.size);
2442 if (client->communication.outbound.buffer.size) {
2443 /*
2444 * Outgoing data is already buffered for this client;
2445 * drop the notification and enqueue a "dropped
2446 * notification" message if this is the first dropped
2447 * notification since the socket spilled-over to the
2448 * queue.
2449 */
2450 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
2451 client->socket);
2452 if (!client->communication.outbound.dropped_notification) {
2453 client->communication.outbound.dropped_notification = true;
2454 ret = client_enqueue_dropped_notification(
2455 client, state);
2456 if (ret) {
2457 goto end;
2458 }
2459 }
2460 continue;
2461 }
2462
2463 ret = lttng_dynamic_buffer_append_buffer(
2464 &client->communication.outbound.buffer,
2465 &msg_buffer);
2466 if (ret) {
2467 goto end;
2468 }
2469
2470 ret = client_flush_outgoing_queue(client, state);
2471 if (ret) {
2472 goto end;
2473 }
2474 }
2475 ret = 0;
2476end:
2477 lttng_notification_destroy(notification);
2478 lttng_dynamic_buffer_reset(&msg_buffer);
2479 return ret;
2480}
2481
2482int handle_notification_thread_channel_sample(
2483 struct notification_thread_state *state, int pipe,
2484 enum lttng_domain_type domain)
2485{
2486 int ret = 0;
2487 struct lttcomm_consumer_channel_monitor_msg sample_msg;
ab0ee2ca
JG
2488 struct channel_info *channel_info;
2489 struct cds_lfht_node *node;
2490 struct cds_lfht_iter iter;
2491 struct lttng_channel_trigger_list *trigger_list;
2492 struct lttng_trigger_list_element *trigger_list_element;
2493 bool previous_sample_available = false;
e8360425
JD
2494 struct channel_state_sample previous_sample, latest_sample;
2495 uint64_t previous_session_consumed_total, latest_session_consumed_total;
ab0ee2ca
JG
2496
2497 /*
2498 * The monitoring pipe only holds messages smaller than PIPE_BUF,
2499 * ensuring that read/write of sampling messages are atomic.
2500 */
7c2551ef 2501 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
ab0ee2ca
JG
2502 if (ret != sizeof(sample_msg)) {
2503 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
2504 pipe);
2505 ret = -1;
2506 goto end;
2507 }
2508
2509 ret = 0;
2510 latest_sample.key.key = sample_msg.key;
2511 latest_sample.key.domain = domain;
2512 latest_sample.highest_usage = sample_msg.highest;
2513 latest_sample.lowest_usage = sample_msg.lowest;
e8360425 2514 latest_sample.channel_total_consumed = sample_msg.total_consumed;
ab0ee2ca
JG
2515
2516 rcu_read_lock();
2517
2518 /* Retrieve the channel's informations */
2519 cds_lfht_lookup(state->channels_ht,
2520 hash_channel_key(&latest_sample.key),
2521 match_channel_info,
2522 &latest_sample.key,
2523 &iter);
2524 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 2525 if (caa_unlikely(!node)) {
ab0ee2ca
JG
2526 /*
2527 * Not an error since the consumer can push a sample to the pipe
2528 * and the rest of the session daemon could notify us of the
2529 * channel's destruction before we get a chance to process that
2530 * sample.
2531 */
2532 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
2533 latest_sample.key.key,
2534 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
2535 "user space");
2536 goto end_unlock;
2537 }
2538 channel_info = caa_container_of(node, struct channel_info,
2539 channels_ht_node);
e8360425 2540 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
8abe313a 2541 channel_info->name,
ab0ee2ca 2542 latest_sample.key.key,
8abe313a 2543 channel_info->session_info->name,
ab0ee2ca 2544 latest_sample.highest_usage,
e8360425
JD
2545 latest_sample.lowest_usage,
2546 latest_sample.channel_total_consumed);
2547
2548 previous_session_consumed_total =
2549 channel_info->session_info->consumed_data_size;
ab0ee2ca
JG
2550
2551 /* Retrieve the channel's last sample, if it exists, and update it. */
2552 cds_lfht_lookup(state->channel_state_ht,
2553 hash_channel_key(&latest_sample.key),
2554 match_channel_state_sample,
2555 &latest_sample.key,
2556 &iter);
2557 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 2558 if (caa_likely(node)) {
ab0ee2ca
JG
2559 struct channel_state_sample *stored_sample;
2560
2561 /* Update the sample stored. */
2562 stored_sample = caa_container_of(node,
2563 struct channel_state_sample,
2564 channel_state_ht_node);
e8360425 2565
ab0ee2ca
JG
2566 memcpy(&previous_sample, stored_sample,
2567 sizeof(previous_sample));
2568 stored_sample->highest_usage = latest_sample.highest_usage;
2569 stored_sample->lowest_usage = latest_sample.lowest_usage;
0f0479d7 2570 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
ab0ee2ca 2571 previous_sample_available = true;
e8360425
JD
2572
2573 latest_session_consumed_total =
2574 previous_session_consumed_total +
2575 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
ab0ee2ca
JG
2576 } else {
2577 /*
2578 * This is the channel's first sample, allocate space for and
2579 * store the new sample.
2580 */
2581 struct channel_state_sample *stored_sample;
2582
2583 stored_sample = zmalloc(sizeof(*stored_sample));
2584 if (!stored_sample) {
2585 ret = -1;
2586 goto end_unlock;
2587 }
2588
2589 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
2590 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
2591 cds_lfht_add(state->channel_state_ht,
2592 hash_channel_key(&stored_sample->key),
2593 &stored_sample->channel_state_ht_node);
e8360425
JD
2594
2595 latest_session_consumed_total =
2596 previous_session_consumed_total +
2597 latest_sample.channel_total_consumed;
ab0ee2ca
JG
2598 }
2599
e8360425
JD
2600 channel_info->session_info->consumed_data_size =
2601 latest_session_consumed_total;
2602
ab0ee2ca
JG
2603 /* Find triggers associated with this channel. */
2604 cds_lfht_lookup(state->channel_triggers_ht,
2605 hash_channel_key(&latest_sample.key),
2606 match_channel_trigger_list,
2607 &latest_sample.key,
2608 &iter);
2609 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 2610 if (caa_likely(!node)) {
ab0ee2ca
JG
2611 goto end_unlock;
2612 }
2613
2614 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
2615 channel_triggers_ht_node);
2616 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
2617 node) {
2618 struct lttng_condition *condition;
2619 struct lttng_action *action;
2620 struct lttng_trigger *trigger;
2621 struct notification_client_list *client_list;
2622 struct lttng_evaluation *evaluation = NULL;
2623
2624 trigger = trigger_list_element->trigger;
2625 condition = lttng_trigger_get_condition(trigger);
2626 assert(condition);
2627 action = lttng_trigger_get_action(trigger);
2628
2629 /* Notify actions are the only type currently supported. */
2630 assert(lttng_action_get_type(action) ==
2631 LTTNG_ACTION_TYPE_NOTIFY);
2632
2633 /*
2634 * Check if any client is subscribed to the result of this
2635 * evaluation.
2636 */
2637 cds_lfht_lookup(state->notification_trigger_clients_ht,
2638 lttng_condition_hash(condition),
2639 match_client_list,
2640 trigger,
2641 &iter);
2642 node = cds_lfht_iter_get_node(&iter);
2643 assert(node);
2644
2645 client_list = caa_container_of(node,
2646 struct notification_client_list,
2647 notification_trigger_ht_node);
2648 if (cds_list_empty(&client_list->list)) {
2649 /*
2650 * No clients interested in the evaluation's result,
2651 * skip it.
2652 */
2653 continue;
2654 }
2655
2656 ret = evaluate_condition(condition, &evaluation, state,
2657 previous_sample_available ? &previous_sample : NULL,
e8360425
JD
2658 &latest_sample,
2659 previous_session_consumed_total,
2660 latest_session_consumed_total,
2661 channel_info);
2662 if (caa_unlikely(ret)) {
ab0ee2ca
JG
2663 goto end_unlock;
2664 }
2665
e8360425 2666 if (caa_likely(!evaluation)) {
ab0ee2ca
JG
2667 continue;
2668 }
2669
2670 /* Dispatch evaluation result to all clients. */
2671 ret = send_evaluation_to_clients(trigger_list_element->trigger,
2672 evaluation, client_list, state,
8abe313a
JG
2673 channel_info->session_info->uid,
2674 channel_info->session_info->gid);
2675 lttng_evaluation_destroy(evaluation);
e0b5f87b 2676 if (caa_unlikely(ret)) {
ab0ee2ca
JG
2677 goto end_unlock;
2678 }
2679 }
2680end_unlock:
2681 rcu_read_unlock();
2682end:
2683 return ret;
2684}
This page took 0.132403 seconds and 4 git commands to generate.