Fix: evaluate condition/trigger on subscription
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
CommitLineData
ab0ee2ca
JG
1/*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _LGPL_SOURCE
19#include <urcu.h>
20#include <urcu/rculfhash.h>
21
ab0ee2ca
JG
22#include <common/defaults.h>
23#include <common/error.h>
24#include <common/futex.h>
25#include <common/unix.h>
26#include <common/dynamic-buffer.h>
27#include <common/hashtable/utils.h>
28#include <common/sessiond-comm/sessiond-comm.h>
29#include <common/macros.h>
30#include <lttng/condition/condition.h>
31#include <lttng/action/action.h>
32#include <lttng/notification/notification-internal.h>
33#include <lttng/condition/condition-internal.h>
34#include <lttng/condition/buffer-usage-internal.h>
35#include <lttng/notification/channel-internal.h>
1da26331 36
ab0ee2ca
JG
37#include <time.h>
38#include <unistd.h>
39#include <assert.h>
40#include <inttypes.h>
d53ea4e4 41#include <fcntl.h>
ab0ee2ca 42
1da26331
JG
43#include "notification-thread.h"
44#include "notification-thread-events.h"
45#include "notification-thread-commands.h"
46#include "lttng-sessiond.h"
47#include "kernel.h"
48
ab0ee2ca
JG
/* Poll mask for a client socket: input plus error and hang-up events. */
#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
/* Same events as CLIENT_POLL_MASK_IN, with LPOLLOUT added. */
#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
51
52struct lttng_trigger_list_element {
53 struct lttng_trigger *trigger;
54 struct cds_list_head node;
55};
56
57struct lttng_channel_trigger_list {
58 struct channel_key channel_key;
59 struct cds_list_head list;
60 struct cds_lfht_node channel_triggers_ht_node;
61};
62
63struct lttng_trigger_ht_element {
64 struct lttng_trigger *trigger;
65 struct cds_lfht_node node;
66};
67
68struct lttng_condition_list_element {
69 struct lttng_condition *condition;
70 struct cds_list_head node;
71};
72
73struct notification_client_list_element {
74 struct notification_client *client;
75 struct cds_list_head node;
76};
77
78struct notification_client_list {
79 struct lttng_trigger *trigger;
80 struct cds_list_head list;
81 struct cds_lfht_node notification_trigger_ht_node;
82};
83
84struct notification_client {
85 int socket;
86 /* Client protocol version. */
87 uint8_t major, minor;
88 uid_t uid;
89 gid_t gid;
90 /*
91 * Indicates if the credentials and versions of the client has been
92 * checked.
93 */
94 bool validated;
95 /*
96 * Conditions to which the client's notification channel is subscribed.
97 * List of struct lttng_condition_list_node. The condition member is
98 * owned by the client.
99 */
100 struct cds_list_head condition_list;
101 struct cds_lfht_node client_socket_ht_node;
102 struct {
103 struct {
14fa22f8
JG
104 /*
105 * During the reception of a message, the reception
106 * buffers' "size" is set to contain the current
107 * message's complete payload.
108 */
ab0ee2ca
JG
109 struct lttng_dynamic_buffer buffer;
110 /* Bytes left to receive for the current message. */
111 size_t bytes_to_receive;
112 /* Type of the message being received. */
113 enum lttng_notification_channel_message_type msg_type;
114 /*
115 * Indicates whether or not credentials are expected
116 * from the client.
117 */
01ea340e 118 bool expect_creds;
ab0ee2ca
JG
119 /*
120 * Indicates whether or not credentials were received
121 * from the client.
122 */
123 bool creds_received;
14fa22f8 124 /* Only used during credentials reception. */
ab0ee2ca
JG
125 lttng_sock_cred creds;
126 } inbound;
127 struct {
128 /*
129 * Indicates whether or not a notification addressed to
130 * this client was dropped because a command reply was
131 * already buffered.
132 *
133 * A notification is dropped whenever the buffer is not
134 * empty.
135 */
136 bool dropped_notification;
137 /*
138 * Indicates whether or not a command reply is already
139 * buffered. In this case, it means that the client is
140 * not consuming command replies before emitting a new
141 * one. This could be caused by a protocol error or a
142 * misbehaving/malicious client.
143 */
144 bool queued_command_reply;
145 struct lttng_dynamic_buffer buffer;
146 } outbound;
147 } communication;
148};
149
150struct channel_state_sample {
151 struct channel_key key;
152 struct cds_lfht_node channel_state_ht_node;
153 uint64_t highest_usage;
154 uint64_t lowest_usage;
155};
156
e4db5ace
JR
/* Forward declarations of static helpers used before their definition. */
static
unsigned long hash_channel_key(struct channel_key *key);

static
int evaluate_condition(struct lttng_condition *condition,
		struct lttng_evaluation **evaluation,
		struct notification_thread_state *state,
		struct channel_state_sample *previous_sample,
		struct channel_state_sample *latest_sample,
		uint64_t buffer_capacity);

static
int send_evaluation_to_clients(struct lttng_trigger *trigger,
		struct lttng_evaluation *evaluation,
		struct notification_client_list *client_list,
		struct notification_thread_state *state,
		uid_t channel_uid, gid_t channel_gid);
