Rotate timer
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
CommitLineData
ab0ee2ca
JG
1/*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _LGPL_SOURCE
19#include <urcu.h>
20#include <urcu/rculfhash.h>
21
ab0ee2ca
JG
22#include <common/defaults.h>
23#include <common/error.h>
24#include <common/futex.h>
25#include <common/unix.h>
26#include <common/dynamic-buffer.h>
27#include <common/hashtable/utils.h>
28#include <common/sessiond-comm/sessiond-comm.h>
29#include <common/macros.h>
30#include <lttng/condition/condition.h>
31#include <lttng/action/action.h>
32#include <lttng/notification/notification-internal.h>
33#include <lttng/condition/condition-internal.h>
34#include <lttng/condition/buffer-usage-internal.h>
35#include <lttng/notification/channel-internal.h>
1da26331 36
ab0ee2ca
JG
37#include <time.h>
38#include <unistd.h>
39#include <assert.h>
40#include <inttypes.h>
d53ea4e4 41#include <fcntl.h>
ab0ee2ca 42
1da26331
JG
43#include "notification-thread.h"
44#include "notification-thread-events.h"
45#include "notification-thread-commands.h"
46#include "lttng-sessiond.h"
47#include "kernel.h"
48
ab0ee2ca
JG
49#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
50#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
51
52struct lttng_trigger_list_element {
53 struct lttng_trigger *trigger;
54 struct cds_list_head node;
55};
56
57struct lttng_channel_trigger_list {
58 struct channel_key channel_key;
59 struct cds_list_head list;
60 struct cds_lfht_node channel_triggers_ht_node;
61};
62
63struct lttng_trigger_ht_element {
64 struct lttng_trigger *trigger;
65 struct cds_lfht_node node;
66};
67
68struct lttng_condition_list_element {
69 struct lttng_condition *condition;
70 struct cds_list_head node;
71};
72
73struct notification_client_list_element {
74 struct notification_client *client;
75 struct cds_list_head node;
76};
77
78struct notification_client_list {
79 struct lttng_trigger *trigger;
80 struct cds_list_head list;
81 struct cds_lfht_node notification_trigger_ht_node;
82};
83
84struct notification_client {
85 int socket;
86 /* Client protocol version. */
87 uint8_t major, minor;
88 uid_t uid;
89 gid_t gid;
90 /*
5332364f 91 * Indicates if the credentials and versions of the client have been
ab0ee2ca
JG
92 * checked.
93 */
94 bool validated;
95 /*
96 * Conditions to which the client's notification channel is subscribed.
97 * List of struct lttng_condition_list_node. The condition member is
98 * owned by the client.
99 */
100 struct cds_list_head condition_list;
101 struct cds_lfht_node client_socket_ht_node;
102 struct {
103 struct {
14fa22f8
JG
104 /*
105 * During the reception of a message, the reception
106 * buffers' "size" is set to contain the current
107 * message's complete payload.
108 */
ab0ee2ca
JG
109 struct lttng_dynamic_buffer buffer;
110 /* Bytes left to receive for the current message. */
111 size_t bytes_to_receive;
112 /* Type of the message being received. */
113 enum lttng_notification_channel_message_type msg_type;
114 /*
115 * Indicates whether or not credentials are expected
116 * from the client.
117 */
01ea340e 118 bool expect_creds;
ab0ee2ca
JG
119 /*
120 * Indicates whether or not credentials were received
121 * from the client.
122 */
123 bool creds_received;
14fa22f8 124 /* Only used during credentials reception. */
ab0ee2ca
JG
125 lttng_sock_cred creds;
126 } inbound;
127 struct {
128 /*
129 * Indicates whether or not a notification addressed to
130 * this client was dropped because a command reply was
131 * already buffered.
132 *
133 * A notification is dropped whenever the buffer is not
134 * empty.
135 */
136 bool dropped_notification;
137 /*
138 * Indicates whether or not a command reply is already
139 * buffered. In this case, it means that the client is
140 * not consuming command replies before emitting a new
141 * one. This could be caused by a protocol error or a
142 * misbehaving/malicious client.
143 */
144 bool queued_command_reply;
145 struct lttng_dynamic_buffer buffer;
146 } outbound;
147 } communication;
148};
149
150struct channel_state_sample {
151 struct channel_key key;
152 struct cds_lfht_node channel_state_ht_node;
153 uint64_t highest_usage;
154 uint64_t lowest_usage;
155};
156
e4db5ace
JR
157static unsigned long hash_channel_key(struct channel_key *key);
158static int evaluate_condition(struct lttng_condition *condition,
159 struct lttng_evaluation **evaluation,
160 struct notification_thread_state *state,
161 struct channel_state_sample *previous_sample,
162 struct channel_state_sample *latest_sample,
163 uint64_t buffer_capacity);
164static
165int send_evaluation_to_clients(struct lttng_trigger *trigger,
166 struct lttng_evaluation *evaluation,
167 struct notification_client_list *client_list,
168 struct notification_thread_state *state,
169 uid_t channel_uid, gid_t channel_gid);
170
ab0ee2ca
JG
171static
172int match_client(struct cds_lfht_node *node, const void *key)
173{
174 /* This double-cast is intended to supress pointer-to-cast warning. */
175 int socket = (int) (intptr_t) key;
176 struct notification_client *client;
177
178 client = caa_container_of(node, struct notification_client,
179 client_socket_ht_node);
180
181 return !!(client->socket == socket);
182}
183
184static
185int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
186{
187 struct channel_key *channel_key = (struct channel_key *) key;
188 struct lttng_channel_trigger_list *trigger_list;
189
190 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
191 channel_triggers_ht_node);
192
193 return !!((channel_key->key == trigger_list->channel_key.key) &&
194 (channel_key->domain == trigger_list->channel_key.domain));
195}
196
197static
198int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
199{
200 struct channel_key *channel_key = (struct channel_key *) key;
201 struct channel_state_sample *sample;
202
203 sample = caa_container_of(node, struct channel_state_sample,
204 channel_state_ht_node);
205
206 return !!((channel_key->key == sample->key.key) &&
207 (channel_key->domain == sample->key.domain));
208}
209
210static
211int match_channel_info(struct cds_lfht_node *node, const void *key)
212{
213 struct channel_key *channel_key = (struct channel_key *) key;
214 struct channel_info *channel_info;
215
216 channel_info = caa_container_of(node, struct channel_info,
217 channels_ht_node);
218
219 return !!((channel_key->key == channel_info->key.key) &&
220 (channel_key->domain == channel_info->key.domain));
221}
222
223static
224int match_condition(struct cds_lfht_node *node, const void *key)
225{
226 struct lttng_condition *condition_key = (struct lttng_condition *) key;
227 struct lttng_trigger_ht_element *trigger;
228 struct lttng_condition *condition;
229
230 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
231 node);
232 condition = lttng_trigger_get_condition(trigger->trigger);
233 assert(condition);
234
235 return !!lttng_condition_is_equal(condition_key, condition);
236}
237
238static
239int match_client_list(struct cds_lfht_node *node, const void *key)
240{
241 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
242 struct notification_client_list *client_list;
243 struct lttng_condition *condition;
244 struct lttng_condition *condition_key = lttng_trigger_get_condition(
245 trigger_key);
246
247 assert(condition_key);
248
249 client_list = caa_container_of(node, struct notification_client_list,
250 notification_trigger_ht_node);
251 condition = lttng_trigger_get_condition(client_list->trigger);
252
253 return !!lttng_condition_is_equal(condition_key, condition);
254}
255
256static
257int match_client_list_condition(struct cds_lfht_node *node, const void *key)
258{
259 struct lttng_condition *condition_key = (struct lttng_condition *) key;
260 struct notification_client_list *client_list;
261 struct lttng_condition *condition;
262
263 assert(condition_key);
264
265 client_list = caa_container_of(node, struct notification_client_list,
266 notification_trigger_ht_node);
267 condition = lttng_trigger_get_condition(client_list->trigger);
268
269 return !!lttng_condition_is_equal(condition_key, condition);
270}
271
272static
273unsigned long lttng_condition_buffer_usage_hash(
274 struct lttng_condition *_condition)
275{
276 unsigned long hash = 0;
277 struct lttng_condition_buffer_usage *condition;
278
279 condition = container_of(_condition,
280 struct lttng_condition_buffer_usage, parent);
281
282 if (condition->session_name) {
283 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
284 }
285 if (condition->channel_name) {
8f56701f 286 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
ab0ee2ca
JG
287 }
288 if (condition->domain.set) {
289 hash ^= hash_key_ulong(
290 (void *) condition->domain.type,
291 lttng_ht_seed);
292 }
293 if (condition->threshold_ratio.set) {
294 uint64_t val;
295
296 val = condition->threshold_ratio.value * (double) UINT32_MAX;
297 hash ^= hash_key_u64(&val, lttng_ht_seed);
6633b0dd 298 } else if (condition->threshold_bytes.set) {
ab0ee2ca
JG
299 uint64_t val;
300
301 val = condition->threshold_bytes.value;
302 hash ^= hash_key_u64(&val, lttng_ht_seed);
303 }
304 return hash;
305}
306
307/*
308 * The lttng_condition hashing code is kept in this file (rather than
309 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
310 * don't want to link in liblttng-ctl.
311 */
312static
313unsigned long lttng_condition_hash(struct lttng_condition *condition)
314{
315 switch (condition->type) {
316 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
317 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
318 return lttng_condition_buffer_usage_hash(condition);
319 default:
320 ERR("[notification-thread] Unexpected condition type caught");
321 abort();
322 }
323}
324
e4db5ace
JR
325static
326unsigned long hash_channel_key(struct channel_key *key)
327{
328 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
329 unsigned long domain_hash = hash_key_ulong(
330 (void *) (unsigned long) key->domain, lttng_ht_seed);
331
332 return key_hash ^ domain_hash;
333}
334
ab0ee2ca
JG
335static
336void channel_info_destroy(struct channel_info *channel_info)
337{
338 if (!channel_info) {
339 return;
340 }
341
342 if (channel_info->session_name) {
343 free(channel_info->session_name);
344 }
345 if (channel_info->channel_name) {
346 free(channel_info->channel_name);
347 }
348 free(channel_info);
349}
350
351static
352struct channel_info *channel_info_copy(struct channel_info *channel_info)
353{
354 struct channel_info *copy = zmalloc(sizeof(*channel_info));
355
356 assert(channel_info);
357 assert(channel_info->session_name);
358 assert(channel_info->channel_name);
359
360 if (!copy) {
361 goto end;
362 }
363
364 memcpy(copy, channel_info, sizeof(*channel_info));
365 copy->session_name = NULL;
366 copy->channel_name = NULL;
367
368 copy->session_name = strdup(channel_info->session_name);
369 if (!copy->session_name) {
370 goto error;
371 }
372 copy->channel_name = strdup(channel_info->channel_name);
373 if (!copy->channel_name) {
374 goto error;
375 }
376 cds_lfht_node_init(&channel_info->channels_ht_node);
377end:
378 return copy;
379error:
380 channel_info_destroy(copy);
381 return NULL;
382}
383
e4db5ace
JR
384/* This function must be called with the RCU read lock held. */
385static
386int evaluate_condition_for_client(struct lttng_trigger *trigger,
387 struct lttng_condition *condition,
388 struct notification_client *client,
389 struct notification_thread_state *state)
390{
391 int ret;
392 struct cds_lfht_iter iter;
393 struct cds_lfht_node *node;
394 struct channel_info *channel_info = NULL;
395 struct channel_key *channel_key = NULL;
396 struct channel_state_sample *last_sample = NULL;
397 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
398 struct lttng_evaluation *evaluation = NULL;
399 struct notification_client_list client_list = { 0 };
400 struct notification_client_list_element client_list_element = { 0 };
401
402 assert(trigger);
403 assert(condition);
404 assert(client);
405 assert(state);
406
407 /* Find the channel associated with the trigger. */
408 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
409 channel_trigger_list , channel_triggers_ht_node) {
410 struct lttng_trigger_list_element *element;
411
412 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
413 struct lttng_condition *current_condition =
414 lttng_trigger_get_condition(
415 element->trigger);
416
417 assert(current_condition);
418 if (!lttng_condition_is_equal(condition,
2ae99f0b 419 current_condition)) {
e4db5ace
JR
420 continue;
421 }
422
423 /* Found the trigger, save the channel key. */
424 channel_key = &channel_trigger_list->channel_key;
425 break;
426 }
427 if (channel_key) {
428 /* The channel key was found stop iteration. */
429 break;
430 }
431 }
432
433 if (!channel_key){
434 /* No channel found; normal exit. */
435 DBG("[notification-thread] No channel associated with newly subscribed-to condition");
436 ret = 0;
437 goto end;
438 }
439
440 /* Fetch channel info for the matching channel. */
441 cds_lfht_lookup(state->channels_ht,
442 hash_channel_key(channel_key),
443 match_channel_info,
444 channel_key,
445 &iter);
446 node = cds_lfht_iter_get_node(&iter);
447 assert(node);
448 channel_info = caa_container_of(node, struct channel_info,
449 channels_ht_node);
450
451 /* Retrieve the channel's last sample, if it exists. */
452 cds_lfht_lookup(state->channel_state_ht,
453 hash_channel_key(channel_key),
454 match_channel_state_sample,
455 channel_key,
456 &iter);
457 node = cds_lfht_iter_get_node(&iter);
458 if (node) {
459 last_sample = caa_container_of(node,
460 struct channel_state_sample,
461 channel_state_ht_node);
462 } else {
463 /* Nothing to evaluate, no sample was ever taken. Normal exit */
464 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
465 ret = 0;
466 goto end;
467 }
468
469 ret = evaluate_condition(condition, &evaluation, state, NULL,
470 last_sample, channel_info->capacity);
471 if (ret) {
066a3c82 472 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
e4db5ace
JR
473 goto end;
474 }
475
476 if (!evaluation) {
477 /* Evaluation yielded nothing. Normal exit. */
478 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
479 ret = 0;
480 goto end;
481 }
482
483 /*
484 * Create a temporary client list with the client currently
485 * subscribing.
486 */
487 cds_lfht_node_init(&client_list.notification_trigger_ht_node);
488 CDS_INIT_LIST_HEAD(&client_list.list);
489 client_list.trigger = trigger;
490
491 CDS_INIT_LIST_HEAD(&client_list_element.node);
492 client_list_element.client = client;
493 cds_list_add(&client_list_element.node, &client_list.list);
494
495 /* Send evaluation result to the newly-subscribed client. */
496 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
497 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
498 state, channel_info->uid, channel_info->gid);
499
500end:
501 return ret;
502}
503
ab0ee2ca
JG
504static
505int notification_thread_client_subscribe(struct notification_client *client,
506 struct lttng_condition *condition,
507 struct notification_thread_state *state,
508 enum lttng_notification_channel_status *_status)
509{
510 int ret = 0;
511 struct cds_lfht_iter iter;
512 struct cds_lfht_node *node;
513 struct notification_client_list *client_list;
514 struct lttng_condition_list_element *condition_list_element = NULL;
515 struct notification_client_list_element *client_list_element = NULL;
516 enum lttng_notification_channel_status status =
517 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
518
519 /*
520 * Ensure that the client has not already subscribed to this condition
521 * before.
522 */
523 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
524 if (lttng_condition_is_equal(condition_list_element->condition,
525 condition)) {
526 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
527 goto end;
528 }
529 }
530
531 condition_list_element = zmalloc(sizeof(*condition_list_element));
532 if (!condition_list_element) {
533 ret = -1;
534 goto error;
535 }
536 client_list_element = zmalloc(sizeof(*client_list_element));
537 if (!client_list_element) {
538 ret = -1;
539 goto error;
540 }
541
542 rcu_read_lock();
543
544 /*
545 * Add the newly-subscribed condition to the client's subscription list.
546 */
547 CDS_INIT_LIST_HEAD(&condition_list_element->node);
548 condition_list_element->condition = condition;
549 cds_list_add(&condition_list_element->node, &client->condition_list);
550
ab0ee2ca
JG
551 cds_lfht_lookup(state->notification_trigger_clients_ht,
552 lttng_condition_hash(condition),
553 match_client_list_condition,
554 condition,
555 &iter);
556 node = cds_lfht_iter_get_node(&iter);
557 if (!node) {
2ae99f0b
JG
558 /*
559 * No notification-emiting trigger registered with this
560 * condition. We don't evaluate the condition right away
561 * since this trigger is not registered yet.
562 */
4fb43b68 563 free(client_list_element);
ab0ee2ca
JG
564 goto end_unlock;
565 }
566
567 client_list = caa_container_of(node, struct notification_client_list,
568 notification_trigger_ht_node);
2ae99f0b
JG
569 /*
570 * The condition to which the client just subscribed is evaluated
571 * at this point so that conditions that are already TRUE result
572 * in a notification being sent out.
573 */
e4db5ace
JR
574 if (evaluate_condition_for_client(client_list->trigger, condition,
575 client, state)) {
576 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
577 ret = -1;
578 goto end_unlock;
579 }
580
581 /*
582 * Add the client to the list of clients interested in a given trigger
583 * if a "notification" trigger with a corresponding condition was
584 * added prior.
585 */
ab0ee2ca
JG
586 client_list_element->client = client;
587 CDS_INIT_LIST_HEAD(&client_list_element->node);
588 cds_list_add(&client_list_element->node, &client_list->list);
589end_unlock:
590 rcu_read_unlock();
591end:
592 if (_status) {
593 *_status = status;
594 }
595 return ret;
596error:
597 free(condition_list_element);
598 free(client_list_element);
599 return ret;
600}
601
602static
603int notification_thread_client_unsubscribe(
604 struct notification_client *client,
605 struct lttng_condition *condition,
606 struct notification_thread_state *state,
607 enum lttng_notification_channel_status *_status)
608{
609 struct cds_lfht_iter iter;
610 struct cds_lfht_node *node;
611 struct notification_client_list *client_list;
612 struct lttng_condition_list_element *condition_list_element,
613 *condition_tmp;
614 struct notification_client_list_element *client_list_element,
615 *client_tmp;
616 bool condition_found = false;
617 enum lttng_notification_channel_status status =
618 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
619
620 /* Remove the condition from the client's condition list. */
621 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
622 &client->condition_list, node) {
623 if (!lttng_condition_is_equal(condition_list_element->condition,
624 condition)) {
625 continue;
626 }
627
628 cds_list_del(&condition_list_element->node);
629 /*
630 * The caller may be iterating on the client's conditions to
631 * tear down a client's connection. In this case, the condition
632 * will be destroyed at the end.
633 */
634 if (condition != condition_list_element->condition) {
635 lttng_condition_destroy(
636 condition_list_element->condition);
637 }
638 free(condition_list_element);
639 condition_found = true;
640 break;
641 }
642
643 if (!condition_found) {
644 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
645 goto end;
646 }
647
648 /*
649 * Remove the client from the list of clients interested the trigger
650 * matching the condition.
651 */
652 rcu_read_lock();
653 cds_lfht_lookup(state->notification_trigger_clients_ht,
654 lttng_condition_hash(condition),
655 match_client_list_condition,
656 condition,
657 &iter);
658 node = cds_lfht_iter_get_node(&iter);
659 if (!node) {
660 goto end_unlock;
661 }
662
663 client_list = caa_container_of(node, struct notification_client_list,
664 notification_trigger_ht_node);
665 cds_list_for_each_entry_safe(client_list_element, client_tmp,
666 &client_list->list, node) {
667 if (client_list_element->client->socket != client->socket) {
668 continue;
669 }
670 cds_list_del(&client_list_element->node);
671 free(client_list_element);
672 break;
673 }
674end_unlock:
675 rcu_read_unlock();
676end:
677 lttng_condition_destroy(condition);
678 if (_status) {
679 *_status = status;
680 }
681 return 0;
682}
683
684static
685void notification_client_destroy(struct notification_client *client,
686 struct notification_thread_state *state)
687{
688 struct lttng_condition_list_element *condition_list_element, *tmp;
689
690 if (!client) {
691 return;
692 }
693
694 /* Release all conditions to which the client was subscribed. */
695 cds_list_for_each_entry_safe(condition_list_element, tmp,
696 &client->condition_list, node) {
697 (void) notification_thread_client_unsubscribe(client,
698 condition_list_element->condition, state, NULL);
699 }
700
701 if (client->socket >= 0) {
702 (void) lttcomm_close_unix_sock(client->socket);
703 }
704 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
705 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
706 free(client);
707}
708
709/*
710 * Call with rcu_read_lock held (and hold for the lifetime of the returned
711 * client pointer).
712 */
713static
714struct notification_client *get_client_from_socket(int socket,
715 struct notification_thread_state *state)
716{
717 struct cds_lfht_iter iter;
718 struct cds_lfht_node *node;
719 struct notification_client *client = NULL;
720
721 cds_lfht_lookup(state->client_socket_ht,
722 hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed),
723 match_client,
724 (void *) (unsigned long) socket,
725 &iter);
726 node = cds_lfht_iter_get_node(&iter);
727 if (!node) {
728 goto end;
729 }
730
731 client = caa_container_of(node, struct notification_client,
732 client_socket_ht_node);
733end:
734 return client;
735}
736
737static
738bool trigger_applies_to_channel(struct lttng_trigger *trigger,
739 struct channel_info *info)
740{
741 enum lttng_condition_status status;
742 struct lttng_condition *condition;
743 const char *trigger_session_name = NULL;
744 const char *trigger_channel_name = NULL;
745 enum lttng_domain_type trigger_domain;
746
747 condition = lttng_trigger_get_condition(trigger);
748 if (!condition) {
749 goto fail;
750 }
751
752 switch (lttng_condition_get_type(condition)) {
753 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
754 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
755 break;
756 default:
757 goto fail;
758 }
759
760 status = lttng_condition_buffer_usage_get_domain_type(condition,
761 &trigger_domain);
762 assert(status == LTTNG_CONDITION_STATUS_OK);
763 if (info->key.domain != trigger_domain) {
764 goto fail;
765 }
766
767 status = lttng_condition_buffer_usage_get_session_name(
768 condition, &trigger_session_name);
769 assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_session_name);
770
771 status = lttng_condition_buffer_usage_get_channel_name(
772 condition, &trigger_channel_name);
773 assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_channel_name);
774
775 if (strcmp(info->session_name, trigger_session_name)) {
776 goto fail;
777 }
778 if (strcmp(info->channel_name, trigger_channel_name)) {
779 goto fail;
780 }
781
782 return true;
783fail:
784 return false;
785}
786
787static
788bool trigger_applies_to_client(struct lttng_trigger *trigger,
789 struct notification_client *client)
790{
791 bool applies = false;
792 struct lttng_condition_list_element *condition_list_element;
793
794 cds_list_for_each_entry(condition_list_element, &client->condition_list,
795 node) {
796 applies = lttng_condition_is_equal(
797 condition_list_element->condition,
798 lttng_trigger_get_condition(trigger));
799 if (applies) {
800 break;
801 }
802 }
803 return applies;
804}
805
ab0ee2ca
JG
806static
807int handle_notification_thread_command_add_channel(
808 struct notification_thread_state *state,
809 struct channel_info *channel_info,
810 enum lttng_error_code *cmd_result)
811{
812 struct cds_list_head trigger_list;
813 struct channel_info *new_channel_info;
814 struct channel_key *channel_key;
815 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
816 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
817 int trigger_count = 0;
818 struct cds_lfht_iter iter;
819
820 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
821 channel_info->channel_name, channel_info->session_name,
822 channel_info->key.key, channel_info->key.domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
823
824 CDS_INIT_LIST_HEAD(&trigger_list);
825
826 new_channel_info = channel_info_copy(channel_info);
827 if (!new_channel_info) {
828 goto error;
829 }
830
831 channel_key = &new_channel_info->key;
832
833 /* Build a list of all triggers applying to the new channel. */
834 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
835 node) {
836 struct lttng_trigger_list_element *new_element;
837
838 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
839 channel_info)) {
840 continue;
841 }
842
843 new_element = zmalloc(sizeof(*new_element));
844 if (!new_element) {
845 goto error;
846 }
847 CDS_INIT_LIST_HEAD(&new_element->node);
848 new_element->trigger = trigger_ht_element->trigger;
849 cds_list_add(&new_element->node, &trigger_list);
850 trigger_count++;
851 }
852
853 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
854 trigger_count);
855 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
856 if (!channel_trigger_list) {
857 goto error;
858 }
859 channel_trigger_list->channel_key = *channel_key;
860 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
861 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
862 cds_list_splice(&trigger_list, &channel_trigger_list->list);
863
864 rcu_read_lock();
865 /* Add channel to the channel_ht which owns the channel_infos. */
866 cds_lfht_add(state->channels_ht,
867 hash_channel_key(channel_key),
868 &new_channel_info->channels_ht_node);
869 /*
870 * Add the list of triggers associated with this channel to the
871 * channel_triggers_ht.
872 */
873 cds_lfht_add(state->channel_triggers_ht,
874 hash_channel_key(channel_key),
875 &channel_trigger_list->channel_triggers_ht_node);
876 rcu_read_unlock();
877 *cmd_result = LTTNG_OK;
878 return 0;
879error:
880 /* Empty trigger list */
881 channel_info_destroy(new_channel_info);
882 return 1;
883}
884
885static
886int handle_notification_thread_command_remove_channel(
887 struct notification_thread_state *state,
888 uint64_t channel_key, enum lttng_domain_type domain,
889 enum lttng_error_code *cmd_result)
890{
891 struct cds_lfht_node *node;
892 struct cds_lfht_iter iter;
893 struct lttng_channel_trigger_list *trigger_list;
894 struct lttng_trigger_list_element *trigger_list_element, *tmp;
895 struct channel_key key = { .key = channel_key, .domain = domain };
896 struct channel_info *channel_info;
897
898 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
899 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
900
901 rcu_read_lock();
902
903 cds_lfht_lookup(state->channel_triggers_ht,
904 hash_channel_key(&key),
905 match_channel_trigger_list,
906 &key,
907 &iter);
908 node = cds_lfht_iter_get_node(&iter);
909 /*
910 * There is a severe internal error if we are being asked to remove a
911 * channel that doesn't exist.
912 */
913 if (!node) {
914 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
915 goto end;
916 }
917
918 /* Free the list of triggers associated with this channel. */
919 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
920 channel_triggers_ht_node);
921 cds_list_for_each_entry_safe(trigger_list_element, tmp,
922 &trigger_list->list, node) {
923 cds_list_del(&trigger_list_element->node);
924 free(trigger_list_element);
925 }
926 cds_lfht_del(state->channel_triggers_ht, node);
927 free(trigger_list);
928
929 /* Free sampled channel state. */
930 cds_lfht_lookup(state->channel_state_ht,
931 hash_channel_key(&key),
932 match_channel_state_sample,
933 &key,
934 &iter);
935 node = cds_lfht_iter_get_node(&iter);
936 /*
937 * This is expected to be NULL if the channel is destroyed before we
938 * received a sample.
939 */
940 if (node) {
941 struct channel_state_sample *sample = caa_container_of(node,
942 struct channel_state_sample,
943 channel_state_ht_node);
944
945 cds_lfht_del(state->channel_state_ht, node);
946 free(sample);
947 }
948
949 /* Remove the channel from the channels_ht and free it. */
950 cds_lfht_lookup(state->channels_ht,
951 hash_channel_key(&key),
952 match_channel_info,
953 &key,
954 &iter);
955 node = cds_lfht_iter_get_node(&iter);
956 assert(node);
957 channel_info = caa_container_of(node, struct channel_info,
958 channels_ht_node);
959 cds_lfht_del(state->channels_ht, node);
960 channel_info_destroy(channel_info);
961end:
962 rcu_read_unlock();
963 *cmd_result = LTTNG_OK;
964 return 0;
965}
966
1da26331
JG
967static
968int condition_is_supported(struct lttng_condition *condition)
969{
970 int ret;
971
972 switch (lttng_condition_get_type(condition)) {
973 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
974 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
975 {
976 enum lttng_domain_type domain;
977
978 ret = lttng_condition_buffer_usage_get_domain_type(condition,
979 &domain);
980 if (ret) {
981 ret = -1;
982 goto end;
983 }
984
985 if (domain != LTTNG_DOMAIN_KERNEL) {
986 ret = 1;
987 goto end;
988 }
989
990 /*
991 * Older kernel tracers don't expose the API to monitor their
992 * buffers. Therefore, we reject triggers that require that
993 * mechanism to be available to be evaluated.
994 */
995 ret = kernel_supports_ring_buffer_snapshot_sample_positions(
996 kernel_tracer_fd);
997 break;
998 }
999 default:
1000 ret = 1;
1001 }
1002end:
1003 return ret;
1004}
1005
ab0ee2ca
JG
1006/*
1007 * FIXME A client's credentials are not checked when registering a trigger, nor
1008 * are they stored alongside with the trigger.
1009 *
1da26331 1010 * The effects of this are benign since:
ab0ee2ca
JG
1011 * - The client will succeed in registering the trigger, as it is valid,
1012 * - The trigger will, internally, be bound to the channel,
1013 * - The notifications will not be sent since the client's credentials
1014 * are checked against the channel at that moment.
1da26331
JG
1015 *
1016 * If this function returns a non-zero value, it means something is
50ca7858 1017 * fundamentally broken and the whole subsystem/thread will be torn down.
1da26331
JG
1018 *
1019 * If a non-fatal error occurs, just set the cmd_result to the appropriate
1020 * error code.
ab0ee2ca
JG
1021 */
1022static
1023int handle_notification_thread_command_register_trigger(
1024 struct notification_thread_state *state,
1025 struct lttng_trigger *trigger,
1026 enum lttng_error_code *cmd_result)
1027{
1028 int ret = 0;
1029 struct lttng_condition *condition;
1030 struct notification_client *client;
1031 struct notification_client_list *client_list = NULL;
1032 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1033 struct notification_client_list_element *client_list_element, *tmp;
1034 struct cds_lfht_node *node;
1035 struct cds_lfht_iter iter;
1036 struct channel_info *channel;
1037 bool free_trigger = true;
1038
1039 rcu_read_lock();
1040
1041 condition = lttng_trigger_get_condition(trigger);
1da26331
JG
1042 assert(condition);
1043
1044 ret = condition_is_supported(condition);
1045 if (ret < 0) {
1046 goto error;
1047 } else if (ret == 0) {
1048 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
1049 goto error;
1050 } else {
1051 /* Feature is supported, continue. */
1052 ret = 0;
1053 }
1054
ab0ee2ca
JG
1055 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
1056 if (!trigger_ht_element) {
1057 ret = -1;
1058 goto error;
1059 }
1060
1061 /* Add trigger to the trigger_ht. */
1062 cds_lfht_node_init(&trigger_ht_element->node);
1063 trigger_ht_element->trigger = trigger;
1064
1065 node = cds_lfht_add_unique(state->triggers_ht,
1066 lttng_condition_hash(condition),
1067 match_condition,
1068 condition,
1069 &trigger_ht_element->node);
1070 if (node != &trigger_ht_element->node) {
1071 /* Not a fatal error, simply report it to the client. */
1072 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
1073 goto error_free_ht_element;
1074 }
1075
1076 /*
1077 * Ownership of the trigger and of its wrapper was transfered to
1078 * the triggers_ht.
1079 */
1080 trigger_ht_element = NULL;
1081 free_trigger = false;
1082
1083 /*
1084 * The rest only applies to triggers that have a "notify" action.
1085 * It is not skipped as this is the only action type currently
1086 * supported.
1087 */
1088 client_list = zmalloc(sizeof(*client_list));
1089 if (!client_list) {
1090 ret = -1;
1091 goto error_free_ht_element;
1092 }
1093 cds_lfht_node_init(&client_list->notification_trigger_ht_node);
1094 CDS_INIT_LIST_HEAD(&client_list->list);
1095 client_list->trigger = trigger;
1096
1097 /* Build a list of clients to which this new trigger applies. */
1098 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1099 client_socket_ht_node) {
1100 if (!trigger_applies_to_client(trigger, client)) {
1101 continue;
1102 }
1103
1104 client_list_element = zmalloc(sizeof(*client_list_element));
1105 if (!client_list_element) {
1106 ret = -1;
1107 goto error_free_client_list;
1108 }
1109 CDS_INIT_LIST_HEAD(&client_list_element->node);
1110 client_list_element->client = client;
1111 cds_list_add(&client_list_element->node, &client_list->list);
1112 }
1113
1114 cds_lfht_add(state->notification_trigger_clients_ht,
1115 lttng_condition_hash(condition),
1116 &client_list->notification_trigger_ht_node);
ab0ee2ca
JG
1117
1118 /*
1119 * Add the trigger to list of triggers bound to the channels currently
1120 * known.
1121 */
1122 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
1123 channels_ht_node) {
1124 struct lttng_trigger_list_element *trigger_list_element;
1125 struct lttng_channel_trigger_list *trigger_list;
1126
1127 if (!trigger_applies_to_channel(trigger, channel)) {
1128 continue;
1129 }
1130
1131 cds_lfht_lookup(state->channel_triggers_ht,
1132 hash_channel_key(&channel->key),
1133 match_channel_trigger_list,
1134 &channel->key,
1135 &iter);
1136 node = cds_lfht_iter_get_node(&iter);
1137 assert(node);
ab0ee2ca
JG
1138 trigger_list = caa_container_of(node,
1139 struct lttng_channel_trigger_list,
1140 channel_triggers_ht_node);
1141
1142 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
1143 if (!trigger_list_element) {
1144 ret = -1;
1145 goto error_free_client_list;
1146 }
1147 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
1148 trigger_list_element->trigger = trigger;
1149 cds_list_add(&trigger_list_element->node, &trigger_list->list);
e4db5ace 1150
ab0ee2ca
JG
1151 /* A trigger can only apply to one channel. */
1152 break;
1153 }
1154
2ae99f0b
JG
1155 /*
1156 * Since there is nothing preventing clients from subscribing to a
1157 * condition before the corresponding trigger is registered, we have
1158 * to evaluate this new condition right away.
1159 *
1160 * At some point, we were waiting for the next "evaluation" (e.g. on
1161 * reception of a channel sample) to evaluate this new condition, but
1162 * that was broken.
1163 *
1164 * The reason it was broken is that waiting for the next sample
1165 * does not allow us to properly handle transitions for edge-triggered
1166 * conditions.
1167 *
1168 * Consider this example: when we handle a new channel sample, we
1169 * evaluate each conditions twice: once with the previous state, and
1170 * again with the newest state. We then use those two results to
1171 * determine whether a state change happened: a condition was false and
1172 * became true. If a state change happened, we have to notify clients.
1173 *
1174 * Now, if a client subscribes to a given notification and registers
1175 * a trigger *after* that subscription, we have to make sure the
1176 * condition is evaluated at this point while considering only the
1177 * current state. Otherwise, the next evaluation cycle may only see
1178 * that the evaluations remain the same (true for samples n-1 and n) and
1179 * the client will never know that the condition has been met.
1180 */
1181 cds_list_for_each_entry_safe(client_list_element, tmp,
1182 &client_list->list, node) {
1183 ret = evaluate_condition_for_client(trigger, condition,
1184 client_list_element->client, state);
1185 if (ret) {
1186 goto error_free_client_list;
1187 }
1188 }
1189
1190 /*
1191 * Client list ownership transferred to the
1192 * notification_trigger_clients_ht.
1193 */
1194 client_list = NULL;
1195
ab0ee2ca
JG
1196 *cmd_result = LTTNG_OK;
1197error_free_client_list:
1198 if (client_list) {
1199 cds_list_for_each_entry_safe(client_list_element, tmp,
1200 &client_list->list, node) {
1201 free(client_list_element);
1202 }
1203 free(client_list);
1204 }
1205error_free_ht_element:
1206 free(trigger_ht_element);
1207error:
1208 if (free_trigger) {
1209 struct lttng_action *action = lttng_trigger_get_action(trigger);
1210
1211 lttng_condition_destroy(condition);
1212 lttng_action_destroy(action);
1213 lttng_trigger_destroy(trigger);
1214 }
1215 rcu_read_unlock();
1216 return ret;
1217}
1218
cc2295b5 1219static
ab0ee2ca
JG
1220int handle_notification_thread_command_unregister_trigger(
1221 struct notification_thread_state *state,
1222 struct lttng_trigger *trigger,
1223 enum lttng_error_code *_cmd_reply)
1224{
1225 struct cds_lfht_iter iter;
1226 struct cds_lfht_node *node, *triggers_ht_node;
1227 struct lttng_channel_trigger_list *trigger_list;
1228 struct notification_client_list *client_list;
1229 struct notification_client_list_element *client_list_element, *tmp;
1230 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1231 struct lttng_condition *condition = lttng_trigger_get_condition(
1232 trigger);
1233 struct lttng_action *action;
1234 enum lttng_error_code cmd_reply;
1235
1236 rcu_read_lock();
1237
1238 cds_lfht_lookup(state->triggers_ht,
1239 lttng_condition_hash(condition),
1240 match_condition,
1241 condition,
1242 &iter);
1243 triggers_ht_node = cds_lfht_iter_get_node(&iter);
1244 if (!triggers_ht_node) {
1245 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
1246 goto end;
1247 } else {
1248 cmd_reply = LTTNG_OK;
1249 }
1250
1251 /* Remove trigger from channel_triggers_ht. */
1252 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
1253 channel_triggers_ht_node) {
1254 struct lttng_trigger_list_element *trigger_element, *tmp;
1255
1256 cds_list_for_each_entry_safe(trigger_element, tmp,
1257 &trigger_list->list, node) {
1258 struct lttng_condition *current_condition =
1259 lttng_trigger_get_condition(
1260 trigger_element->trigger);
1261
1262 assert(current_condition);
1263 if (!lttng_condition_is_equal(condition,
1264 current_condition)) {
1265 continue;
1266 }
1267
1268 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
1269 cds_list_del(&trigger_element->node);
e4db5ace
JR
1270 /* A trigger can only appear once per channel */
1271 break;
ab0ee2ca
JG
1272 }
1273 }
1274
1275 /*
1276 * Remove and release the client list from
1277 * notification_trigger_clients_ht.
1278 */
1279 cds_lfht_lookup(state->notification_trigger_clients_ht,
1280 lttng_condition_hash(condition),
1281 match_client_list,
1282 trigger,
1283 &iter);
1284 node = cds_lfht_iter_get_node(&iter);
1285 assert(node);
1286 client_list = caa_container_of(node, struct notification_client_list,
1287 notification_trigger_ht_node);
1288 cds_list_for_each_entry_safe(client_list_element, tmp,
1289 &client_list->list, node) {
1290 free(client_list_element);
1291 }
1292 cds_lfht_del(state->notification_trigger_clients_ht, node);
1293 free(client_list);
1294
1295 /* Remove trigger from triggers_ht. */
1296 trigger_ht_element = caa_container_of(triggers_ht_node,
1297 struct lttng_trigger_ht_element, node);
1298 cds_lfht_del(state->triggers_ht, triggers_ht_node);
1299
1300 condition = lttng_trigger_get_condition(trigger_ht_element->trigger);
1301 lttng_condition_destroy(condition);
1302 action = lttng_trigger_get_action(trigger_ht_element->trigger);
1303 lttng_action_destroy(action);
1304 lttng_trigger_destroy(trigger_ht_element->trigger);
1305 free(trigger_ht_element);
1306end:
1307 rcu_read_unlock();
1308 if (_cmd_reply) {
1309 *_cmd_reply = cmd_reply;
1310 }
1311 return 0;
1312}
1313
1314/* Returns 0 on success, 1 on exit requested, negative value on error. */
1315int handle_notification_thread_command(
1316 struct notification_thread_handle *handle,
1317 struct notification_thread_state *state)
1318{
1319 int ret;
1320 uint64_t counter;
1321 struct notification_thread_command *cmd;
1322
1323 /* Read event_fd to put it back into a quiescent state. */
814b4934 1324 ret = read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter, sizeof(counter));
ab0ee2ca
JG
1325 if (ret == -1) {
1326 goto error;
1327 }
1328
1329 pthread_mutex_lock(&handle->cmd_queue.lock);
1330 cmd = cds_list_first_entry(&handle->cmd_queue.list,
1331 struct notification_thread_command, cmd_list_node);
1332 switch (cmd->type) {
1333 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
1334 DBG("[notification-thread] Received register trigger command");
1335 ret = handle_notification_thread_command_register_trigger(
1336 state, cmd->parameters.trigger,
1337 &cmd->reply_code);
1338 break;
1339 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
1340 DBG("[notification-thread] Received unregister trigger command");
1341 ret = handle_notification_thread_command_unregister_trigger(
1342 state, cmd->parameters.trigger,
1343 &cmd->reply_code);
1344 break;
1345 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
1346 DBG("[notification-thread] Received add channel command");
1347 ret = handle_notification_thread_command_add_channel(
1348 state, &cmd->parameters.add_channel,
1349 &cmd->reply_code);
1350 break;
1351 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
1352 DBG("[notification-thread] Received remove channel command");
1353 ret = handle_notification_thread_command_remove_channel(
1354 state, cmd->parameters.remove_channel.key,
1355 cmd->parameters.remove_channel.domain,
1356 &cmd->reply_code);
1357 break;
1358 case NOTIFICATION_COMMAND_TYPE_QUIT:
1359 DBG("[notification-thread] Received quit command");
1360 cmd->reply_code = LTTNG_OK;
1361 ret = 1;
1362 goto end;
1363 default:
1364 ERR("[notification-thread] Unknown internal command received");
1365 goto error_unlock;
1366 }
1367
1368 if (ret) {
1369 goto error_unlock;
1370 }
1371end:
1372 cds_list_del(&cmd->cmd_list_node);
8ada111f 1373 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
1374 pthread_mutex_unlock(&handle->cmd_queue.lock);
1375 return ret;
1376error_unlock:
1377 /* Wake-up and return a fatal error to the calling thread. */
8ada111f 1378 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
1379 pthread_mutex_unlock(&handle->cmd_queue.lock);
1380 cmd->reply_code = LTTNG_ERR_FATAL;
1381error:
1382 /* Indicate a fatal error to the caller. */
1383 return -1;
1384}
1385
1386static
1387unsigned long hash_client_socket(int socket)
1388{
1389 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
1390}
1391
1392static
1393int socket_set_non_blocking(int socket)
1394{
1395 int ret, flags;
1396
1397 /* Set the pipe as non-blocking. */
1398 ret = fcntl(socket, F_GETFL, 0);
1399 if (ret == -1) {
1400 PERROR("fcntl get socket flags");
1401 goto end;
1402 }
1403 flags = ret;
1404
1405 ret = fcntl(socket, F_SETFL, flags | O_NONBLOCK);
1406 if (ret == -1) {
1407 PERROR("fcntl set O_NONBLOCK socket flag");
1408 goto end;
1409 }
1410 DBG("Client socket (fd = %i) set as non-blocking", socket);
1411end:
1412 return ret;
1413}
1414
1415static
14fa22f8 1416int client_reset_inbound_state(struct notification_client *client)
ab0ee2ca
JG
1417{
1418 int ret;
1419
1420 ret = lttng_dynamic_buffer_set_size(
1421 &client->communication.inbound.buffer, 0);
1422 assert(!ret);
1423
1424 client->communication.inbound.bytes_to_receive =
1425 sizeof(struct lttng_notification_channel_message);
1426 client->communication.inbound.msg_type =
1427 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
ab0ee2ca
JG
1428 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
1429 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
14fa22f8
JG
1430 ret = lttng_dynamic_buffer_set_size(
1431 &client->communication.inbound.buffer,
1432 client->communication.inbound.bytes_to_receive);
1433 return ret;
ab0ee2ca
JG
1434}
1435
1436int handle_notification_thread_client_connect(
1437 struct notification_thread_state *state)
1438{
1439 int ret;
1440 struct notification_client *client;
1441
1442 DBG("[notification-thread] Handling new notification channel client connection");
1443
1444 client = zmalloc(sizeof(*client));
1445 if (!client) {
1446 /* Fatal error. */
1447 ret = -1;
1448 goto error;
1449 }
1450 CDS_INIT_LIST_HEAD(&client->condition_list);
1451 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
1452 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
01ea340e 1453 client->communication.inbound.expect_creds = true;
14fa22f8
JG
1454 ret = client_reset_inbound_state(client);
1455 if (ret) {
1456 ERR("[notification-thread] Failed to reset client communication's inbound state");
1457 ret = 0;
1458 goto error;
1459 }
ab0ee2ca
JG
1460
1461 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
1462 if (ret < 0) {
1463 ERR("[notification-thread] Failed to accept new notification channel client connection");
1464 ret = 0;
1465 goto error;
1466 }
1467
1468 client->socket = ret;
1469
1470 ret = socket_set_non_blocking(client->socket);
1471 if (ret) {
1472 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
1473 goto error;
1474 }
1475
1476 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
1477 if (ret < 0) {
1478 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
1479 ret = 0;
1480 goto error;
1481 }
1482
1483 ret = lttng_poll_add(&state->events, client->socket,
1484 LPOLLIN | LPOLLERR |
1485 LPOLLHUP | LPOLLRDHUP);
1486 if (ret < 0) {
1487 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
1488 ret = 0;
1489 goto error;
1490 }
1491 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
1492 client->socket);
1493
ab0ee2ca
JG
1494 rcu_read_lock();
1495 cds_lfht_add(state->client_socket_ht,
1496 hash_client_socket(client->socket),
1497 &client->client_socket_ht_node);
1498 rcu_read_unlock();
1499
1500 return ret;
1501error:
1502 notification_client_destroy(client, state);
1503 return ret;
1504}
1505
1506int handle_notification_thread_client_disconnect(
1507 int client_socket,
1508 struct notification_thread_state *state)
1509{
1510 int ret = 0;
1511 struct notification_client *client;
1512
1513 rcu_read_lock();
1514 DBG("[notification-thread] Closing client connection (socket fd = %i)",
1515 client_socket);
1516 client = get_client_from_socket(client_socket, state);
1517 if (!client) {
1518 /* Internal state corruption, fatal error. */
1519 ERR("[notification-thread] Unable to find client (socket fd = %i)",
1520 client_socket);
1521 ret = -1;
1522 goto end;
1523 }
1524
1525 ret = lttng_poll_del(&state->events, client_socket);
1526 if (ret) {
1527 ERR("[notification-thread] Failed to remove client socket from poll set");
1528 }
1529 cds_lfht_del(state->client_socket_ht,
1530 &client->client_socket_ht_node);
1531 notification_client_destroy(client, state);
1532end:
1533 rcu_read_unlock();
1534 return ret;
1535}
1536
1537int handle_notification_thread_client_disconnect_all(
1538 struct notification_thread_state *state)
1539{
1540 struct cds_lfht_iter iter;
1541 struct notification_client *client;
1542 bool error_encoutered = false;
1543
1544 rcu_read_lock();
1545 DBG("[notification-thread] Closing all client connections");
1546 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1547 client_socket_ht_node) {
1548 int ret;
1549
1550 ret = handle_notification_thread_client_disconnect(
1551 client->socket, state);
1552 if (ret) {
1553 error_encoutered = true;
1554 }
1555 }
1556 rcu_read_unlock();
1557 return error_encoutered ? 1 : 0;
1558}
1559
1560int handle_notification_thread_trigger_unregister_all(
1561 struct notification_thread_state *state)
1562{
4149ace8 1563 bool error_occurred = false;
ab0ee2ca
JG
1564 struct cds_lfht_iter iter;
1565 struct lttng_trigger_ht_element *trigger_ht_element;
1566
1567 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1568 node) {
1569 int ret = handle_notification_thread_command_unregister_trigger(
1570 state, trigger_ht_element->trigger, NULL);
1571 if (ret) {
4149ace8 1572 error_occurred = true;
ab0ee2ca
JG
1573 }
1574 }
4149ace8 1575 return error_occurred ? -1 : 0;
ab0ee2ca
JG
1576}
1577
1578static
1579int client_flush_outgoing_queue(struct notification_client *client,
1580 struct notification_thread_state *state)
1581{
1582 ssize_t ret;
1583 size_t to_send_count;
1584
1585 assert(client->communication.outbound.buffer.size != 0);
1586 to_send_count = client->communication.outbound.buffer.size;
1587 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
1588 client->socket);
1589
1590 ret = lttcomm_send_unix_sock_non_block(client->socket,
1591 client->communication.outbound.buffer.data,
1592 to_send_count);
1593 if ((ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) ||
1594 (ret > 0 && ret < to_send_count)) {
1595 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
1596 client->socket);
1597 to_send_count -= max(ret, 0);
1598
1599 memcpy(client->communication.outbound.buffer.data,
1600 client->communication.outbound.buffer.data +
1601 client->communication.outbound.buffer.size - to_send_count,
1602 to_send_count);
1603 ret = lttng_dynamic_buffer_set_size(
1604 &client->communication.outbound.buffer,
1605 to_send_count);
1606 if (ret) {
1607 goto error;
1608 }
1609
1610 /*
1611 * We want to be notified whenever there is buffer space
1612 * available to send the rest of the payload.
1613 */
1614 ret = lttng_poll_mod(&state->events, client->socket,
1615 CLIENT_POLL_MASK_IN_OUT);
1616 if (ret) {
1617 goto error;
1618 }
1619 } else if (ret < 0) {
1620 /* Generic error, disconnect the client. */
1621 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
1622 client->socket);
1623 ret = handle_notification_thread_client_disconnect(
1624 client->socket, state);
1625 if (ret) {
1626 goto error;
1627 }
1628 } else {
1629 /* No error and flushed the queue completely. */
1630 ret = lttng_dynamic_buffer_set_size(
1631 &client->communication.outbound.buffer, 0);
1632 if (ret) {
1633 goto error;
1634 }
1635 ret = lttng_poll_mod(&state->events, client->socket,
1636 CLIENT_POLL_MASK_IN);
1637 if (ret) {
1638 goto error;
1639 }
1640
1641 client->communication.outbound.queued_command_reply = false;
1642 client->communication.outbound.dropped_notification = false;
1643 }
1644
1645 return 0;
1646error:
1647 return -1;
1648}
1649
1650static
1651int client_send_command_reply(struct notification_client *client,
1652 struct notification_thread_state *state,
1653 enum lttng_notification_channel_status status)
1654{
1655 int ret;
1656 struct lttng_notification_channel_command_reply reply = {
1657 .status = (int8_t) status,
1658 };
1659 struct lttng_notification_channel_message msg = {
1660 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
1661 .size = sizeof(reply),
1662 };
1663 char buffer[sizeof(msg) + sizeof(reply)];
1664
1665 if (client->communication.outbound.queued_command_reply) {
1666 /* Protocol error. */
1667 goto error;
1668 }
1669
1670 memcpy(buffer, &msg, sizeof(msg));
1671 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
1672 DBG("[notification-thread] Send command reply (%i)", (int) status);
1673
1674 /* Enqueue buffer to outgoing queue and flush it. */
1675 ret = lttng_dynamic_buffer_append(
1676 &client->communication.outbound.buffer,
1677 buffer, sizeof(buffer));
1678 if (ret) {
1679 goto error;
1680 }
1681
1682 ret = client_flush_outgoing_queue(client, state);
1683 if (ret) {
1684 goto error;
1685 }
1686
1687 if (client->communication.outbound.buffer.size != 0) {
1688 /* Queue could not be emptied. */
1689 client->communication.outbound.queued_command_reply = true;
1690 }
1691
1692 return 0;
1693error:
1694 return -1;
1695}
1696
1697static
1698int client_dispatch_message(struct notification_client *client,
1699 struct notification_thread_state *state)
1700{
1701 int ret = 0;
1702
1703 if (client->communication.inbound.msg_type !=
1704 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
1705 client->communication.inbound.msg_type !=
1706 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
1707 !client->validated) {
1708 WARN("[notification-thread] client attempted a command before handshake");
1709 ret = -1;
1710 goto end;
1711 }
1712
1713 switch (client->communication.inbound.msg_type) {
1714 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
1715 {
1716 /*
1717 * Receiving message header. The function will be called again
1718 * once the rest of the message as been received and can be
1719 * interpreted.
1720 */
1721 const struct lttng_notification_channel_message *msg;
1722
1723 assert(sizeof(*msg) ==
1724 client->communication.inbound.buffer.size);
1725 msg = (const struct lttng_notification_channel_message *)
1726 client->communication.inbound.buffer.data;
1727
1728 if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
1729 ERR("[notification-thread] Invalid notification channel message: length = %u", msg->size);
1730 ret = -1;
1731 goto end;
1732 }
1733
1734 switch (msg->type) {
1735 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
1736 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
1737 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
1738 break;
1739 default:
1740 ret = -1;
1741 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
1742 goto end;
1743 }
1744
1745 client->communication.inbound.bytes_to_receive = msg->size;
1746 client->communication.inbound.msg_type =
1747 (enum lttng_notification_channel_message_type) msg->type;
ab0ee2ca 1748 ret = lttng_dynamic_buffer_set_size(
14fa22f8 1749 &client->communication.inbound.buffer, msg->size);
ab0ee2ca
JG
1750 if (ret) {
1751 goto end;
1752 }
1753 break;
1754 }
1755 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
1756 {
1757 struct lttng_notification_channel_command_handshake *handshake_client;
1758 struct lttng_notification_channel_command_handshake handshake_reply = {
1759 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
1760 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
1761 };
1762 struct lttng_notification_channel_message msg_header = {
1763 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
1764 .size = sizeof(handshake_reply),
1765 };
1766 enum lttng_notification_channel_status status =
1767 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1768 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
1769
1770 memcpy(send_buffer, &msg_header, sizeof(msg_header));
1771 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
1772 sizeof(handshake_reply));
1773
1774 handshake_client =
1775 (struct lttng_notification_channel_command_handshake *)
1776 client->communication.inbound.buffer.data;
47a32869 1777 client->major = handshake_client->major;
ab0ee2ca
JG
1778 client->minor = handshake_client->minor;
1779 if (!client->communication.inbound.creds_received) {
1780 ERR("[notification-thread] No credentials received from client");
1781 ret = -1;
1782 goto end;
1783 }
1784
1785 client->uid = LTTNG_SOCK_GET_UID_CRED(
1786 &client->communication.inbound.creds);
1787 client->gid = LTTNG_SOCK_GET_GID_CRED(
1788 &client->communication.inbound.creds);
1789 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
1790 client->uid, client->gid, (int) client->major,
1791 (int) client->minor);
1792
1793 if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
1794 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
1795 }
1796
1797 ret = lttng_dynamic_buffer_append(&client->communication.outbound.buffer,
1798 send_buffer, sizeof(send_buffer));
1799 if (ret) {
1800 ERR("[notification-thread] Failed to send protocol version to notification channel client");
1801 goto end;
1802 }
1803
1804 ret = client_flush_outgoing_queue(client, state);
1805 if (ret) {
1806 goto end;
1807 }
1808
1809 ret = client_send_command_reply(client, state, status);
1810 if (ret) {
1811 ERR("[notification-thread] Failed to send reply to notification channel client");
1812 goto end;
1813 }
1814
1815 /* Set reception state to receive the next message header. */
14fa22f8
JG
1816 ret = client_reset_inbound_state(client);
1817 if (ret) {
1818 ERR("[notification-thread] Failed to reset client communication's inbound state");
1819 goto end;
1820 }
ab0ee2ca
JG
1821 client->validated = true;
1822 break;
1823 }
1824 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
1825 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
1826 {
1827 struct lttng_condition *condition;
1828 enum lttng_notification_channel_status status =
1829 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1830 const struct lttng_buffer_view condition_view =
1831 lttng_buffer_view_from_dynamic_buffer(
1832 &client->communication.inbound.buffer,
1833 0, -1);
1834 size_t expected_condition_size =
1835 client->communication.inbound.buffer.size;
1836
1837 ret = lttng_condition_create_from_buffer(&condition_view,
1838 &condition);
1839 if (ret != expected_condition_size) {
1840 ERR("[notification-thread] Malformed condition received from client");
1841 goto end;
1842 }
1843
1844 if (client->communication.inbound.msg_type ==
1845 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
ab0ee2ca
JG
1846 ret = notification_thread_client_subscribe(client,
1847 condition, state, &status);
1848 } else {
1849 ret = notification_thread_client_unsubscribe(client,
1850 condition, state, &status);
1851 }
1852 if (ret) {
1853 goto end;
1854 }
1855
1856 ret = client_send_command_reply(client, state, status);
1857 if (ret) {
1858 ERR("[notification-thread] Failed to send reply to notification channel client");
1859 goto end;
1860 }
1861
1862 /* Set reception state to receive the next message header. */
14fa22f8
JG
1863 ret = client_reset_inbound_state(client);
1864 if (ret) {
1865 ERR("[notification-thread] Failed to reset client communication's inbound state");
1866 goto end;
1867 }
ab0ee2ca
JG
1868 break;
1869 }
1870 default:
1871 abort();
1872 }
1873end:
1874 return ret;
1875}
1876
1877/* Incoming data from client. */
1878int handle_notification_thread_client_in(
1879 struct notification_thread_state *state, int socket)
1880{
14fa22f8 1881 int ret = 0;
ab0ee2ca
JG
1882 struct notification_client *client;
1883 ssize_t recv_ret;
1884 size_t offset;
1885
1886 client = get_client_from_socket(socket, state);
1887 if (!client) {
1888 /* Internal error, abort. */
1889 ret = -1;
1890 goto end;
1891 }
1892
14fa22f8
JG
1893 offset = client->communication.inbound.buffer.size -
1894 client->communication.inbound.bytes_to_receive;
01ea340e 1895 if (client->communication.inbound.expect_creds) {
ab0ee2ca
JG
1896 recv_ret = lttcomm_recv_creds_unix_sock(socket,
1897 client->communication.inbound.buffer.data + offset,
1898 client->communication.inbound.bytes_to_receive,
1899 &client->communication.inbound.creds);
1900 if (recv_ret > 0) {
01ea340e 1901 client->communication.inbound.expect_creds = false;
ab0ee2ca
JG
1902 client->communication.inbound.creds_received = true;
1903 }
1904 } else {
1905 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
1906 client->communication.inbound.buffer.data + offset,
1907 client->communication.inbound.bytes_to_receive);
1908 }
1909 if (recv_ret < 0) {
1910 goto error_disconnect_client;
1911 }
1912
1913 client->communication.inbound.bytes_to_receive -= recv_ret;
ab0ee2ca
JG
1914 if (client->communication.inbound.bytes_to_receive == 0) {
1915 ret = client_dispatch_message(client, state);
1916 if (ret) {
1917 /*
1918 * Only returns an error if this client must be
1919 * disconnected.
1920 */
1921 goto error_disconnect_client;
1922 }
1923 } else {
1924 goto end;
1925 }
1926end:
1927 return ret;
1928error_disconnect_client:
1929 ret = handle_notification_thread_client_disconnect(socket, state);
1930 return ret;
1931}
1932
1933/* Client ready to receive outgoing data. */
1934int handle_notification_thread_client_out(
1935 struct notification_thread_state *state, int socket)
1936{
1937 int ret;
1938 struct notification_client *client;
1939
1940 client = get_client_from_socket(socket, state);
1941 if (!client) {
1942 /* Internal error, abort. */
1943 ret = -1;
1944 goto end;
1945 }
1946
1947 ret = client_flush_outgoing_queue(client, state);
1948 if (ret) {
1949 goto end;
1950 }
1951end:
1952 return ret;
1953}
1954
1955static
1956bool evaluate_buffer_usage_condition(struct lttng_condition *condition,
1957 struct channel_state_sample *sample, uint64_t buffer_capacity)
1958{
1959 bool result = false;
1960 uint64_t threshold;
1961 enum lttng_condition_type condition_type;
1962 struct lttng_condition_buffer_usage *use_condition = container_of(
1963 condition, struct lttng_condition_buffer_usage,
1964 parent);
1965
1966 if (!sample) {
1967 goto end;
1968 }
1969
1970 if (use_condition->threshold_bytes.set) {
1971 threshold = use_condition->threshold_bytes.value;
1972 } else {
1973 /*
1974 * Threshold was expressed as a ratio.
1975 *
1976 * TODO the threshold (in bytes) of conditions expressed
1977 * as a ratio of total buffer size could be cached to
1978 * forego this double-multiplication or it could be performed
1979 * as fixed-point math.
1980 *
1981 * Note that caching should accomodate the case where the
1982 * condition applies to multiple channels (i.e. don't assume
1983 * that all channels matching my_chann* have the same size...)
1984 */
1985 threshold = (uint64_t) (use_condition->threshold_ratio.value *
1986 (double) buffer_capacity);
1987 }
1988
1989 condition_type = lttng_condition_get_type(condition);
1990 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
1991 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
1992 threshold, sample->highest_usage);
1993
1994 /*
1995 * The low condition should only be triggered once _all_ of the
1996 * streams in a channel have gone below the "low" threshold.
1997 */
1998 if (sample->highest_usage <= threshold) {
1999 result = true;
2000 }
2001 } else {
2002 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2003 threshold, sample->highest_usage);
2004
2005 /*
2006 * For high buffer usage scenarios, we want to trigger whenever
2007 * _any_ of the streams has reached the "high" threshold.
2008 */
2009 if (sample->highest_usage >= threshold) {
2010 result = true;
2011 }
2012 }
2013end:
2014 return result;
2015}
2016
2017static
2018int evaluate_condition(struct lttng_condition *condition,
2019 struct lttng_evaluation **evaluation,
2020 struct notification_thread_state *state,
2021 struct channel_state_sample *previous_sample,
2022 struct channel_state_sample *latest_sample,
2023 uint64_t buffer_capacity)
2024{
2025 int ret = 0;
2026 enum lttng_condition_type condition_type;
2027 bool previous_sample_result;
2028 bool latest_sample_result;
2029
2030 condition_type = lttng_condition_get_type(condition);
2031 /* No other condition type supported for the moment. */
2032 assert(condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW ||
2033 condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH);
2034
2035 previous_sample_result = evaluate_buffer_usage_condition(condition,
2036 previous_sample, buffer_capacity);
2037 latest_sample_result = evaluate_buffer_usage_condition(condition,
2038 latest_sample, buffer_capacity);
2039
2040 if (!latest_sample_result ||
2041 (previous_sample_result == latest_sample_result)) {
2042 /*
2043 * Only trigger on a condition evaluation transition.
2044 *
2045 * NOTE: This edge-triggered logic may not be appropriate for
2046 * future condition types.
2047 */
2048 goto end;
2049 }
2050
2051 if (evaluation && latest_sample_result) {
2052 *evaluation = lttng_evaluation_buffer_usage_create(
2053 condition_type,
2054 latest_sample->highest_usage,
2055 buffer_capacity);
2056 if (!*evaluation) {
2057 ret = -1;
2058 goto end;
2059 }
2060 }
2061end:
2062 return ret;
2063}
2064
2065static
2066int client_enqueue_dropped_notification(struct notification_client *client,
2067 struct notification_thread_state *state)
2068{
2069 int ret;
2070 struct lttng_notification_channel_message msg = {
2071 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
2072 .size = 0,
2073 };
2074
2075 ret = lttng_dynamic_buffer_append(
2076 &client->communication.outbound.buffer, &msg,
2077 sizeof(msg));
2078 return ret;
2079}
2080
2081static
2082int send_evaluation_to_clients(struct lttng_trigger *trigger,
2083 struct lttng_evaluation *evaluation,
2084 struct notification_client_list* client_list,
2085 struct notification_thread_state *state,
2086 uid_t channel_uid, gid_t channel_gid)
2087{
2088 int ret = 0;
2089 struct lttng_dynamic_buffer msg_buffer;
2090 struct notification_client_list_element *client_list_element, *tmp;
2091 struct lttng_notification *notification;
2092 struct lttng_condition *condition;
2093 ssize_t expected_notification_size, notification_size;
2094 struct lttng_notification_channel_message msg;
2095
2096 lttng_dynamic_buffer_init(&msg_buffer);
2097
2098 condition = lttng_trigger_get_condition(trigger);
2099 assert(condition);
2100
2101 notification = lttng_notification_create(condition, evaluation);
2102 if (!notification) {
2103 ret = -1;
2104 goto end;
2105 }
2106
2107 expected_notification_size = lttng_notification_serialize(notification,
2108 NULL);
2109 if (expected_notification_size < 0) {
2110 ERR("[notification-thread] Failed to get size of serialized notification");
2111 ret = -1;
2112 goto end;
2113 }
2114
2115 msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION;
2116 msg.size = (uint32_t) expected_notification_size;
2117 ret = lttng_dynamic_buffer_append(&msg_buffer, &msg, sizeof(msg));
2118 if (ret) {
2119 goto end;
2120 }
2121
2122 ret = lttng_dynamic_buffer_set_size(&msg_buffer,
2123 msg_buffer.size + expected_notification_size);
2124 if (ret) {
2125 goto end;
2126 }
2127
2128 notification_size = lttng_notification_serialize(notification,
2129 msg_buffer.data + sizeof(msg));
2130 if (notification_size != expected_notification_size) {
2131 ERR("[notification-thread] Failed to serialize notification");
2132 ret = -1;
2133 goto end;
2134 }
2135
2136 cds_list_for_each_entry_safe(client_list_element, tmp,
2137 &client_list->list, node) {
2138 struct notification_client *client =
2139 client_list_element->client;
2140
2141 if (client->uid != channel_uid && client->gid != channel_gid &&
2142 client->uid != 0) {
2143 /* Client is not allowed to monitor this channel. */
2144 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this channel");
2145 continue;
2146 }
2147
2148 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
2149 client->socket, msg_buffer.size);
2150 if (client->communication.outbound.buffer.size) {
2151 /*
2152 * Outgoing data is already buffered for this client;
2153 * drop the notification and enqueue a "dropped
2154 * notification" message if this is the first dropped
2155 * notification since the socket spilled-over to the
2156 * queue.
2157 */
2158 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
2159 client->socket);
2160 if (!client->communication.outbound.dropped_notification) {
2161 client->communication.outbound.dropped_notification = true;
2162 ret = client_enqueue_dropped_notification(
2163 client, state);
2164 if (ret) {
2165 goto end;
2166 }
2167 }
2168 continue;
2169 }
2170
2171 ret = lttng_dynamic_buffer_append_buffer(
2172 &client->communication.outbound.buffer,
2173 &msg_buffer);
2174 if (ret) {
2175 goto end;
2176 }
2177
2178 ret = client_flush_outgoing_queue(client, state);
2179 if (ret) {
2180 goto end;
2181 }
2182 }
2183 ret = 0;
2184end:
2185 lttng_notification_destroy(notification);
2186 lttng_dynamic_buffer_reset(&msg_buffer);
2187 return ret;
2188}
2189
2190int handle_notification_thread_channel_sample(
2191 struct notification_thread_state *state, int pipe,
2192 enum lttng_domain_type domain)
2193{
2194 int ret = 0;
2195 struct lttcomm_consumer_channel_monitor_msg sample_msg;
2196 struct channel_state_sample previous_sample, latest_sample;
2197 struct channel_info *channel_info;
2198 struct cds_lfht_node *node;
2199 struct cds_lfht_iter iter;
2200 struct lttng_channel_trigger_list *trigger_list;
2201 struct lttng_trigger_list_element *trigger_list_element;
2202 bool previous_sample_available = false;
2203
2204 /*
2205 * The monitoring pipe only holds messages smaller than PIPE_BUF,
2206 * ensuring that read/write of sampling messages are atomic.
2207 */
7c2551ef 2208 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
ab0ee2ca
JG
2209 if (ret != sizeof(sample_msg)) {
2210 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
2211 pipe);
2212 ret = -1;
2213 goto end;
2214 }
2215
2216 ret = 0;
2217 latest_sample.key.key = sample_msg.key;
2218 latest_sample.key.domain = domain;
2219 latest_sample.highest_usage = sample_msg.highest;
2220 latest_sample.lowest_usage = sample_msg.lowest;
2221
2222 rcu_read_lock();
2223
2224 /* Retrieve the channel's informations */
2225 cds_lfht_lookup(state->channels_ht,
2226 hash_channel_key(&latest_sample.key),
2227 match_channel_info,
2228 &latest_sample.key,
2229 &iter);
2230 node = cds_lfht_iter_get_node(&iter);
2231 if (!node) {
2232 /*
2233 * Not an error since the consumer can push a sample to the pipe
2234 * and the rest of the session daemon could notify us of the
2235 * channel's destruction before we get a chance to process that
2236 * sample.
2237 */
2238 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
2239 latest_sample.key.key,
2240 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
2241 "user space");
2242 goto end_unlock;
2243 }
2244 channel_info = caa_container_of(node, struct channel_info,
2245 channels_ht_node);
2246 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64")",
2247 channel_info->channel_name,
2248 latest_sample.key.key,
2249 channel_info->session_name,
2250 latest_sample.highest_usage,
2251 latest_sample.lowest_usage);
2252
2253 /* Retrieve the channel's last sample, if it exists, and update it. */
2254 cds_lfht_lookup(state->channel_state_ht,
2255 hash_channel_key(&latest_sample.key),
2256 match_channel_state_sample,
2257 &latest_sample.key,
2258 &iter);
2259 node = cds_lfht_iter_get_node(&iter);
2260 if (node) {
2261 struct channel_state_sample *stored_sample;
2262
2263 /* Update the sample stored. */
2264 stored_sample = caa_container_of(node,
2265 struct channel_state_sample,
2266 channel_state_ht_node);
2267 memcpy(&previous_sample, stored_sample,
2268 sizeof(previous_sample));
2269 stored_sample->highest_usage = latest_sample.highest_usage;
2270 stored_sample->lowest_usage = latest_sample.lowest_usage;
2271 previous_sample_available = true;
2272 } else {
2273 /*
2274 * This is the channel's first sample, allocate space for and
2275 * store the new sample.
2276 */
2277 struct channel_state_sample *stored_sample;
2278
2279 stored_sample = zmalloc(sizeof(*stored_sample));
2280 if (!stored_sample) {
2281 ret = -1;
2282 goto end_unlock;
2283 }
2284
2285 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
2286 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
2287 cds_lfht_add(state->channel_state_ht,
2288 hash_channel_key(&stored_sample->key),
2289 &stored_sample->channel_state_ht_node);
2290 }
2291
2292 /* Find triggers associated with this channel. */
2293 cds_lfht_lookup(state->channel_triggers_ht,
2294 hash_channel_key(&latest_sample.key),
2295 match_channel_trigger_list,
2296 &latest_sample.key,
2297 &iter);
2298 node = cds_lfht_iter_get_node(&iter);
2299 if (!node) {
2300 goto end_unlock;
2301 }
2302
2303 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
2304 channel_triggers_ht_node);
2305 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
2306 node) {
2307 struct lttng_condition *condition;
2308 struct lttng_action *action;
2309 struct lttng_trigger *trigger;
2310 struct notification_client_list *client_list;
2311 struct lttng_evaluation *evaluation = NULL;
2312
2313 trigger = trigger_list_element->trigger;
2314 condition = lttng_trigger_get_condition(trigger);
2315 assert(condition);
2316 action = lttng_trigger_get_action(trigger);
2317
2318 /* Notify actions are the only type currently supported. */
2319 assert(lttng_action_get_type(action) ==
2320 LTTNG_ACTION_TYPE_NOTIFY);
2321
2322 /*
2323 * Check if any client is subscribed to the result of this
2324 * evaluation.
2325 */
2326 cds_lfht_lookup(state->notification_trigger_clients_ht,
2327 lttng_condition_hash(condition),
2328 match_client_list,
2329 trigger,
2330 &iter);
2331 node = cds_lfht_iter_get_node(&iter);
2332 assert(node);
2333
2334 client_list = caa_container_of(node,
2335 struct notification_client_list,
2336 notification_trigger_ht_node);
2337 if (cds_list_empty(&client_list->list)) {
2338 /*
2339 * No clients interested in the evaluation's result,
2340 * skip it.
2341 */
2342 continue;
2343 }
2344
2345 ret = evaluate_condition(condition, &evaluation, state,
2346 previous_sample_available ? &previous_sample : NULL,
2347 &latest_sample, channel_info->capacity);
2348 if (ret) {
2349 goto end_unlock;
2350 }
2351
2352 if (!evaluation) {
2353 continue;
2354 }
2355
2356 /* Dispatch evaluation result to all clients. */
2357 ret = send_evaluation_to_clients(trigger_list_element->trigger,
2358 evaluation, client_list, state,
2359 channel_info->uid, channel_info->gid);
2360 if (ret) {
2361 goto end_unlock;
2362 }
2363 }
2364end_unlock:
2365 rcu_read_unlock();
2366end:
2367 return ret;
2368}
This page took 0.205648 seconds and 4 git commands to generate.