170
ab0ee2ca
JG
171static
172int match_client(struct cds_lfht_node *node, const void *key)
173{
174 /* This double-cast is intended to supress pointer-to-cast warning. */
175 int socket = (int) (intptr_t) key;
176 struct notification_client *client;
177
178 client = caa_container_of(node, struct notification_client,
179 client_socket_ht_node);
180
181 return !!(client->socket == socket);
182}
183
184static
185int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
186{
187 struct channel_key *channel_key = (struct channel_key *) key;
188 struct lttng_channel_trigger_list *trigger_list;
189
190 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
191 channel_triggers_ht_node);
192
193 return !!((channel_key->key == trigger_list->channel_key.key) &&
194 (channel_key->domain == trigger_list->channel_key.domain));
195}
196
197static
198int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
199{
200 struct channel_key *channel_key = (struct channel_key *) key;
201 struct channel_state_sample *sample;
202
203 sample = caa_container_of(node, struct channel_state_sample,
204 channel_state_ht_node);
205
206 return !!((channel_key->key == sample->key.key) &&
207 (channel_key->domain == sample->key.domain));
208}
209
210static
211int match_channel_info(struct cds_lfht_node *node, const void *key)
212{
213 struct channel_key *channel_key = (struct channel_key *) key;
214 struct channel_info *channel_info;
215
216 channel_info = caa_container_of(node, struct channel_info,
217 channels_ht_node);
218
219 return !!((channel_key->key == channel_info->key.key) &&
220 (channel_key->domain == channel_info->key.domain));
221}
222
223static
224int match_condition(struct cds_lfht_node *node, const void *key)
225{
226 struct lttng_condition *condition_key = (struct lttng_condition *) key;
227 struct lttng_trigger_ht_element *trigger;
228 struct lttng_condition *condition;
229
230 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
231 node);
232 condition = lttng_trigger_get_condition(trigger->trigger);
233 assert(condition);
234
235 return !!lttng_condition_is_equal(condition_key, condition);
236}
237
238static
239int match_client_list(struct cds_lfht_node *node, const void *key)
240{
241 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
242 struct notification_client_list *client_list;
243 struct lttng_condition *condition;
244 struct lttng_condition *condition_key = lttng_trigger_get_condition(
245 trigger_key);
246
247 assert(condition_key);
248
249 client_list = caa_container_of(node, struct notification_client_list,
250 notification_trigger_ht_node);
251 condition = lttng_trigger_get_condition(client_list->trigger);
252
253 return !!lttng_condition_is_equal(condition_key, condition);
254}
255
256static
257int match_client_list_condition(struct cds_lfht_node *node, const void *key)
258{
259 struct lttng_condition *condition_key = (struct lttng_condition *) key;
260 struct notification_client_list *client_list;
261 struct lttng_condition *condition;
262
263 assert(condition_key);
264
265 client_list = caa_container_of(node, struct notification_client_list,
266 notification_trigger_ht_node);
267 condition = lttng_trigger_get_condition(client_list->trigger);
268
269 return !!lttng_condition_is_equal(condition_key, condition);
270}
271
272static
273unsigned long lttng_condition_buffer_usage_hash(
274 struct lttng_condition *_condition)
275{
276 unsigned long hash = 0;
277 struct lttng_condition_buffer_usage *condition;
278
279 condition = container_of(_condition,
280 struct lttng_condition_buffer_usage, parent);
281
282 if (condition->session_name) {
283 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
284 }
285 if (condition->channel_name) {
8f56701f 286 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
ab0ee2ca
JG
287 }
288 if (condition->domain.set) {
289 hash ^= hash_key_ulong(
290 (void *) condition->domain.type,
291 lttng_ht_seed);
292 }
293 if (condition->threshold_ratio.set) {
294 uint64_t val;
295
296 val = condition->threshold_ratio.value * (double) UINT32_MAX;
297 hash ^= hash_key_u64(&val, lttng_ht_seed);
298 } else if (condition->threshold_ratio.set) {
299 uint64_t val;
300
301 val = condition->threshold_bytes.value;
302 hash ^= hash_key_u64(&val, lttng_ht_seed);
303 }
304 return hash;
305}
306
307/*
308 * The lttng_condition hashing code is kept in this file (rather than
309 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
310 * don't want to link in liblttng-ctl.
311 */
312static
313unsigned long lttng_condition_hash(struct lttng_condition *condition)
314{
315 switch (condition->type) {
316 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
317 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
318 return lttng_condition_buffer_usage_hash(condition);
319 default:
320 ERR("[notification-thread] Unexpected condition type caught");
321 abort();
322 }
323}
324
e4db5ace
JR
325static
326unsigned long hash_channel_key(struct channel_key *key)
327{
328 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
329 unsigned long domain_hash = hash_key_ulong(
330 (void *) (unsigned long) key->domain, lttng_ht_seed);
331
332 return key_hash ^ domain_hash;
333}
334
ab0ee2ca
JG
335static
336void channel_info_destroy(struct channel_info *channel_info)
337{
338 if (!channel_info) {
339 return;
340 }
341
342 if (channel_info->session_name) {
343 free(channel_info->session_name);
344 }
345 if (channel_info->channel_name) {
346 free(channel_info->channel_name);
347 }
348 free(channel_info);
349}
350
351static
352struct channel_info *channel_info_copy(struct channel_info *channel_info)
353{
354 struct channel_info *copy = zmalloc(sizeof(*channel_info));
355
356 assert(channel_info);
357 assert(channel_info->session_name);
358 assert(channel_info->channel_name);
359
360 if (!copy) {
361 goto end;
362 }
363
364 memcpy(copy, channel_info, sizeof(*channel_info));
365 copy->session_name = NULL;
366 copy->channel_name = NULL;
367
368 copy->session_name = strdup(channel_info->session_name);
369 if (!copy->session_name) {
370 goto error;
371 }
372 copy->channel_name = strdup(channel_info->channel_name);
373 if (!copy->channel_name) {
374 goto error;
375 }
376 cds_lfht_node_init(&channel_info->channels_ht_node);
377end:
378 return copy;
379error:
380 channel_info_destroy(copy);
381 return NULL;
382}
383
e4db5ace
JR
384/* This function must be called with the RCU read lock held. */
385static
386int evaluate_condition_for_client(struct lttng_trigger *trigger,
387 struct lttng_condition *condition,
388 struct notification_client *client,
389 struct notification_thread_state *state)
390{
391 int ret;
392 struct cds_lfht_iter iter;
393 struct cds_lfht_node *node;
394 struct channel_info *channel_info = NULL;
395 struct channel_key *channel_key = NULL;
396 struct channel_state_sample *last_sample = NULL;
397 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
398 struct lttng_evaluation *evaluation = NULL;
399 struct notification_client_list client_list = { 0 };
400 struct notification_client_list_element client_list_element = { 0 };
401
402 assert(trigger);
403 assert(condition);
404 assert(client);
405 assert(state);
406
407 /* Find the channel associated with the trigger. */
408 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
409 channel_trigger_list , channel_triggers_ht_node) {
410 struct lttng_trigger_list_element *element;
411
412 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
413 struct lttng_condition *current_condition =
414 lttng_trigger_get_condition(
415 element->trigger);
416
417 assert(current_condition);
418 if (!lttng_condition_is_equal(condition,
419 current_condition)) {
420 continue;
421 }
422
423 /* Found the trigger, save the channel key. */
424 channel_key = &channel_trigger_list->channel_key;
425 break;
426 }
427 if (channel_key) {
428 /* The channel key was found stop iteration. */
429 break;
430 }
431 }
432
433 if (!channel_key){
434 /* No channel found; normal exit. */
435 DBG("[notification-thread] No channel associated with newly subscribed-to condition");
436 ret = 0;
437 goto end;
438 }
439
440 /* Fetch channel info for the matching channel. */
441 cds_lfht_lookup(state->channels_ht,
442 hash_channel_key(channel_key),
443 match_channel_info,
444 channel_key,
445 &iter);
446 node = cds_lfht_iter_get_node(&iter);
447 assert(node);
448 channel_info = caa_container_of(node, struct channel_info,
449 channels_ht_node);
450
451 /* Retrieve the channel's last sample, if it exists. */
452 cds_lfht_lookup(state->channel_state_ht,
453 hash_channel_key(channel_key),
454 match_channel_state_sample,
455 channel_key,
456 &iter);
457 node = cds_lfht_iter_get_node(&iter);
458 if (node) {
459 last_sample = caa_container_of(node,
460 struct channel_state_sample,
461 channel_state_ht_node);
462 } else {
463 /* Nothing to evaluate, no sample was ever taken. Normal exit */
464 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
465 ret = 0;
466 goto end;
467 }
468
469 ret = evaluate_condition(condition, &evaluation, state, NULL,
470 last_sample, channel_info->capacity);
471 if (ret) {
472 WARN("[notification-thread] Fatal error occured while evaluating a newly subscribed-to condition");
473 goto end;
474 }
475
476 if (!evaluation) {
477 /* Evaluation yielded nothing. Normal exit. */
478 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
479 ret = 0;
480 goto end;
481 }
482
483 /*
484 * Create a temporary client list with the client currently
485 * subscribing.
486 */
487 cds_lfht_node_init(&client_list.notification_trigger_ht_node);
488 CDS_INIT_LIST_HEAD(&client_list.list);
489 client_list.trigger = trigger;
490
491 CDS_INIT_LIST_HEAD(&client_list_element.node);
492 client_list_element.client = client;
493 cds_list_add(&client_list_element.node, &client_list.list);
494
495 /* Send evaluation result to the newly-subscribed client. */
496 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
497 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
498 state, channel_info->uid, channel_info->gid);
499
500end:
501 return ret;
502}
503
ab0ee2ca
JG
504static
505int notification_thread_client_subscribe(struct notification_client *client,
506 struct lttng_condition *condition,
507 struct notification_thread_state *state,
508 enum lttng_notification_channel_status *_status)
509{
510 int ret = 0;
511 struct cds_lfht_iter iter;
512 struct cds_lfht_node *node;
513 struct notification_client_list *client_list;
514 struct lttng_condition_list_element *condition_list_element = NULL;
515 struct notification_client_list_element *client_list_element = NULL;
516 enum lttng_notification_channel_status status =
517 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
518
519 /*
520 * Ensure that the client has not already subscribed to this condition
521 * before.
522 */
523 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
524 if (lttng_condition_is_equal(condition_list_element->condition,
525 condition)) {
526 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
527 goto end;
528 }
529 }
530
531 condition_list_element = zmalloc(sizeof(*condition_list_element));
532 if (!condition_list_element) {
533 ret = -1;
534 goto error;
535 }
536 client_list_element = zmalloc(sizeof(*client_list_element));
537 if (!client_list_element) {
538 ret = -1;
539 goto error;
540 }
541
542 rcu_read_lock();
543
544 /*
545 * Add the newly-subscribed condition to the client's subscription list.
546 */
547 CDS_INIT_LIST_HEAD(&condition_list_element->node);
548 condition_list_element->condition = condition;
549 cds_list_add(&condition_list_element->node, &client->condition_list);
550
ab0ee2ca
JG
551 cds_lfht_lookup(state->notification_trigger_clients_ht,
552 lttng_condition_hash(condition),
553 match_client_list_condition,
554 condition,
555 &iter);
556 node = cds_lfht_iter_get_node(&iter);
557 if (!node) {
4fb43b68 558 free(client_list_element);
ab0ee2ca
JG
559 goto end_unlock;
560 }
561
562 client_list = caa_container_of(node, struct notification_client_list,
563 notification_trigger_ht_node);
e4db5ace
JR
564 if (evaluate_condition_for_client(client_list->trigger, condition,
565 client, state)) {
566 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
567 ret = -1;
568 goto end_unlock;
569 }
570
571 /*
572 * Add the client to the list of clients interested in a given trigger
573 * if a "notification" trigger with a corresponding condition was
574 * added prior.
575 */
ab0ee2ca
JG
576 client_list_element->client = client;
577 CDS_INIT_LIST_HEAD(&client_list_element->node);
578 cds_list_add(&client_list_element->node, &client_list->list);
579end_unlock:
580 rcu_read_unlock();
581end:
582 if (_status) {
583 *_status = status;
584 }
585 return ret;
586error:
587 free(condition_list_element);
588 free(client_list_element);
589 return ret;
590}
591
592static
593int notification_thread_client_unsubscribe(
594 struct notification_client *client,
595 struct lttng_condition *condition,
596 struct notification_thread_state *state,
597 enum lttng_notification_channel_status *_status)
598{
599 struct cds_lfht_iter iter;
600 struct cds_lfht_node *node;
601 struct notification_client_list *client_list;
602 struct lttng_condition_list_element *condition_list_element,
603 *condition_tmp;
604 struct notification_client_list_element *client_list_element,
605 *client_tmp;
606 bool condition_found = false;
607 enum lttng_notification_channel_status status =
608 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
609
610 /* Remove the condition from the client's condition list. */
611 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
612 &client->condition_list, node) {
613 if (!lttng_condition_is_equal(condition_list_element->condition,
614 condition)) {
615 continue;
616 }
617
618 cds_list_del(&condition_list_element->node);
619 /*
620 * The caller may be iterating on the client's conditions to
621 * tear down a client's connection. In this case, the condition
622 * will be destroyed at the end.
623 */
624 if (condition != condition_list_element->condition) {
625 lttng_condition_destroy(
626 condition_list_element->condition);
627 }
628 free(condition_list_element);
629 condition_found = true;
630 break;
631 }
632
633 if (!condition_found) {
634 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
635 goto end;
636 }
637
638 /*
639 * Remove the client from the list of clients interested the trigger
640 * matching the condition.
641 */
642 rcu_read_lock();
643 cds_lfht_lookup(state->notification_trigger_clients_ht,
644 lttng_condition_hash(condition),
645 match_client_list_condition,
646 condition,
647 &iter);
648 node = cds_lfht_iter_get_node(&iter);
649 if (!node) {
650 goto end_unlock;
651 }
652
653 client_list = caa_container_of(node, struct notification_client_list,
654 notification_trigger_ht_node);
655 cds_list_for_each_entry_safe(client_list_element, client_tmp,
656 &client_list->list, node) {
657 if (client_list_element->client->socket != client->socket) {
658 continue;
659 }
660 cds_list_del(&client_list_element->node);
661 free(client_list_element);
662 break;
663 }
664end_unlock:
665 rcu_read_unlock();
666end:
667 lttng_condition_destroy(condition);
668 if (_status) {
669 *_status = status;
670 }
671 return 0;
672}
673
674static
675void notification_client_destroy(struct notification_client *client,
676 struct notification_thread_state *state)
677{
678 struct lttng_condition_list_element *condition_list_element, *tmp;
679
680 if (!client) {
681 return;
682 }
683
684 /* Release all conditions to which the client was subscribed. */
685 cds_list_for_each_entry_safe(condition_list_element, tmp,
686 &client->condition_list, node) {
687 (void) notification_thread_client_unsubscribe(client,
688 condition_list_element->condition, state, NULL);
689 }
690
691 if (client->socket >= 0) {
692 (void) lttcomm_close_unix_sock(client->socket);
693 }
694 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
695 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
696 free(client);
697}
698
699/*
700 * Call with rcu_read_lock held (and hold for the lifetime of the returned
701 * client pointer).
702 */
703static
704struct notification_client *get_client_from_socket(int socket,
705 struct notification_thread_state *state)
706{
707 struct cds_lfht_iter iter;
708 struct cds_lfht_node *node;
709 struct notification_client *client = NULL;
710
711 cds_lfht_lookup(state->client_socket_ht,
712 hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed),
713 match_client,
714 (void *) (unsigned long) socket,
715 &iter);
716 node = cds_lfht_iter_get_node(&iter);
717 if (!node) {
718 goto end;
719 }
720
721 client = caa_container_of(node, struct notification_client,
722 client_socket_ht_node);
723end:
724 return client;
725}
726
727static
728bool trigger_applies_to_channel(struct lttng_trigger *trigger,
729 struct channel_info *info)
730{
731 enum lttng_condition_status status;
732 struct lttng_condition *condition;
733 const char *trigger_session_name = NULL;
734 const char *trigger_channel_name = NULL;
735 enum lttng_domain_type trigger_domain;
736
737 condition = lttng_trigger_get_condition(trigger);
738 if (!condition) {
739 goto fail;
740 }
741
742 switch (lttng_condition_get_type(condition)) {
743 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
744 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
745 break;
746 default:
747 goto fail;
748 }
749
750 status = lttng_condition_buffer_usage_get_domain_type(condition,
751 &trigger_domain);
752 assert(status == LTTNG_CONDITION_STATUS_OK);
753 if (info->key.domain != trigger_domain) {
754 goto fail;
755 }
756
757 status = lttng_condition_buffer_usage_get_session_name(
758 condition, &trigger_session_name);
759 assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_session_name);
760
761 status = lttng_condition_buffer_usage_get_channel_name(
762 condition, &trigger_channel_name);
763 assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_channel_name);
764
765 if (strcmp(info->session_name, trigger_session_name)) {
766 goto fail;
767 }
768 if (strcmp(info->channel_name, trigger_channel_name)) {
769 goto fail;
770 }
771
772 return true;
773fail:
774 return false;
775}
776
777static
778bool trigger_applies_to_client(struct lttng_trigger *trigger,
779 struct notification_client *client)
780{
781 bool applies = false;
782 struct lttng_condition_list_element *condition_list_element;
783
784 cds_list_for_each_entry(condition_list_element, &client->condition_list,
785 node) {
786 applies = lttng_condition_is_equal(
787 condition_list_element->condition,
788 lttng_trigger_get_condition(trigger));
789 if (applies) {
790 break;
791 }
792 }
793 return applies;
794}
795
ab0ee2ca
JG
796static
797int handle_notification_thread_command_add_channel(
798 struct notification_thread_state *state,
799 struct channel_info *channel_info,
800 enum lttng_error_code *cmd_result)
801{
802 struct cds_list_head trigger_list;
803 struct channel_info *new_channel_info;
804 struct channel_key *channel_key;
805 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
806 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
807 int trigger_count = 0;
808 struct cds_lfht_iter iter;
809
810 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
811 channel_info->channel_name, channel_info->session_name,
812 channel_info->key.key, channel_info->key.domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
813
814 CDS_INIT_LIST_HEAD(&trigger_list);
815
816 new_channel_info = channel_info_copy(channel_info);
817 if (!new_channel_info) {
818 goto error;
819 }
820
821 channel_key = &new_channel_info->key;
822
823 /* Build a list of all triggers applying to the new channel. */
824 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
825 node) {
826 struct lttng_trigger_list_element *new_element;
827
828 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
829 channel_info)) {
830 continue;
831 }
832
833 new_element = zmalloc(sizeof(*new_element));
834 if (!new_element) {
835 goto error;
836 }
837 CDS_INIT_LIST_HEAD(&new_element->node);
838 new_element->trigger = trigger_ht_element->trigger;
839 cds_list_add(&new_element->node, &trigger_list);
840 trigger_count++;
841 }
842
843 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
844 trigger_count);
845 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
846 if (!channel_trigger_list) {
847 goto error;
848 }
849 channel_trigger_list->channel_key = *channel_key;
850 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
851 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
852 cds_list_splice(&trigger_list, &channel_trigger_list->list);
853
854 rcu_read_lock();
855 /* Add channel to the channel_ht which owns the channel_infos. */
856 cds_lfht_add(state->channels_ht,
857 hash_channel_key(channel_key),
858 &new_channel_info->channels_ht_node);
859 /*
860 * Add the list of triggers associated with this channel to the
861 * channel_triggers_ht.
862 */
863 cds_lfht_add(state->channel_triggers_ht,
864 hash_channel_key(channel_key),
865 &channel_trigger_list->channel_triggers_ht_node);
866 rcu_read_unlock();
867 *cmd_result = LTTNG_OK;
868 return 0;
869error:
870 /* Empty trigger list */
871 channel_info_destroy(new_channel_info);
872 return 1;
873}
874
875static
876int handle_notification_thread_command_remove_channel(
877 struct notification_thread_state *state,
878 uint64_t channel_key, enum lttng_domain_type domain,
879 enum lttng_error_code *cmd_result)
880{
881 struct cds_lfht_node *node;
882 struct cds_lfht_iter iter;
883 struct lttng_channel_trigger_list *trigger_list;
884 struct lttng_trigger_list_element *trigger_list_element, *tmp;
885 struct channel_key key = { .key = channel_key, .domain = domain };
886 struct channel_info *channel_info;
887
888 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
889 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
890
891 rcu_read_lock();
892
893 cds_lfht_lookup(state->channel_triggers_ht,
894 hash_channel_key(&key),
895 match_channel_trigger_list,
896 &key,
897 &iter);
898 node = cds_lfht_iter_get_node(&iter);
899 /*
900 * There is a severe internal error if we are being asked to remove a
901 * channel that doesn't exist.
902 */
903 if (!node) {
904 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
905 goto end;
906 }
907
908 /* Free the list of triggers associated with this channel. */
909 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
910 channel_triggers_ht_node);
911 cds_list_for_each_entry_safe(trigger_list_element, tmp,
912 &trigger_list->list, node) {
913 cds_list_del(&trigger_list_element->node);
914 free(trigger_list_element);
915 }
916 cds_lfht_del(state->channel_triggers_ht, node);
917 free(trigger_list);
918
919 /* Free sampled channel state. */
920 cds_lfht_lookup(state->channel_state_ht,
921 hash_channel_key(&key),
922 match_channel_state_sample,
923 &key,
924 &iter);
925 node = cds_lfht_iter_get_node(&iter);
926 /*
927 * This is expected to be NULL if the channel is destroyed before we
928 * received a sample.
929 */
930 if (node) {
931 struct channel_state_sample *sample = caa_container_of(node,
932 struct channel_state_sample,
933 channel_state_ht_node);
934
935 cds_lfht_del(state->channel_state_ht, node);
936 free(sample);
937 }
938
939 /* Remove the channel from the channels_ht and free it. */
940 cds_lfht_lookup(state->channels_ht,
941 hash_channel_key(&key),
942 match_channel_info,
943 &key,
944 &iter);
945 node = cds_lfht_iter_get_node(&iter);
946 assert(node);
947 channel_info = caa_container_of(node, struct channel_info,
948 channels_ht_node);
949 cds_lfht_del(state->channels_ht, node);
950 channel_info_destroy(channel_info);
951end:
952 rcu_read_unlock();
953 *cmd_result = LTTNG_OK;
954 return 0;
955}
956
1da26331
JG
957static
958int condition_is_supported(struct lttng_condition *condition)
959{
960 int ret;
961
962 switch (lttng_condition_get_type(condition)) {
963 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
964 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
965 {
966 enum lttng_domain_type domain;
967
968 ret = lttng_condition_buffer_usage_get_domain_type(condition,
969 &domain);
970 if (ret) {
971 ret = -1;
972 goto end;
973 }
974
975 if (domain != LTTNG_DOMAIN_KERNEL) {
976 ret = 1;
977 goto end;
978 }
979
980 /*
981 * Older kernel tracers don't expose the API to monitor their
982 * buffers. Therefore, we reject triggers that require that
983 * mechanism to be available to be evaluated.
984 */
985 ret = kernel_supports_ring_buffer_snapshot_sample_positions(
986 kernel_tracer_fd);
987 break;
988 }
989 default:
990 ret = 1;
991 }
992end:
993 return ret;
994}
995
ab0ee2ca
JG
996/*
997 * FIXME A client's credentials are not checked when registering a trigger, nor
998 * are they stored alongside with the trigger.
999 *
1da26331 1000 * The effects of this are benign since:
ab0ee2ca
JG
1001 * - The client will succeed in registering the trigger, as it is valid,
1002 * - The trigger will, internally, be bound to the channel,
1003 * - The notifications will not be sent since the client's credentials
1004 * are checked against the channel at that moment.
1da26331
JG
1005 *
1006 * If this function returns a non-zero value, it means something is
1007 * fundamentally wrong and the whole subsystem/thread will be torn down.
1008 *
1009 * If a non-fatal error occurs, just set the cmd_result to the appropriate
1010 * error code.
ab0ee2ca
JG
1011 */
1012static
1013int handle_notification_thread_command_register_trigger(
1014 struct notification_thread_state *state,
1015 struct lttng_trigger *trigger,
1016 enum lttng_error_code *cmd_result)
1017{
1018 int ret = 0;
1019 struct lttng_condition *condition;
1020 struct notification_client *client;
1021 struct notification_client_list *client_list = NULL;
1022 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1023 struct notification_client_list_element *client_list_element, *tmp;
1024 struct cds_lfht_node *node;
1025 struct cds_lfht_iter iter;
1026 struct channel_info *channel;
1027 bool free_trigger = true;
1028
1029 rcu_read_lock();
1030
1031 condition = lttng_trigger_get_condition(trigger);
1da26331
JG
1032 assert(condition);
1033
1034 ret = condition_is_supported(condition);
1035 if (ret < 0) {
1036 goto error;
1037 } else if (ret == 0) {
1038 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
1039 goto error;
1040 } else {
1041 /* Feature is supported, continue. */
1042 ret = 0;
1043 }
1044
ab0ee2ca
JG
1045 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
1046 if (!trigger_ht_element) {
1047 ret = -1;
1048 goto error;
1049 }
1050
1051 /* Add trigger to the trigger_ht. */
1052 cds_lfht_node_init(&trigger_ht_element->node);
1053 trigger_ht_element->trigger = trigger;
1054
1055 node = cds_lfht_add_unique(state->triggers_ht,
1056 lttng_condition_hash(condition),
1057 match_condition,
1058 condition,
1059 &trigger_ht_element->node);
1060 if (node != &trigger_ht_element->node) {
1061 /* Not a fatal error, simply report it to the client. */
1062 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
1063 goto error_free_ht_element;
1064 }
1065
1066 /*
1067 * Ownership of the trigger and of its wrapper was transfered to
1068 * the triggers_ht.
1069 */
1070 trigger_ht_element = NULL;
1071 free_trigger = false;
1072
1073 /*
1074 * The rest only applies to triggers that have a "notify" action.
1075 * It is not skipped as this is the only action type currently
1076 * supported.
1077 */
1078 client_list = zmalloc(sizeof(*client_list));
1079 if (!client_list) {
1080 ret = -1;
1081 goto error_free_ht_element;
1082 }
1083 cds_lfht_node_init(&client_list->notification_trigger_ht_node);
1084 CDS_INIT_LIST_HEAD(&client_list->list);
1085 client_list->trigger = trigger;
1086
1087 /* Build a list of clients to which this new trigger applies. */
1088 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1089 client_socket_ht_node) {
1090 if (!trigger_applies_to_client(trigger, client)) {
1091 continue;
1092 }
1093
1094 client_list_element = zmalloc(sizeof(*client_list_element));
1095 if (!client_list_element) {
1096 ret = -1;
1097 goto error_free_client_list;
1098 }
1099 CDS_INIT_LIST_HEAD(&client_list_element->node);
1100 client_list_element->client = client;
1101 cds_list_add(&client_list_element->node, &client_list->list);
1102 }
1103
1104 cds_lfht_add(state->notification_trigger_clients_ht,
1105 lttng_condition_hash(condition),
1106 &client_list->notification_trigger_ht_node);
1107 /*
1108 * Client list ownership transferred to the
1109 * notification_trigger_clients_ht.
1110 */
1111 client_list = NULL;
1112
1113 /*
1114 * Add the trigger to list of triggers bound to the channels currently
1115 * known.
1116 */
1117 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
1118 channels_ht_node) {
1119 struct lttng_trigger_list_element *trigger_list_element;
1120 struct lttng_channel_trigger_list *trigger_list;
1121
1122 if (!trigger_applies_to_channel(trigger, channel)) {
1123 continue;
1124 }
1125
1126 cds_lfht_lookup(state->channel_triggers_ht,
1127 hash_channel_key(&channel->key),
1128 match_channel_trigger_list,
1129 &channel->key,
1130 &iter);
1131 node = cds_lfht_iter_get_node(&iter);
1132 assert(node);
ab0ee2ca
JG
1133 trigger_list = caa_container_of(node,
1134 struct lttng_channel_trigger_list,
1135 channel_triggers_ht_node);
1136
1137 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
1138 if (!trigger_list_element) {
1139 ret = -1;
1140 goto error_free_client_list;
1141 }
1142 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
1143 trigger_list_element->trigger = trigger;
1144 cds_list_add(&trigger_list_element->node, &trigger_list->list);
e4db5ace 1145
ab0ee2ca
JG
1146 /* A trigger can only apply to one channel. */
1147 break;
1148 }
1149
1150 *cmd_result = LTTNG_OK;
1151error_free_client_list:
1152 if (client_list) {
1153 cds_list_for_each_entry_safe(client_list_element, tmp,
1154 &client_list->list, node) {
1155 free(client_list_element);
1156 }
1157 free(client_list);
1158 }
1159error_free_ht_element:
1160 free(trigger_ht_element);
1161error:
1162 if (free_trigger) {
1163 struct lttng_action *action = lttng_trigger_get_action(trigger);
1164
1165 lttng_condition_destroy(condition);
1166 lttng_action_destroy(action);
1167 lttng_trigger_destroy(trigger);
1168 }
1169 rcu_read_unlock();
1170 return ret;
1171}
1172
cc2295b5 1173static
ab0ee2ca
JG
1174int handle_notification_thread_command_unregister_trigger(
1175 struct notification_thread_state *state,
1176 struct lttng_trigger *trigger,
1177 enum lttng_error_code *_cmd_reply)
1178{
1179 struct cds_lfht_iter iter;
1180 struct cds_lfht_node *node, *triggers_ht_node;
1181 struct lttng_channel_trigger_list *trigger_list;
1182 struct notification_client_list *client_list;
1183 struct notification_client_list_element *client_list_element, *tmp;
1184 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1185 struct lttng_condition *condition = lttng_trigger_get_condition(
1186 trigger);
1187 struct lttng_action *action;
1188 enum lttng_error_code cmd_reply;
1189
1190 rcu_read_lock();
1191
1192 cds_lfht_lookup(state->triggers_ht,
1193 lttng_condition_hash(condition),
1194 match_condition,
1195 condition,
1196 &iter);
1197 triggers_ht_node = cds_lfht_iter_get_node(&iter);
1198 if (!triggers_ht_node) {
1199 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
1200 goto end;
1201 } else {
1202 cmd_reply = LTTNG_OK;
1203 }
1204
1205 /* Remove trigger from channel_triggers_ht. */
1206 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
1207 channel_triggers_ht_node) {
1208 struct lttng_trigger_list_element *trigger_element, *tmp;
1209
1210 cds_list_for_each_entry_safe(trigger_element, tmp,
1211 &trigger_list->list, node) {
1212 struct lttng_condition *current_condition =
1213 lttng_trigger_get_condition(
1214 trigger_element->trigger);
1215
1216 assert(current_condition);
1217 if (!lttng_condition_is_equal(condition,
1218 current_condition)) {
1219 continue;
1220 }
1221
1222 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
1223 cds_list_del(&trigger_element->node);
e4db5ace
JR
1224 /* A trigger can only appear once per channel */
1225 break;
ab0ee2ca
JG
1226 }
1227 }
1228
1229 /*
1230 * Remove and release the client list from
1231 * notification_trigger_clients_ht.
1232 */
1233 cds_lfht_lookup(state->notification_trigger_clients_ht,
1234 lttng_condition_hash(condition),
1235 match_client_list,
1236 trigger,
1237 &iter);
1238 node = cds_lfht_iter_get_node(&iter);
1239 assert(node);
1240 client_list = caa_container_of(node, struct notification_client_list,
1241 notification_trigger_ht_node);
1242 cds_list_for_each_entry_safe(client_list_element, tmp,
1243 &client_list->list, node) {
1244 free(client_list_element);
1245 }
1246 cds_lfht_del(state->notification_trigger_clients_ht, node);
1247 free(client_list);
1248
1249 /* Remove trigger from triggers_ht. */
1250 trigger_ht_element = caa_container_of(triggers_ht_node,
1251 struct lttng_trigger_ht_element, node);
1252 cds_lfht_del(state->triggers_ht, triggers_ht_node);
1253
1254 condition = lttng_trigger_get_condition(trigger_ht_element->trigger);
1255 lttng_condition_destroy(condition);
1256 action = lttng_trigger_get_action(trigger_ht_element->trigger);
1257 lttng_action_destroy(action);
1258 lttng_trigger_destroy(trigger_ht_element->trigger);
1259 free(trigger_ht_element);
1260end:
1261 rcu_read_unlock();
1262 if (_cmd_reply) {
1263 *_cmd_reply = cmd_reply;
1264 }
1265 return 0;
1266}
1267
1268/* Returns 0 on success, 1 on exit requested, negative value on error. */
1269int handle_notification_thread_command(
1270 struct notification_thread_handle *handle,
1271 struct notification_thread_state *state)
1272{
1273 int ret;
1274 uint64_t counter;
1275 struct notification_thread_command *cmd;
1276
1277 /* Read event_fd to put it back into a quiescent state. */
1278 ret = read(handle->cmd_queue.event_fd, &counter, sizeof(counter));
1279 if (ret == -1) {
1280 goto error;
1281 }
1282
1283 pthread_mutex_lock(&handle->cmd_queue.lock);
1284 cmd = cds_list_first_entry(&handle->cmd_queue.list,
1285 struct notification_thread_command, cmd_list_node);
1286 switch (cmd->type) {
1287 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
1288 DBG("[notification-thread] Received register trigger command");
1289 ret = handle_notification_thread_command_register_trigger(
1290 state, cmd->parameters.trigger,
1291 &cmd->reply_code);
1292 break;
1293 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
1294 DBG("[notification-thread] Received unregister trigger command");
1295 ret = handle_notification_thread_command_unregister_trigger(
1296 state, cmd->parameters.trigger,
1297 &cmd->reply_code);
1298 break;
1299 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
1300 DBG("[notification-thread] Received add channel command");
1301 ret = handle_notification_thread_command_add_channel(
1302 state, &cmd->parameters.add_channel,
1303 &cmd->reply_code);
1304 break;
1305 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
1306 DBG("[notification-thread] Received remove channel command");
1307 ret = handle_notification_thread_command_remove_channel(
1308 state, cmd->parameters.remove_channel.key,
1309 cmd->parameters.remove_channel.domain,
1310 &cmd->reply_code);
1311 break;
1312 case NOTIFICATION_COMMAND_TYPE_QUIT:
1313 DBG("[notification-thread] Received quit command");
1314 cmd->reply_code = LTTNG_OK;
1315 ret = 1;
1316 goto end;
1317 default:
1318 ERR("[notification-thread] Unknown internal command received");
1319 goto error_unlock;
1320 }
1321
1322 if (ret) {
1323 goto error_unlock;
1324 }
1325end:
1326 cds_list_del(&cmd->cmd_list_node);
8ada111f 1327 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
1328 pthread_mutex_unlock(&handle->cmd_queue.lock);
1329 return ret;
1330error_unlock:
1331 /* Wake-up and return a fatal error to the calling thread. */
8ada111f 1332 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
1333 pthread_mutex_unlock(&handle->cmd_queue.lock);
1334 cmd->reply_code = LTTNG_ERR_FATAL;
1335error:
1336 /* Indicate a fatal error to the caller. */
1337 return -1;
1338}
1339
1340static
1341unsigned long hash_client_socket(int socket)
1342{
1343 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
1344}
1345
/*
 * Add O_NONBLOCK to the file status flags of 'socket'.
 *
 * Returns the result of the F_SETFL fcntl() call, or -1 if either fcntl()
 * call failed.
 */
static
int socket_set_non_blocking(int socket)
{
	int status;
	int current_flags;

	/* Fetch the current flags so they are preserved. */
	current_flags = fcntl(socket, F_GETFL, 0);
	if (current_flags == -1) {
		PERROR("fcntl get socket flags");
		return current_flags;
	}

	status = fcntl(socket, F_SETFL, current_flags | O_NONBLOCK);
	if (status == -1) {
		PERROR("fcntl set O_NONBLOCK socket flag");
		return status;
	}

	DBG("Client socket (fd = %i) set as non-blocking", socket);
	return status;
}
1368
1369static
14fa22f8 1370int client_reset_inbound_state(struct notification_client *client)
ab0ee2ca
JG
1371{
1372 int ret;
1373
1374 ret = lttng_dynamic_buffer_set_size(
1375 &client->communication.inbound.buffer, 0);
1376 assert(!ret);
1377
1378 client->communication.inbound.bytes_to_receive =
1379 sizeof(struct lttng_notification_channel_message);
1380 client->communication.inbound.msg_type =
1381 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
ab0ee2ca
JG
1382 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
1383 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
14fa22f8
JG
1384 ret = lttng_dynamic_buffer_set_size(
1385 &client->communication.inbound.buffer,
1386 client->communication.inbound.bytes_to_receive);
1387 return ret;
ab0ee2ca
JG
1388}
1389
1390int handle_notification_thread_client_connect(
1391 struct notification_thread_state *state)
1392{
1393 int ret;
1394 struct notification_client *client;
1395
1396 DBG("[notification-thread] Handling new notification channel client connection");
1397
1398 client = zmalloc(sizeof(*client));
1399 if (!client) {
1400 /* Fatal error. */
1401 ret = -1;
1402 goto error;
1403 }
1404 CDS_INIT_LIST_HEAD(&client->condition_list);
1405 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
1406 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
01ea340e 1407 client->communication.inbound.expect_creds = true;
14fa22f8
JG
1408 ret = client_reset_inbound_state(client);
1409 if (ret) {
1410 ERR("[notification-thread] Failed to reset client communication's inbound state");
1411 ret = 0;
1412 goto error;
1413 }
ab0ee2ca
JG
1414
1415 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
1416 if (ret < 0) {
1417 ERR("[notification-thread] Failed to accept new notification channel client connection");
1418 ret = 0;
1419 goto error;
1420 }
1421
1422 client->socket = ret;
1423
1424 ret = socket_set_non_blocking(client->socket);
1425 if (ret) {
1426 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
1427 goto error;
1428 }
1429
1430 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
1431 if (ret < 0) {
1432 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
1433 ret = 0;
1434 goto error;
1435 }
1436
1437 ret = lttng_poll_add(&state->events, client->socket,
1438 LPOLLIN | LPOLLERR |
1439 LPOLLHUP | LPOLLRDHUP);
1440 if (ret < 0) {
1441 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
1442 ret = 0;
1443 goto error;
1444 }
1445 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
1446 client->socket);
1447
ab0ee2ca
JG
1448 rcu_read_lock();
1449 cds_lfht_add(state->client_socket_ht,
1450 hash_client_socket(client->socket),
1451 &client->client_socket_ht_node);
1452 rcu_read_unlock();
1453
1454 return ret;
1455error:
1456 notification_client_destroy(client, state);
1457 return ret;
1458}
1459
1460int handle_notification_thread_client_disconnect(
1461 int client_socket,
1462 struct notification_thread_state *state)
1463{
1464 int ret = 0;
1465 struct notification_client *client;
1466
1467 rcu_read_lock();
1468 DBG("[notification-thread] Closing client connection (socket fd = %i)",
1469 client_socket);
1470 client = get_client_from_socket(client_socket, state);
1471 if (!client) {
1472 /* Internal state corruption, fatal error. */
1473 ERR("[notification-thread] Unable to find client (socket fd = %i)",
1474 client_socket);
1475 ret = -1;
1476 goto end;
1477 }
1478
1479 ret = lttng_poll_del(&state->events, client_socket);
1480 if (ret) {
1481 ERR("[notification-thread] Failed to remove client socket from poll set");
1482 }
1483 cds_lfht_del(state->client_socket_ht,
1484 &client->client_socket_ht_node);
1485 notification_client_destroy(client, state);
1486end:
1487 rcu_read_unlock();
1488 return ret;
1489}
1490
1491int handle_notification_thread_client_disconnect_all(
1492 struct notification_thread_state *state)
1493{
1494 struct cds_lfht_iter iter;
1495 struct notification_client *client;
1496 bool error_encoutered = false;
1497
1498 rcu_read_lock();
1499 DBG("[notification-thread] Closing all client connections");
1500 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1501 client_socket_ht_node) {
1502 int ret;
1503
1504 ret = handle_notification_thread_client_disconnect(
1505 client->socket, state);
1506 if (ret) {
1507 error_encoutered = true;
1508 }
1509 }
1510 rcu_read_unlock();
1511 return error_encoutered ? 1 : 0;
1512}
1513
1514int handle_notification_thread_trigger_unregister_all(
1515 struct notification_thread_state *state)
1516{
4149ace8 1517 bool error_occurred = false;
ab0ee2ca
JG
1518 struct cds_lfht_iter iter;
1519 struct lttng_trigger_ht_element *trigger_ht_element;
1520
1521 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1522 node) {
1523 int ret = handle_notification_thread_command_unregister_trigger(
1524 state, trigger_ht_element->trigger, NULL);
1525 if (ret) {
4149ace8 1526 error_occurred = true;
ab0ee2ca
JG
1527 }
1528 }
4149ace8 1529 return error_occurred ? -1 : 0;
ab0ee2ca
JG
1530}
1531
1532static
1533int client_flush_outgoing_queue(struct notification_client *client,
1534 struct notification_thread_state *state)
1535{
1536 ssize_t ret;
1537 size_t to_send_count;
1538
1539 assert(client->communication.outbound.buffer.size != 0);
1540 to_send_count = client->communication.outbound.buffer.size;
1541 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
1542 client->socket);
1543
1544 ret = lttcomm_send_unix_sock_non_block(client->socket,
1545 client->communication.outbound.buffer.data,
1546 to_send_count);
1547 if ((ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) ||
1548 (ret > 0 && ret < to_send_count)) {
1549 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
1550 client->socket);
1551 to_send_count -= max(ret, 0);
1552
1553 memcpy(client->communication.outbound.buffer.data,
1554 client->communication.outbound.buffer.data +
1555 client->communication.outbound.buffer.size - to_send_count,
1556 to_send_count);
1557 ret = lttng_dynamic_buffer_set_size(
1558 &client->communication.outbound.buffer,
1559 to_send_count);
1560 if (ret) {
1561 goto error;
1562 }
1563
1564 /*
1565 * We want to be notified whenever there is buffer space
1566 * available to send the rest of the payload.
1567 */
1568 ret = lttng_poll_mod(&state->events, client->socket,
1569 CLIENT_POLL_MASK_IN_OUT);
1570 if (ret) {
1571 goto error;
1572 }
1573 } else if (ret < 0) {
1574 /* Generic error, disconnect the client. */
1575 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
1576 client->socket);
1577 ret = handle_notification_thread_client_disconnect(
1578 client->socket, state);
1579 if (ret) {
1580 goto error;
1581 }
1582 } else {
1583 /* No error and flushed the queue completely. */
1584 ret = lttng_dynamic_buffer_set_size(
1585 &client->communication.outbound.buffer, 0);
1586 if (ret) {
1587 goto error;
1588 }
1589 ret = lttng_poll_mod(&state->events, client->socket,
1590 CLIENT_POLL_MASK_IN);
1591 if (ret) {
1592 goto error;
1593 }
1594
1595 client->communication.outbound.queued_command_reply = false;
1596 client->communication.outbound.dropped_notification = false;
1597 }
1598
1599 return 0;
1600error:
1601 return -1;
1602}
1603
1604static
1605int client_send_command_reply(struct notification_client *client,
1606 struct notification_thread_state *state,
1607 enum lttng_notification_channel_status status)
1608{
1609 int ret;
1610 struct lttng_notification_channel_command_reply reply = {
1611 .status = (int8_t) status,
1612 };
1613 struct lttng_notification_channel_message msg = {
1614 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
1615 .size = sizeof(reply),
1616 };
1617 char buffer[sizeof(msg) + sizeof(reply)];
1618
1619 if (client->communication.outbound.queued_command_reply) {
1620 /* Protocol error. */
1621 goto error;
1622 }
1623
1624 memcpy(buffer, &msg, sizeof(msg));
1625 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
1626 DBG("[notification-thread] Send command reply (%i)", (int) status);
1627
1628 /* Enqueue buffer to outgoing queue and flush it. */
1629 ret = lttng_dynamic_buffer_append(
1630 &client->communication.outbound.buffer,
1631 buffer, sizeof(buffer));
1632 if (ret) {
1633 goto error;
1634 }
1635
1636 ret = client_flush_outgoing_queue(client, state);
1637 if (ret) {
1638 goto error;
1639 }
1640
1641 if (client->communication.outbound.buffer.size != 0) {
1642 /* Queue could not be emptied. */
1643 client->communication.outbound.queued_command_reply = true;
1644 }
1645
1646 return 0;
1647error:
1648 return -1;
1649}
1650
1651static
1652int client_dispatch_message(struct notification_client *client,
1653 struct notification_thread_state *state)
1654{
1655 int ret = 0;
1656
1657 if (client->communication.inbound.msg_type !=
1658 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
1659 client->communication.inbound.msg_type !=
1660 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
1661 !client->validated) {
1662 WARN("[notification-thread] client attempted a command before handshake");
1663 ret = -1;
1664 goto end;
1665 }
1666
1667 switch (client->communication.inbound.msg_type) {
1668 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
1669 {
1670 /*
1671 * Receiving message header. The function will be called again
1672 * once the rest of the message as been received and can be
1673 * interpreted.
1674 */
1675 const struct lttng_notification_channel_message *msg;
1676
1677 assert(sizeof(*msg) ==
1678 client->communication.inbound.buffer.size);
1679 msg = (const struct lttng_notification_channel_message *)
1680 client->communication.inbound.buffer.data;
1681
1682 if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
1683 ERR("[notification-thread] Invalid notification channel message: length = %u", msg->size);
1684 ret = -1;
1685 goto end;
1686 }
1687
1688 switch (msg->type) {
1689 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
1690 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
1691 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
1692 break;
1693 default:
1694 ret = -1;
1695 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
1696 goto end;
1697 }
1698
1699 client->communication.inbound.bytes_to_receive = msg->size;
1700 client->communication.inbound.msg_type =
1701 (enum lttng_notification_channel_message_type) msg->type;
ab0ee2ca 1702 ret = lttng_dynamic_buffer_set_size(
14fa22f8 1703 &client->communication.inbound.buffer, msg->size);
ab0ee2ca
JG
1704 if (ret) {
1705 goto end;
1706 }
1707 break;
1708 }
1709 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
1710 {
1711 struct lttng_notification_channel_command_handshake *handshake_client;
1712 struct lttng_notification_channel_command_handshake handshake_reply = {
1713 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
1714 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
1715 };
1716 struct lttng_notification_channel_message msg_header = {
1717 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
1718 .size = sizeof(handshake_reply),
1719 };
1720 enum lttng_notification_channel_status status =
1721 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1722 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
1723
1724 memcpy(send_buffer, &msg_header, sizeof(msg_header));
1725 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
1726 sizeof(handshake_reply));
1727
1728 handshake_client =
1729 (struct lttng_notification_channel_command_handshake *)
1730 client->communication.inbound.buffer.data;
47a32869 1731 client->major = handshake_client->major;
ab0ee2ca
JG
1732 client->minor = handshake_client->minor;
1733 if (!client->communication.inbound.creds_received) {
1734 ERR("[notification-thread] No credentials received from client");
1735 ret = -1;
1736 goto end;
1737 }
1738
1739 client->uid = LTTNG_SOCK_GET_UID_CRED(
1740 &client->communication.inbound.creds);
1741 client->gid = LTTNG_SOCK_GET_GID_CRED(
1742 &client->communication.inbound.creds);
1743 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
1744 client->uid, client->gid, (int) client->major,
1745 (int) client->minor);
1746
1747 if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
1748 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
1749 }
1750
1751 ret = lttng_dynamic_buffer_append(&client->communication.outbound.buffer,
1752 send_buffer, sizeof(send_buffer));
1753 if (ret) {
1754 ERR("[notification-thread] Failed to send protocol version to notification channel client");
1755 goto end;
1756 }
1757
1758 ret = client_flush_outgoing_queue(client, state);
1759 if (ret) {
1760 goto end;
1761 }
1762
1763 ret = client_send_command_reply(client, state, status);
1764 if (ret) {
1765 ERR("[notification-thread] Failed to send reply to notification channel client");
1766 goto end;
1767 }
1768
1769 /* Set reception state to receive the next message header. */
14fa22f8
JG
1770 ret = client_reset_inbound_state(client);
1771 if (ret) {
1772 ERR("[notification-thread] Failed to reset client communication's inbound state");
1773 goto end;
1774 }
ab0ee2ca
JG
1775 client->validated = true;
1776 break;
1777 }
1778 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
1779 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
1780 {
1781 struct lttng_condition *condition;
1782 enum lttng_notification_channel_status status =
1783 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1784 const struct lttng_buffer_view condition_view =
1785 lttng_buffer_view_from_dynamic_buffer(
1786 &client->communication.inbound.buffer,
1787 0, -1);
1788 size_t expected_condition_size =
1789 client->communication.inbound.buffer.size;
1790
1791 ret = lttng_condition_create_from_buffer(&condition_view,
1792 &condition);
1793 if (ret != expected_condition_size) {
1794 ERR("[notification-thread] Malformed condition received from client");
1795 goto end;
1796 }
1797
1798 if (client->communication.inbound.msg_type ==
1799 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
1800 /*
1801 * FIXME The current state should be evaluated on
1802 * subscription.
1803 */
1804 ret = notification_thread_client_subscribe(client,
1805 condition, state, &status);
1806 } else {
1807 ret = notification_thread_client_unsubscribe(client,
1808 condition, state, &status);
1809 }
1810 if (ret) {
1811 goto end;
1812 }
1813
1814 ret = client_send_command_reply(client, state, status);
1815 if (ret) {
1816 ERR("[notification-thread] Failed to send reply to notification channel client");
1817 goto end;
1818 }
1819
1820 /* Set reception state to receive the next message header. */
14fa22f8
JG
1821 ret = client_reset_inbound_state(client);
1822 if (ret) {
1823 ERR("[notification-thread] Failed to reset client communication's inbound state");
1824 goto end;
1825 }
ab0ee2ca
JG
1826 break;
1827 }
1828 default:
1829 abort();
1830 }
1831end:
1832 return ret;
1833}
1834
1835/* Incoming data from client. */
1836int handle_notification_thread_client_in(
1837 struct notification_thread_state *state, int socket)
1838{
14fa22f8 1839 int ret = 0;
ab0ee2ca
JG
1840 struct notification_client *client;
1841 ssize_t recv_ret;
1842 size_t offset;
1843
1844 client = get_client_from_socket(socket, state);
1845 if (!client) {
1846 /* Internal error, abort. */
1847 ret = -1;
1848 goto end;
1849 }
1850
14fa22f8
JG
1851 offset = client->communication.inbound.buffer.size -
1852 client->communication.inbound.bytes_to_receive;
01ea340e 1853 if (client->communication.inbound.expect_creds) {
ab0ee2ca
JG
1854 recv_ret = lttcomm_recv_creds_unix_sock(socket,
1855 client->communication.inbound.buffer.data + offset,
1856 client->communication.inbound.bytes_to_receive,
1857 &client->communication.inbound.creds);
1858 if (recv_ret > 0) {
01ea340e 1859 client->communication.inbound.expect_creds = false;
ab0ee2ca
JG
1860 client->communication.inbound.creds_received = true;
1861 }
1862 } else {
1863 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
1864 client->communication.inbound.buffer.data + offset,
1865 client->communication.inbound.bytes_to_receive);
1866 }
1867 if (recv_ret < 0) {
1868 goto error_disconnect_client;
1869 }
1870
1871 client->communication.inbound.bytes_to_receive -= recv_ret;
ab0ee2ca
JG
1872 if (client->communication.inbound.bytes_to_receive == 0) {
1873 ret = client_dispatch_message(client, state);
1874 if (ret) {
1875 /*
1876 * Only returns an error if this client must be
1877 * disconnected.
1878 */
1879 goto error_disconnect_client;
1880 }
1881 } else {
1882 goto end;
1883 }
1884end:
1885 return ret;
1886error_disconnect_client:
1887 ret = handle_notification_thread_client_disconnect(socket, state);
1888 return ret;
1889}
1890
/*
 * Client ready to receive outgoing data: flush its outgoing queue.
 *
 * Returns -1 if no client matches the socket, otherwise the result of the
 * flush.
 */
int handle_notification_thread_client_out(
		struct notification_thread_state *state, int socket)
{
	struct notification_client *client =
			get_client_from_socket(socket, state);

	if (!client) {
		/* Internal error, abort. */
		return -1;
	}

	return client_flush_outgoing_queue(client, state);
}
1912
1913static
1914bool evaluate_buffer_usage_condition(struct lttng_condition *condition,
1915 struct channel_state_sample *sample, uint64_t buffer_capacity)
1916{
1917 bool result = false;
1918 uint64_t threshold;
1919 enum lttng_condition_type condition_type;
1920 struct lttng_condition_buffer_usage *use_condition = container_of(
1921 condition, struct lttng_condition_buffer_usage,
1922 parent);
1923
1924 if (!sample) {
1925 goto end;
1926 }
1927
1928 if (use_condition->threshold_bytes.set) {
1929 threshold = use_condition->threshold_bytes.value;
1930 } else {
1931 /*
1932 * Threshold was expressed as a ratio.
1933 *
1934 * TODO the threshold (in bytes) of conditions expressed
1935 * as a ratio of total buffer size could be cached to
1936 * forego this double-multiplication or it could be performed
1937 * as fixed-point math.
1938 *
1939 * Note that caching should accomodate the case where the
1940 * condition applies to multiple channels (i.e. don't assume
1941 * that all channels matching my_chann* have the same size...)
1942 */
1943 threshold = (uint64_t) (use_condition->threshold_ratio.value *
1944 (double) buffer_capacity);
1945 }
1946
1947 condition_type = lttng_condition_get_type(condition);
1948 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
1949 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
1950 threshold, sample->highest_usage);
1951
1952 /*
1953 * The low condition should only be triggered once _all_ of the
1954 * streams in a channel have gone below the "low" threshold.
1955 */
1956 if (sample->highest_usage <= threshold) {
1957 result = true;
1958 }
1959 } else {
1960 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
1961 threshold, sample->highest_usage);
1962
1963 /*
1964 * For high buffer usage scenarios, we want to trigger whenever
1965 * _any_ of the streams has reached the "high" threshold.
1966 */
1967 if (sample->highest_usage >= threshold) {
1968 result = true;
1969 }
1970 }
1971end:
1972 return result;
1973}
1974
1975static
1976int evaluate_condition(struct lttng_condition *condition,
1977 struct lttng_evaluation **evaluation,
1978 struct notification_thread_state *state,
1979 struct channel_state_sample *previous_sample,
1980 struct channel_state_sample *latest_sample,
1981 uint64_t buffer_capacity)
1982{
1983 int ret = 0;
1984 enum lttng_condition_type condition_type;
1985 bool previous_sample_result;
1986 bool latest_sample_result;
1987
1988 condition_type = lttng_condition_get_type(condition);
1989 /* No other condition type supported for the moment. */
1990 assert(condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW ||
1991 condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH);
1992
1993 previous_sample_result = evaluate_buffer_usage_condition(condition,
1994 previous_sample, buffer_capacity);
1995 latest_sample_result = evaluate_buffer_usage_condition(condition,
1996 latest_sample, buffer_capacity);
1997
1998 if (!latest_sample_result ||
1999 (previous_sample_result == latest_sample_result)) {
2000 /*
2001 * Only trigger on a condition evaluation transition.
2002 *
2003 * NOTE: This edge-triggered logic may not be appropriate for
2004 * future condition types.
2005 */
2006 goto end;
2007 }
2008
2009 if (evaluation && latest_sample_result) {
2010 *evaluation = lttng_evaluation_buffer_usage_create(
2011 condition_type,
2012 latest_sample->highest_usage,
2013 buffer_capacity);
2014 if (!*evaluation) {
2015 ret = -1;
2016 goto end;
2017 }
2018 }
2019end:
2020 return ret;
2021}
2022
2023static
2024int client_enqueue_dropped_notification(struct notification_client *client,
2025 struct notification_thread_state *state)
2026{
2027 int ret;
2028 struct lttng_notification_channel_message msg = {
2029 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
2030 .size = 0,
2031 };
2032
2033 ret = lttng_dynamic_buffer_append(
2034 &client->communication.outbound.buffer, &msg,
2035 sizeof(msg));
2036 return ret;
2037}
2038
2039static
2040int send_evaluation_to_clients(struct lttng_trigger *trigger,
2041 struct lttng_evaluation *evaluation,
2042 struct notification_client_list* client_list,
2043 struct notification_thread_state *state,
2044 uid_t channel_uid, gid_t channel_gid)
2045{
2046 int ret = 0;
2047 struct lttng_dynamic_buffer msg_buffer;
2048 struct notification_client_list_element *client_list_element, *tmp;
2049 struct lttng_notification *notification;
2050 struct lttng_condition *condition;
2051 ssize_t expected_notification_size, notification_size;
2052 struct lttng_notification_channel_message msg;
2053
2054 lttng_dynamic_buffer_init(&msg_buffer);
2055
2056 condition = lttng_trigger_get_condition(trigger);
2057 assert(condition);
2058
2059 notification = lttng_notification_create(condition, evaluation);
2060 if (!notification) {
2061 ret = -1;
2062 goto end;
2063 }
2064
2065 expected_notification_size = lttng_notification_serialize(notification,
2066 NULL);
2067 if (expected_notification_size < 0) {
2068 ERR("[notification-thread] Failed to get size of serialized notification");
2069 ret = -1;
2070 goto end;
2071 }
2072
2073 msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION;
2074 msg.size = (uint32_t) expected_notification_size;
2075 ret = lttng_dynamic_buffer_append(&msg_buffer, &msg, sizeof(msg));
2076 if (ret) {
2077 goto end;
2078 }
2079
2080 ret = lttng_dynamic_buffer_set_size(&msg_buffer,
2081 msg_buffer.size + expected_notification_size);
2082 if (ret) {
2083 goto end;
2084 }
2085
2086 notification_size = lttng_notification_serialize(notification,
2087 msg_buffer.data + sizeof(msg));
2088 if (notification_size != expected_notification_size) {
2089 ERR("[notification-thread] Failed to serialize notification");
2090 ret = -1;
2091 goto end;
2092 }
2093
2094 cds_list_for_each_entry_safe(client_list_element, tmp,
2095 &client_list->list, node) {
2096 struct notification_client *client =
2097 client_list_element->client;
2098
2099 if (client->uid != channel_uid && client->gid != channel_gid &&
2100 client->uid != 0) {
2101 /* Client is not allowed to monitor this channel. */
2102 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this channel");
2103 continue;
2104 }
2105
2106 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
2107 client->socket, msg_buffer.size);
2108 if (client->communication.outbound.buffer.size) {
2109 /*
2110 * Outgoing data is already buffered for this client;
2111 * drop the notification and enqueue a "dropped
2112 * notification" message if this is the first dropped
2113 * notification since the socket spilled-over to the
2114 * queue.
2115 */
2116 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
2117 client->socket);
2118 if (!client->communication.outbound.dropped_notification) {
2119 client->communication.outbound.dropped_notification = true;
2120 ret = client_enqueue_dropped_notification(
2121 client, state);
2122 if (ret) {
2123 goto end;
2124 }
2125 }
2126 continue;
2127 }
2128
2129 ret = lttng_dynamic_buffer_append_buffer(
2130 &client->communication.outbound.buffer,
2131 &msg_buffer);
2132 if (ret) {
2133 goto end;
2134 }
2135
2136 ret = client_flush_outgoing_queue(client, state);
2137 if (ret) {
2138 goto end;
2139 }
2140 }
2141 ret = 0;
2142end:
2143 lttng_notification_destroy(notification);
2144 lttng_dynamic_buffer_reset(&msg_buffer);
2145 return ret;
2146}
2147
2148int handle_notification_thread_channel_sample(
2149 struct notification_thread_state *state, int pipe,
2150 enum lttng_domain_type domain)
2151{
2152 int ret = 0;
2153 struct lttcomm_consumer_channel_monitor_msg sample_msg;
2154 struct channel_state_sample previous_sample, latest_sample;
2155 struct channel_info *channel_info;
2156 struct cds_lfht_node *node;
2157 struct cds_lfht_iter iter;
2158 struct lttng_channel_trigger_list *trigger_list;
2159 struct lttng_trigger_list_element *trigger_list_element;
2160 bool previous_sample_available = false;
2161
2162 /*
2163 * The monitoring pipe only holds messages smaller than PIPE_BUF,
2164 * ensuring that read/write of sampling messages are atomic.
2165 */
7c2551ef 2166 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
ab0ee2ca
JG
2167 if (ret != sizeof(sample_msg)) {
2168 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
2169 pipe);
2170 ret = -1;
2171 goto end;
2172 }
2173
2174 ret = 0;
2175 latest_sample.key.key = sample_msg.key;
2176 latest_sample.key.domain = domain;
2177 latest_sample.highest_usage = sample_msg.highest;
2178 latest_sample.lowest_usage = sample_msg.lowest;
2179
2180 rcu_read_lock();
2181
2182 /* Retrieve the channel's informations */
2183 cds_lfht_lookup(state->channels_ht,
2184 hash_channel_key(&latest_sample.key),
2185 match_channel_info,
2186 &latest_sample.key,
2187 &iter);
2188 node = cds_lfht_iter_get_node(&iter);
2189 if (!node) {
2190 /*
2191 * Not an error since the consumer can push a sample to the pipe
2192 * and the rest of the session daemon could notify us of the
2193 * channel's destruction before we get a chance to process that
2194 * sample.
2195 */
2196 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
2197 latest_sample.key.key,
2198 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
2199 "user space");
2200 goto end_unlock;
2201 }
2202 channel_info = caa_container_of(node, struct channel_info,
2203 channels_ht_node);
2204 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64")",
2205 channel_info->channel_name,
2206 latest_sample.key.key,
2207 channel_info->session_name,
2208 latest_sample.highest_usage,
2209 latest_sample.lowest_usage);
2210
2211 /* Retrieve the channel's last sample, if it exists, and update it. */
2212 cds_lfht_lookup(state->channel_state_ht,
2213 hash_channel_key(&latest_sample.key),
2214 match_channel_state_sample,
2215 &latest_sample.key,
2216 &iter);
2217 node = cds_lfht_iter_get_node(&iter);
2218 if (node) {
2219 struct channel_state_sample *stored_sample;
2220
2221 /* Update the sample stored. */
2222 stored_sample = caa_container_of(node,
2223 struct channel_state_sample,
2224 channel_state_ht_node);
2225 memcpy(&previous_sample, stored_sample,
2226 sizeof(previous_sample));
2227 stored_sample->highest_usage = latest_sample.highest_usage;
2228 stored_sample->lowest_usage = latest_sample.lowest_usage;
2229 previous_sample_available = true;
2230 } else {
2231 /*
2232 * This is the channel's first sample, allocate space for and
2233 * store the new sample.
2234 */
2235 struct channel_state_sample *stored_sample;
2236
2237 stored_sample = zmalloc(sizeof(*stored_sample));
2238 if (!stored_sample) {
2239 ret = -1;
2240 goto end_unlock;
2241 }
2242
2243 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
2244 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
2245 cds_lfht_add(state->channel_state_ht,
2246 hash_channel_key(&stored_sample->key),
2247 &stored_sample->channel_state_ht_node);
2248 }
2249
2250 /* Find triggers associated with this channel. */
2251 cds_lfht_lookup(state->channel_triggers_ht,
2252 hash_channel_key(&latest_sample.key),
2253 match_channel_trigger_list,
2254 &latest_sample.key,
2255 &iter);
2256 node = cds_lfht_iter_get_node(&iter);
2257 if (!node) {
2258 goto end_unlock;
2259 }
2260
2261 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
2262 channel_triggers_ht_node);
2263 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
2264 node) {
2265 struct lttng_condition *condition;
2266 struct lttng_action *action;
2267 struct lttng_trigger *trigger;
2268 struct notification_client_list *client_list;
2269 struct lttng_evaluation *evaluation = NULL;
2270
2271 trigger = trigger_list_element->trigger;
2272 condition = lttng_trigger_get_condition(trigger);
2273 assert(condition);
2274 action = lttng_trigger_get_action(trigger);
2275
2276 /* Notify actions are the only type currently supported. */
2277 assert(lttng_action_get_type(action) ==
2278 LTTNG_ACTION_TYPE_NOTIFY);
2279
2280 /*
2281 * Check if any client is subscribed to the result of this
2282 * evaluation.
2283 */
2284 cds_lfht_lookup(state->notification_trigger_clients_ht,
2285 lttng_condition_hash(condition),
2286 match_client_list,
2287 trigger,
2288 &iter);
2289 node = cds_lfht_iter_get_node(&iter);
2290 assert(node);
2291
2292 client_list = caa_container_of(node,
2293 struct notification_client_list,
2294 notification_trigger_ht_node);
2295 if (cds_list_empty(&client_list->list)) {
2296 /*
2297 * No clients interested in the evaluation's result,
2298 * skip it.
2299 */
2300 continue;
2301 }
2302
2303 ret = evaluate_condition(condition, &evaluation, state,
2304 previous_sample_available ? &previous_sample : NULL,
2305 &latest_sample, channel_info->capacity);
2306 if (ret) {
2307 goto end_unlock;
2308 }
2309
2310 if (!evaluation) {
2311 continue;
2312 }
2313
2314 /* Dispatch evaluation result to all clients. */
2315 ret = send_evaluation_to_clients(trigger_list_element->trigger,
2316 evaluation, client_list, state,
2317 channel_info->uid, channel_info->gid);
2318 if (ret) {
2319 goto end_unlock;
2320 }
2321 }
2322end_unlock:
2323 rcu_read_unlock();
2324end:
2325 return ret;
2326}
This page took 0.112939 seconds and 4 git commands to generate.