Add the sessiond notification-handling subsystem
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
CommitLineData
ab0ee2ca
JG
1/*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _LGPL_SOURCE
19#include <urcu.h>
20#include <urcu/rculfhash.h>
21
22#include "notification-thread.h"
23#include "notification-thread-events.h"
24#include "notification-thread-commands.h"
25#include <common/defaults.h>
26#include <common/error.h>
27#include <common/futex.h>
28#include <common/unix.h>
29#include <common/dynamic-buffer.h>
30#include <common/hashtable/utils.h>
31#include <common/sessiond-comm/sessiond-comm.h>
32#include <common/macros.h>
33#include <lttng/condition/condition.h>
34#include <lttng/action/action.h>
35#include <lttng/notification/notification-internal.h>
36#include <lttng/condition/condition-internal.h>
37#include <lttng/condition/buffer-usage-internal.h>
38#include <lttng/notification/channel-internal.h>
39#include <time.h>
40#include <unistd.h>
41#include <assert.h>
42#include <inttypes.h>
43
/*
 * Poll event masks for clients: the "in" mask covers input, error and
 * hang-up events; the "in/out" mask additionally covers output readiness.
 */
#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
46
/*
 * Entry of a list of triggers (see struct lttng_channel_trigger_list).
 *
 * The element references the trigger without owning it: in this file,
 * elements are freed without destroying the trigger they point to.
 */
struct lttng_trigger_list_element {
	struct lttng_trigger *trigger;
	/* Linkage in the list of triggers. */
	struct cds_list_head node;
};
51
/*
 * List of the triggers that apply to a given channel. Instances are stored
 * in the channel_triggers_ht, hashed and matched by channel key.
 */
struct lttng_channel_trigger_list {
	struct channel_key channel_key;
	/* List of struct lttng_trigger_list_element. */
	struct cds_list_head list;
	/* Node in the channel_triggers_ht. */
	struct cds_lfht_node channel_triggers_ht_node;
};
57
/*
 * Wrapper used to store a trigger in the triggers_ht, hashed and matched by
 * the trigger's condition. Once inserted, the hash table owns both the
 * wrapper and the trigger.
 */
struct lttng_trigger_ht_element {
	struct lttng_trigger *trigger;
	/* Node in the triggers_ht. */
	struct cds_lfht_node node;
};
62
/*
 * Entry of a client's list of subscribed conditions
 * (struct notification_client::condition_list).
 */
struct lttng_condition_list_element {
	struct lttng_condition *condition;
	/* Linkage in the client's condition list. */
	struct cds_list_head node;
};
67
/*
 * Entry of a list of clients (struct notification_client_list::list). The
 * client is referenced, not owned: elements are freed without destroying
 * the client.
 */
struct notification_client_list_element {
	struct notification_client *client;
	/* Linkage in the list of clients. */
	struct cds_list_head node;
};
72
/*
 * List of the clients subscribed to the condition of a given trigger.
 * Instances are stored in the notification_trigger_clients_ht, hashed and
 * matched by the trigger's condition.
 */
struct notification_client_list {
	struct lttng_trigger *trigger;
	/* List of struct notification_client_list_element. */
	struct cds_list_head list;
	/* Node in the notification_trigger_clients_ht. */
	struct cds_lfht_node notification_trigger_ht_node;
};
78
/*
 * State of a client connected to the notification channel. Clients are
 * stored in the client_socket_ht, hashed and matched by socket.
 */
struct notification_client {
	int socket;
	/* Client protocol version. */
	uint8_t major, minor;
	uid_t uid;
	gid_t gid;
	/*
	 * Indicates whether the credentials and version of the client have
	 * been checked.
	 */
	bool validated;
	/*
	 * Conditions to which the client's notification channel is subscribed.
	 * List of struct lttng_condition_list_element. The condition member is
	 * owned by the client.
	 */
	struct cds_list_head condition_list;
	/* Node in the client_socket_ht. */
	struct cds_lfht_node client_socket_ht_node;
	struct {
		struct {
			struct lttng_dynamic_buffer buffer;
			/* Bytes left to receive for the current message. */
			size_t bytes_to_receive;
			/* Type of the message being received. */
			enum lttng_notification_channel_message_type msg_type;
			/*
			 * Indicates whether or not credentials are expected
			 * from the client.
			 */
			bool receive_creds;
			/*
			 * Indicates whether or not credentials were received
			 * from the client.
			 */
			bool creds_received;
			lttng_sock_cred creds;
		} inbound;
		struct {
			/*
			 * Indicates whether or not a notification addressed to
			 * this client was dropped because a command reply was
			 * already buffered.
			 *
			 * A notification is dropped whenever the buffer is not
			 * empty.
			 */
			bool dropped_notification;
			/*
			 * Indicates whether or not a command reply is already
			 * buffered. In this case, it means that the client is
			 * not consuming command replies before emitting a new
			 * one. This could be caused by a protocol error or a
			 * misbehaving/malicious client.
			 */
			bool queued_command_reply;
			struct lttng_dynamic_buffer buffer;
		} outbound;
	} communication;
};
138
/*
 * Last sampled state of a channel. Instances are stored in the
 * channel_state_ht, hashed and matched by channel key.
 */
struct channel_state_sample {
	struct channel_key key;
	/* Node in the channel_state_ht. */
	struct cds_lfht_node channel_state_ht_node;
	uint64_t highest_usage;
	uint64_t lowest_usage;
};
145
146static
147int match_client(struct cds_lfht_node *node, const void *key)
148{
149 /* This double-cast is intended to supress pointer-to-cast warning. */
150 int socket = (int) (intptr_t) key;
151 struct notification_client *client;
152
153 client = caa_container_of(node, struct notification_client,
154 client_socket_ht_node);
155
156 return !!(client->socket == socket);
157}
158
159static
160int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
161{
162 struct channel_key *channel_key = (struct channel_key *) key;
163 struct lttng_channel_trigger_list *trigger_list;
164
165 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
166 channel_triggers_ht_node);
167
168 return !!((channel_key->key == trigger_list->channel_key.key) &&
169 (channel_key->domain == trigger_list->channel_key.domain));
170}
171
172static
173int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
174{
175 struct channel_key *channel_key = (struct channel_key *) key;
176 struct channel_state_sample *sample;
177
178 sample = caa_container_of(node, struct channel_state_sample,
179 channel_state_ht_node);
180
181 return !!((channel_key->key == sample->key.key) &&
182 (channel_key->domain == sample->key.domain));
183}
184
185static
186int match_channel_info(struct cds_lfht_node *node, const void *key)
187{
188 struct channel_key *channel_key = (struct channel_key *) key;
189 struct channel_info *channel_info;
190
191 channel_info = caa_container_of(node, struct channel_info,
192 channels_ht_node);
193
194 return !!((channel_key->key == channel_info->key.key) &&
195 (channel_key->domain == channel_info->key.domain));
196}
197
198static
199int match_condition(struct cds_lfht_node *node, const void *key)
200{
201 struct lttng_condition *condition_key = (struct lttng_condition *) key;
202 struct lttng_trigger_ht_element *trigger;
203 struct lttng_condition *condition;
204
205 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
206 node);
207 condition = lttng_trigger_get_condition(trigger->trigger);
208 assert(condition);
209
210 return !!lttng_condition_is_equal(condition_key, condition);
211}
212
213static
214int match_client_list(struct cds_lfht_node *node, const void *key)
215{
216 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
217 struct notification_client_list *client_list;
218 struct lttng_condition *condition;
219 struct lttng_condition *condition_key = lttng_trigger_get_condition(
220 trigger_key);
221
222 assert(condition_key);
223
224 client_list = caa_container_of(node, struct notification_client_list,
225 notification_trigger_ht_node);
226 condition = lttng_trigger_get_condition(client_list->trigger);
227
228 return !!lttng_condition_is_equal(condition_key, condition);
229}
230
231static
232int match_client_list_condition(struct cds_lfht_node *node, const void *key)
233{
234 struct lttng_condition *condition_key = (struct lttng_condition *) key;
235 struct notification_client_list *client_list;
236 struct lttng_condition *condition;
237
238 assert(condition_key);
239
240 client_list = caa_container_of(node, struct notification_client_list,
241 notification_trigger_ht_node);
242 condition = lttng_trigger_get_condition(client_list->trigger);
243
244 return !!lttng_condition_is_equal(condition_key, condition);
245}
246
247static
248unsigned long lttng_condition_buffer_usage_hash(
249 struct lttng_condition *_condition)
250{
251 unsigned long hash = 0;
252 struct lttng_condition_buffer_usage *condition;
253
254 condition = container_of(_condition,
255 struct lttng_condition_buffer_usage, parent);
256
257 if (condition->session_name) {
258 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
259 }
260 if (condition->channel_name) {
261 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
262 }
263 if (condition->domain.set) {
264 hash ^= hash_key_ulong(
265 (void *) condition->domain.type,
266 lttng_ht_seed);
267 }
268 if (condition->threshold_ratio.set) {
269 uint64_t val;
270
271 val = condition->threshold_ratio.value * (double) UINT32_MAX;
272 hash ^= hash_key_u64(&val, lttng_ht_seed);
273 } else if (condition->threshold_ratio.set) {
274 uint64_t val;
275
276 val = condition->threshold_bytes.value;
277 hash ^= hash_key_u64(&val, lttng_ht_seed);
278 }
279 return hash;
280}
281
282/*
283 * The lttng_condition hashing code is kept in this file (rather than
284 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
285 * don't want to link in liblttng-ctl.
286 */
287static
288unsigned long lttng_condition_hash(struct lttng_condition *condition)
289{
290 switch (condition->type) {
291 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
292 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
293 return lttng_condition_buffer_usage_hash(condition);
294 default:
295 ERR("[notification-thread] Unexpected condition type caught");
296 abort();
297 }
298}
299
300static
301void channel_info_destroy(struct channel_info *channel_info)
302{
303 if (!channel_info) {
304 return;
305 }
306
307 if (channel_info->session_name) {
308 free(channel_info->session_name);
309 }
310 if (channel_info->channel_name) {
311 free(channel_info->channel_name);
312 }
313 free(channel_info);
314}
315
316static
317struct channel_info *channel_info_copy(struct channel_info *channel_info)
318{
319 struct channel_info *copy = zmalloc(sizeof(*channel_info));
320
321 assert(channel_info);
322 assert(channel_info->session_name);
323 assert(channel_info->channel_name);
324
325 if (!copy) {
326 goto end;
327 }
328
329 memcpy(copy, channel_info, sizeof(*channel_info));
330 copy->session_name = NULL;
331 copy->channel_name = NULL;
332
333 copy->session_name = strdup(channel_info->session_name);
334 if (!copy->session_name) {
335 goto error;
336 }
337 copy->channel_name = strdup(channel_info->channel_name);
338 if (!copy->channel_name) {
339 goto error;
340 }
341 cds_lfht_node_init(&channel_info->channels_ht_node);
342end:
343 return copy;
344error:
345 channel_info_destroy(copy);
346 return NULL;
347}
348
349static
350int notification_thread_client_subscribe(struct notification_client *client,
351 struct lttng_condition *condition,
352 struct notification_thread_state *state,
353 enum lttng_notification_channel_status *_status)
354{
355 int ret = 0;
356 struct cds_lfht_iter iter;
357 struct cds_lfht_node *node;
358 struct notification_client_list *client_list;
359 struct lttng_condition_list_element *condition_list_element = NULL;
360 struct notification_client_list_element *client_list_element = NULL;
361 enum lttng_notification_channel_status status =
362 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
363
364 /*
365 * Ensure that the client has not already subscribed to this condition
366 * before.
367 */
368 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
369 if (lttng_condition_is_equal(condition_list_element->condition,
370 condition)) {
371 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
372 goto end;
373 }
374 }
375
376 condition_list_element = zmalloc(sizeof(*condition_list_element));
377 if (!condition_list_element) {
378 ret = -1;
379 goto error;
380 }
381 client_list_element = zmalloc(sizeof(*client_list_element));
382 if (!client_list_element) {
383 ret = -1;
384 goto error;
385 }
386
387 rcu_read_lock();
388
389 /*
390 * Add the newly-subscribed condition to the client's subscription list.
391 */
392 CDS_INIT_LIST_HEAD(&condition_list_element->node);
393 condition_list_element->condition = condition;
394 cds_list_add(&condition_list_element->node, &client->condition_list);
395
396 /*
397 * Add the client to the list of clients interested in a given trigger
398 * if a "notification" trigger with a corresponding condition was
399 * added prior.
400 */
401 cds_lfht_lookup(state->notification_trigger_clients_ht,
402 lttng_condition_hash(condition),
403 match_client_list_condition,
404 condition,
405 &iter);
406 node = cds_lfht_iter_get_node(&iter);
407 if (!node) {
408 goto end_unlock;
409 }
410
411 client_list = caa_container_of(node, struct notification_client_list,
412 notification_trigger_ht_node);
413 client_list_element->client = client;
414 CDS_INIT_LIST_HEAD(&client_list_element->node);
415 cds_list_add(&client_list_element->node, &client_list->list);
416end_unlock:
417 rcu_read_unlock();
418end:
419 if (_status) {
420 *_status = status;
421 }
422 return ret;
423error:
424 free(condition_list_element);
425 free(client_list_element);
426 return ret;
427}
428
429static
430int notification_thread_client_unsubscribe(
431 struct notification_client *client,
432 struct lttng_condition *condition,
433 struct notification_thread_state *state,
434 enum lttng_notification_channel_status *_status)
435{
436 struct cds_lfht_iter iter;
437 struct cds_lfht_node *node;
438 struct notification_client_list *client_list;
439 struct lttng_condition_list_element *condition_list_element,
440 *condition_tmp;
441 struct notification_client_list_element *client_list_element,
442 *client_tmp;
443 bool condition_found = false;
444 enum lttng_notification_channel_status status =
445 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
446
447 /* Remove the condition from the client's condition list. */
448 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
449 &client->condition_list, node) {
450 if (!lttng_condition_is_equal(condition_list_element->condition,
451 condition)) {
452 continue;
453 }
454
455 cds_list_del(&condition_list_element->node);
456 /*
457 * The caller may be iterating on the client's conditions to
458 * tear down a client's connection. In this case, the condition
459 * will be destroyed at the end.
460 */
461 if (condition != condition_list_element->condition) {
462 lttng_condition_destroy(
463 condition_list_element->condition);
464 }
465 free(condition_list_element);
466 condition_found = true;
467 break;
468 }
469
470 if (!condition_found) {
471 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
472 goto end;
473 }
474
475 /*
476 * Remove the client from the list of clients interested the trigger
477 * matching the condition.
478 */
479 rcu_read_lock();
480 cds_lfht_lookup(state->notification_trigger_clients_ht,
481 lttng_condition_hash(condition),
482 match_client_list_condition,
483 condition,
484 &iter);
485 node = cds_lfht_iter_get_node(&iter);
486 if (!node) {
487 goto end_unlock;
488 }
489
490 client_list = caa_container_of(node, struct notification_client_list,
491 notification_trigger_ht_node);
492 cds_list_for_each_entry_safe(client_list_element, client_tmp,
493 &client_list->list, node) {
494 if (client_list_element->client->socket != client->socket) {
495 continue;
496 }
497 cds_list_del(&client_list_element->node);
498 free(client_list_element);
499 break;
500 }
501end_unlock:
502 rcu_read_unlock();
503end:
504 lttng_condition_destroy(condition);
505 if (_status) {
506 *_status = status;
507 }
508 return 0;
509}
510
511static
512void notification_client_destroy(struct notification_client *client,
513 struct notification_thread_state *state)
514{
515 struct lttng_condition_list_element *condition_list_element, *tmp;
516
517 if (!client) {
518 return;
519 }
520
521 /* Release all conditions to which the client was subscribed. */
522 cds_list_for_each_entry_safe(condition_list_element, tmp,
523 &client->condition_list, node) {
524 (void) notification_thread_client_unsubscribe(client,
525 condition_list_element->condition, state, NULL);
526 }
527
528 if (client->socket >= 0) {
529 (void) lttcomm_close_unix_sock(client->socket);
530 }
531 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
532 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
533 free(client);
534}
535
536/*
537 * Call with rcu_read_lock held (and hold for the lifetime of the returned
538 * client pointer).
539 */
540static
541struct notification_client *get_client_from_socket(int socket,
542 struct notification_thread_state *state)
543{
544 struct cds_lfht_iter iter;
545 struct cds_lfht_node *node;
546 struct notification_client *client = NULL;
547
548 cds_lfht_lookup(state->client_socket_ht,
549 hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed),
550 match_client,
551 (void *) (unsigned long) socket,
552 &iter);
553 node = cds_lfht_iter_get_node(&iter);
554 if (!node) {
555 goto end;
556 }
557
558 client = caa_container_of(node, struct notification_client,
559 client_socket_ht_node);
560end:
561 return client;
562}
563
564static
565bool trigger_applies_to_channel(struct lttng_trigger *trigger,
566 struct channel_info *info)
567{
568 enum lttng_condition_status status;
569 struct lttng_condition *condition;
570 const char *trigger_session_name = NULL;
571 const char *trigger_channel_name = NULL;
572 enum lttng_domain_type trigger_domain;
573
574 condition = lttng_trigger_get_condition(trigger);
575 if (!condition) {
576 goto fail;
577 }
578
579 switch (lttng_condition_get_type(condition)) {
580 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
581 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
582 break;
583 default:
584 goto fail;
585 }
586
587 status = lttng_condition_buffer_usage_get_domain_type(condition,
588 &trigger_domain);
589 assert(status == LTTNG_CONDITION_STATUS_OK);
590 if (info->key.domain != trigger_domain) {
591 goto fail;
592 }
593
594 status = lttng_condition_buffer_usage_get_session_name(
595 condition, &trigger_session_name);
596 assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_session_name);
597
598 status = lttng_condition_buffer_usage_get_channel_name(
599 condition, &trigger_channel_name);
600 assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_channel_name);
601
602 if (strcmp(info->session_name, trigger_session_name)) {
603 goto fail;
604 }
605 if (strcmp(info->channel_name, trigger_channel_name)) {
606 goto fail;
607 }
608
609 return true;
610fail:
611 return false;
612}
613
614static
615bool trigger_applies_to_client(struct lttng_trigger *trigger,
616 struct notification_client *client)
617{
618 bool applies = false;
619 struct lttng_condition_list_element *condition_list_element;
620
621 cds_list_for_each_entry(condition_list_element, &client->condition_list,
622 node) {
623 applies = lttng_condition_is_equal(
624 condition_list_element->condition,
625 lttng_trigger_get_condition(trigger));
626 if (applies) {
627 break;
628 }
629 }
630 return applies;
631}
632
633static
634unsigned long hash_channel_key(struct channel_key *key)
635{
636 return hash_key_u64(&key->key, lttng_ht_seed) ^ hash_key_ulong(
637 (void *) (unsigned long) key->domain, lttng_ht_seed);
638}
639
640static
641int handle_notification_thread_command_add_channel(
642 struct notification_thread_state *state,
643 struct channel_info *channel_info,
644 enum lttng_error_code *cmd_result)
645{
646 struct cds_list_head trigger_list;
647 struct channel_info *new_channel_info;
648 struct channel_key *channel_key;
649 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
650 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
651 int trigger_count = 0;
652 struct cds_lfht_iter iter;
653
654 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
655 channel_info->channel_name, channel_info->session_name,
656 channel_info->key.key, channel_info->key.domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
657
658 CDS_INIT_LIST_HEAD(&trigger_list);
659
660 new_channel_info = channel_info_copy(channel_info);
661 if (!new_channel_info) {
662 goto error;
663 }
664
665 channel_key = &new_channel_info->key;
666
667 /* Build a list of all triggers applying to the new channel. */
668 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
669 node) {
670 struct lttng_trigger_list_element *new_element;
671
672 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
673 channel_info)) {
674 continue;
675 }
676
677 new_element = zmalloc(sizeof(*new_element));
678 if (!new_element) {
679 goto error;
680 }
681 CDS_INIT_LIST_HEAD(&new_element->node);
682 new_element->trigger = trigger_ht_element->trigger;
683 cds_list_add(&new_element->node, &trigger_list);
684 trigger_count++;
685 }
686
687 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
688 trigger_count);
689 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
690 if (!channel_trigger_list) {
691 goto error;
692 }
693 channel_trigger_list->channel_key = *channel_key;
694 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
695 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
696 cds_list_splice(&trigger_list, &channel_trigger_list->list);
697
698 rcu_read_lock();
699 /* Add channel to the channel_ht which owns the channel_infos. */
700 cds_lfht_add(state->channels_ht,
701 hash_channel_key(channel_key),
702 &new_channel_info->channels_ht_node);
703 /*
704 * Add the list of triggers associated with this channel to the
705 * channel_triggers_ht.
706 */
707 cds_lfht_add(state->channel_triggers_ht,
708 hash_channel_key(channel_key),
709 &channel_trigger_list->channel_triggers_ht_node);
710 rcu_read_unlock();
711 *cmd_result = LTTNG_OK;
712 return 0;
713error:
714 /* Empty trigger list */
715 channel_info_destroy(new_channel_info);
716 return 1;
717}
718
719static
720int handle_notification_thread_command_remove_channel(
721 struct notification_thread_state *state,
722 uint64_t channel_key, enum lttng_domain_type domain,
723 enum lttng_error_code *cmd_result)
724{
725 struct cds_lfht_node *node;
726 struct cds_lfht_iter iter;
727 struct lttng_channel_trigger_list *trigger_list;
728 struct lttng_trigger_list_element *trigger_list_element, *tmp;
729 struct channel_key key = { .key = channel_key, .domain = domain };
730 struct channel_info *channel_info;
731
732 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
733 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
734
735 rcu_read_lock();
736
737 cds_lfht_lookup(state->channel_triggers_ht,
738 hash_channel_key(&key),
739 match_channel_trigger_list,
740 &key,
741 &iter);
742 node = cds_lfht_iter_get_node(&iter);
743 /*
744 * There is a severe internal error if we are being asked to remove a
745 * channel that doesn't exist.
746 */
747 if (!node) {
748 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
749 goto end;
750 }
751
752 /* Free the list of triggers associated with this channel. */
753 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
754 channel_triggers_ht_node);
755 cds_list_for_each_entry_safe(trigger_list_element, tmp,
756 &trigger_list->list, node) {
757 cds_list_del(&trigger_list_element->node);
758 free(trigger_list_element);
759 }
760 cds_lfht_del(state->channel_triggers_ht, node);
761 free(trigger_list);
762
763 /* Free sampled channel state. */
764 cds_lfht_lookup(state->channel_state_ht,
765 hash_channel_key(&key),
766 match_channel_state_sample,
767 &key,
768 &iter);
769 node = cds_lfht_iter_get_node(&iter);
770 /*
771 * This is expected to be NULL if the channel is destroyed before we
772 * received a sample.
773 */
774 if (node) {
775 struct channel_state_sample *sample = caa_container_of(node,
776 struct channel_state_sample,
777 channel_state_ht_node);
778
779 cds_lfht_del(state->channel_state_ht, node);
780 free(sample);
781 }
782
783 /* Remove the channel from the channels_ht and free it. */
784 cds_lfht_lookup(state->channels_ht,
785 hash_channel_key(&key),
786 match_channel_info,
787 &key,
788 &iter);
789 node = cds_lfht_iter_get_node(&iter);
790 assert(node);
791 channel_info = caa_container_of(node, struct channel_info,
792 channels_ht_node);
793 cds_lfht_del(state->channels_ht, node);
794 channel_info_destroy(channel_info);
795end:
796 rcu_read_unlock();
797 *cmd_result = LTTNG_OK;
798 return 0;
799}
800
801/*
802 * FIXME A client's credentials are not checked when registering a trigger, nor
803 * are they stored alongside with the trigger.
804 *
805 * The effects of this are benign:
806 * - The client will succeed in registering the trigger, as it is valid,
807 * - The trigger will, internally, be bound to the channel,
808 * - The notifications will not be sent since the client's credentials
809 * are checked against the channel at that moment.
810 */
811static
812int handle_notification_thread_command_register_trigger(
813 struct notification_thread_state *state,
814 struct lttng_trigger *trigger,
815 enum lttng_error_code *cmd_result)
816{
817 int ret = 0;
818 struct lttng_condition *condition;
819 struct notification_client *client;
820 struct notification_client_list *client_list = NULL;
821 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
822 struct notification_client_list_element *client_list_element, *tmp;
823 struct cds_lfht_node *node;
824 struct cds_lfht_iter iter;
825 struct channel_info *channel;
826 bool free_trigger = true;
827
828 rcu_read_lock();
829
830 condition = lttng_trigger_get_condition(trigger);
831 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
832 if (!trigger_ht_element) {
833 ret = -1;
834 goto error;
835 }
836
837 /* Add trigger to the trigger_ht. */
838 cds_lfht_node_init(&trigger_ht_element->node);
839 trigger_ht_element->trigger = trigger;
840
841 node = cds_lfht_add_unique(state->triggers_ht,
842 lttng_condition_hash(condition),
843 match_condition,
844 condition,
845 &trigger_ht_element->node);
846 if (node != &trigger_ht_element->node) {
847 /* Not a fatal error, simply report it to the client. */
848 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
849 goto error_free_ht_element;
850 }
851
852 /*
853 * Ownership of the trigger and of its wrapper was transfered to
854 * the triggers_ht.
855 */
856 trigger_ht_element = NULL;
857 free_trigger = false;
858
859 /*
860 * The rest only applies to triggers that have a "notify" action.
861 * It is not skipped as this is the only action type currently
862 * supported.
863 */
864 client_list = zmalloc(sizeof(*client_list));
865 if (!client_list) {
866 ret = -1;
867 goto error_free_ht_element;
868 }
869 cds_lfht_node_init(&client_list->notification_trigger_ht_node);
870 CDS_INIT_LIST_HEAD(&client_list->list);
871 client_list->trigger = trigger;
872
873 /* Build a list of clients to which this new trigger applies. */
874 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
875 client_socket_ht_node) {
876 if (!trigger_applies_to_client(trigger, client)) {
877 continue;
878 }
879
880 client_list_element = zmalloc(sizeof(*client_list_element));
881 if (!client_list_element) {
882 ret = -1;
883 goto error_free_client_list;
884 }
885 CDS_INIT_LIST_HEAD(&client_list_element->node);
886 client_list_element->client = client;
887 cds_list_add(&client_list_element->node, &client_list->list);
888 }
889
890 cds_lfht_add(state->notification_trigger_clients_ht,
891 lttng_condition_hash(condition),
892 &client_list->notification_trigger_ht_node);
893 /*
894 * Client list ownership transferred to the
895 * notification_trigger_clients_ht.
896 */
897 client_list = NULL;
898
899 /*
900 * Add the trigger to list of triggers bound to the channels currently
901 * known.
902 */
903 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
904 channels_ht_node) {
905 struct lttng_trigger_list_element *trigger_list_element;
906 struct lttng_channel_trigger_list *trigger_list;
907
908 if (!trigger_applies_to_channel(trigger, channel)) {
909 continue;
910 }
911
912 cds_lfht_lookup(state->channel_triggers_ht,
913 hash_channel_key(&channel->key),
914 match_channel_trigger_list,
915 &channel->key,
916 &iter);
917 node = cds_lfht_iter_get_node(&iter);
918 assert(node);
919 /* Free the list of triggers associated with this channel. */
920 trigger_list = caa_container_of(node,
921 struct lttng_channel_trigger_list,
922 channel_triggers_ht_node);
923
924 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
925 if (!trigger_list_element) {
926 ret = -1;
927 goto error_free_client_list;
928 }
929 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
930 trigger_list_element->trigger = trigger;
931 cds_list_add(&trigger_list_element->node, &trigger_list->list);
932 /* A trigger can only apply to one channel. */
933 break;
934 }
935
936 *cmd_result = LTTNG_OK;
937error_free_client_list:
938 if (client_list) {
939 cds_list_for_each_entry_safe(client_list_element, tmp,
940 &client_list->list, node) {
941 free(client_list_element);
942 }
943 free(client_list);
944 }
945error_free_ht_element:
946 free(trigger_ht_element);
947error:
948 if (free_trigger) {
949 struct lttng_action *action = lttng_trigger_get_action(trigger);
950
951 lttng_condition_destroy(condition);
952 lttng_action_destroy(action);
953 lttng_trigger_destroy(trigger);
954 }
955 rcu_read_unlock();
956 return ret;
957}
958
959int handle_notification_thread_command_unregister_trigger(
960 struct notification_thread_state *state,
961 struct lttng_trigger *trigger,
962 enum lttng_error_code *_cmd_reply)
963{
964 struct cds_lfht_iter iter;
965 struct cds_lfht_node *node, *triggers_ht_node;
966 struct lttng_channel_trigger_list *trigger_list;
967 struct notification_client_list *client_list;
968 struct notification_client_list_element *client_list_element, *tmp;
969 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
970 struct lttng_condition *condition = lttng_trigger_get_condition(
971 trigger);
972 struct lttng_action *action;
973 enum lttng_error_code cmd_reply;
974
975 rcu_read_lock();
976
977 cds_lfht_lookup(state->triggers_ht,
978 lttng_condition_hash(condition),
979 match_condition,
980 condition,
981 &iter);
982 triggers_ht_node = cds_lfht_iter_get_node(&iter);
983 if (!triggers_ht_node) {
984 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
985 goto end;
986 } else {
987 cmd_reply = LTTNG_OK;
988 }
989
990 /* Remove trigger from channel_triggers_ht. */
991 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
992 channel_triggers_ht_node) {
993 struct lttng_trigger_list_element *trigger_element, *tmp;
994
995 cds_list_for_each_entry_safe(trigger_element, tmp,
996 &trigger_list->list, node) {
997 struct lttng_condition *current_condition =
998 lttng_trigger_get_condition(
999 trigger_element->trigger);
1000
1001 assert(current_condition);
1002 if (!lttng_condition_is_equal(condition,
1003 current_condition)) {
1004 continue;
1005 }
1006
1007 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
1008 cds_list_del(&trigger_element->node);
1009 }
1010 }
1011
1012 /*
1013 * Remove and release the client list from
1014 * notification_trigger_clients_ht.
1015 */
1016 cds_lfht_lookup(state->notification_trigger_clients_ht,
1017 lttng_condition_hash(condition),
1018 match_client_list,
1019 trigger,
1020 &iter);
1021 node = cds_lfht_iter_get_node(&iter);
1022 assert(node);
1023 client_list = caa_container_of(node, struct notification_client_list,
1024 notification_trigger_ht_node);
1025 cds_list_for_each_entry_safe(client_list_element, tmp,
1026 &client_list->list, node) {
1027 free(client_list_element);
1028 }
1029 cds_lfht_del(state->notification_trigger_clients_ht, node);
1030 free(client_list);
1031
1032 /* Remove trigger from triggers_ht. */
1033 trigger_ht_element = caa_container_of(triggers_ht_node,
1034 struct lttng_trigger_ht_element, node);
1035 cds_lfht_del(state->triggers_ht, triggers_ht_node);
1036
1037 condition = lttng_trigger_get_condition(trigger_ht_element->trigger);
1038 lttng_condition_destroy(condition);
1039 action = lttng_trigger_get_action(trigger_ht_element->trigger);
1040 lttng_action_destroy(action);
1041 lttng_trigger_destroy(trigger_ht_element->trigger);
1042 free(trigger_ht_element);
1043end:
1044 rcu_read_unlock();
1045 if (_cmd_reply) {
1046 *_cmd_reply = cmd_reply;
1047 }
1048 return 0;
1049}
1050
1051/* Returns 0 on success, 1 on exit requested, negative value on error. */
1052int handle_notification_thread_command(
1053 struct notification_thread_handle *handle,
1054 struct notification_thread_state *state)
1055{
1056 int ret;
1057 uint64_t counter;
1058 struct notification_thread_command *cmd;
1059
1060 /* Read event_fd to put it back into a quiescent state. */
1061 ret = read(handle->cmd_queue.event_fd, &counter, sizeof(counter));
1062 if (ret == -1) {
1063 goto error;
1064 }
1065
1066 pthread_mutex_lock(&handle->cmd_queue.lock);
1067 cmd = cds_list_first_entry(&handle->cmd_queue.list,
1068 struct notification_thread_command, cmd_list_node);
1069 switch (cmd->type) {
1070 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
1071 DBG("[notification-thread] Received register trigger command");
1072 ret = handle_notification_thread_command_register_trigger(
1073 state, cmd->parameters.trigger,
1074 &cmd->reply_code);
1075 break;
1076 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
1077 DBG("[notification-thread] Received unregister trigger command");
1078 ret = handle_notification_thread_command_unregister_trigger(
1079 state, cmd->parameters.trigger,
1080 &cmd->reply_code);
1081 break;
1082 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
1083 DBG("[notification-thread] Received add channel command");
1084 ret = handle_notification_thread_command_add_channel(
1085 state, &cmd->parameters.add_channel,
1086 &cmd->reply_code);
1087 break;
1088 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
1089 DBG("[notification-thread] Received remove channel command");
1090 ret = handle_notification_thread_command_remove_channel(
1091 state, cmd->parameters.remove_channel.key,
1092 cmd->parameters.remove_channel.domain,
1093 &cmd->reply_code);
1094 break;
1095 case NOTIFICATION_COMMAND_TYPE_QUIT:
1096 DBG("[notification-thread] Received quit command");
1097 cmd->reply_code = LTTNG_OK;
1098 ret = 1;
1099 goto end;
1100 default:
1101 ERR("[notification-thread] Unknown internal command received");
1102 goto error_unlock;
1103 }
1104
1105 if (ret) {
1106 goto error_unlock;
1107 }
1108end:
1109 cds_list_del(&cmd->cmd_list_node);
1110 futex_nto1_wake(&cmd->reply_futex);
1111 pthread_mutex_unlock(&handle->cmd_queue.lock);
1112 return ret;
1113error_unlock:
1114 /* Wake-up and return a fatal error to the calling thread. */
1115 futex_nto1_wake(&cmd->reply_futex);
1116 pthread_mutex_unlock(&handle->cmd_queue.lock);
1117 cmd->reply_code = LTTNG_ERR_FATAL;
1118error:
1119 /* Indicate a fatal error to the caller. */
1120 return -1;
1121}
1122
1123static
1124unsigned long hash_client_socket(int socket)
1125{
1126 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
1127}
1128
/*
 * Add O_NONBLOCK to the file status flags of `socket`, preserving the flags
 * that are already set.
 *
 * Returns -1 if either fcntl() call fails, otherwise the value returned by
 * the F_SETFL call.
 */
static
int socket_set_non_blocking(int socket)
{
	int current_flags, ret;

	/* Fetch the current flags so that only O_NONBLOCK is added. */
	current_flags = fcntl(socket, F_GETFL, 0);
	if (current_flags == -1) {
		PERROR("fcntl get socket flags");
		return -1;
	}

	ret = fcntl(socket, F_SETFL, current_flags | O_NONBLOCK);
	if (ret == -1) {
		PERROR("fcntl set O_NONBLOCK socket flag");
		return ret;
	}

	DBG("Client socket (fd = %i) set as non-blocking", socket);
	return ret;
}
1151
1152static
1153void client_reset_inbound_state(struct notification_client *client)
1154{
1155 int ret;
1156
1157 ret = lttng_dynamic_buffer_set_size(
1158 &client->communication.inbound.buffer, 0);
1159 assert(!ret);
1160
1161 client->communication.inbound.bytes_to_receive =
1162 sizeof(struct lttng_notification_channel_message);
1163 client->communication.inbound.msg_type =
1164 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
1165 client->communication.inbound.receive_creds = false;
1166 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
1167 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
1168}
1169
1170int handle_notification_thread_client_connect(
1171 struct notification_thread_state *state)
1172{
1173 int ret;
1174 struct notification_client *client;
1175
1176 DBG("[notification-thread] Handling new notification channel client connection");
1177
1178 client = zmalloc(sizeof(*client));
1179 if (!client) {
1180 /* Fatal error. */
1181 ret = -1;
1182 goto error;
1183 }
1184 CDS_INIT_LIST_HEAD(&client->condition_list);
1185 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
1186 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
1187 client_reset_inbound_state(client);
1188
1189 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
1190 if (ret < 0) {
1191 ERR("[notification-thread] Failed to accept new notification channel client connection");
1192 ret = 0;
1193 goto error;
1194 }
1195
1196 client->socket = ret;
1197
1198 ret = socket_set_non_blocking(client->socket);
1199 if (ret) {
1200 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
1201 goto error;
1202 }
1203
1204 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
1205 if (ret < 0) {
1206 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
1207 ret = 0;
1208 goto error;
1209 }
1210
1211 ret = lttng_poll_add(&state->events, client->socket,
1212 LPOLLIN | LPOLLERR |
1213 LPOLLHUP | LPOLLRDHUP);
1214 if (ret < 0) {
1215 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
1216 ret = 0;
1217 goto error;
1218 }
1219 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
1220 client->socket);
1221
1222 /* Add to ht. */
1223 rcu_read_lock();
1224 cds_lfht_add(state->client_socket_ht,
1225 hash_client_socket(client->socket),
1226 &client->client_socket_ht_node);
1227 rcu_read_unlock();
1228
1229 return ret;
1230error:
1231 notification_client_destroy(client, state);
1232 return ret;
1233}
1234
1235int handle_notification_thread_client_disconnect(
1236 int client_socket,
1237 struct notification_thread_state *state)
1238{
1239 int ret = 0;
1240 struct notification_client *client;
1241
1242 rcu_read_lock();
1243 DBG("[notification-thread] Closing client connection (socket fd = %i)",
1244 client_socket);
1245 client = get_client_from_socket(client_socket, state);
1246 if (!client) {
1247 /* Internal state corruption, fatal error. */
1248 ERR("[notification-thread] Unable to find client (socket fd = %i)",
1249 client_socket);
1250 ret = -1;
1251 goto end;
1252 }
1253
1254 ret = lttng_poll_del(&state->events, client_socket);
1255 if (ret) {
1256 ERR("[notification-thread] Failed to remove client socket from poll set");
1257 }
1258 cds_lfht_del(state->client_socket_ht,
1259 &client->client_socket_ht_node);
1260 notification_client_destroy(client, state);
1261end:
1262 rcu_read_unlock();
1263 return ret;
1264}
1265
1266int handle_notification_thread_client_disconnect_all(
1267 struct notification_thread_state *state)
1268{
1269 struct cds_lfht_iter iter;
1270 struct notification_client *client;
1271 bool error_encoutered = false;
1272
1273 rcu_read_lock();
1274 DBG("[notification-thread] Closing all client connections");
1275 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1276 client_socket_ht_node) {
1277 int ret;
1278
1279 ret = handle_notification_thread_client_disconnect(
1280 client->socket, state);
1281 if (ret) {
1282 error_encoutered = true;
1283 }
1284 }
1285 rcu_read_unlock();
1286 return error_encoutered ? 1 : 0;
1287}
1288
1289int handle_notification_thread_trigger_unregister_all(
1290 struct notification_thread_state *state)
1291{
1292 bool error_occured = false;
1293 struct cds_lfht_iter iter;
1294 struct lttng_trigger_ht_element *trigger_ht_element;
1295
1296 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1297 node) {
1298 int ret = handle_notification_thread_command_unregister_trigger(
1299 state, trigger_ht_element->trigger, NULL);
1300 if (ret) {
1301 error_occured = true;
1302 }
1303 }
1304 return error_occured ? -1 : 0;
1305}
1306
1307static
1308int client_flush_outgoing_queue(struct notification_client *client,
1309 struct notification_thread_state *state)
1310{
1311 ssize_t ret;
1312 size_t to_send_count;
1313
1314 assert(client->communication.outbound.buffer.size != 0);
1315 to_send_count = client->communication.outbound.buffer.size;
1316 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
1317 client->socket);
1318
1319 ret = lttcomm_send_unix_sock_non_block(client->socket,
1320 client->communication.outbound.buffer.data,
1321 to_send_count);
1322 if ((ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) ||
1323 (ret > 0 && ret < to_send_count)) {
1324 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
1325 client->socket);
1326 to_send_count -= max(ret, 0);
1327
1328 memcpy(client->communication.outbound.buffer.data,
1329 client->communication.outbound.buffer.data +
1330 client->communication.outbound.buffer.size - to_send_count,
1331 to_send_count);
1332 ret = lttng_dynamic_buffer_set_size(
1333 &client->communication.outbound.buffer,
1334 to_send_count);
1335 if (ret) {
1336 goto error;
1337 }
1338
1339 /*
1340 * We want to be notified whenever there is buffer space
1341 * available to send the rest of the payload.
1342 */
1343 ret = lttng_poll_mod(&state->events, client->socket,
1344 CLIENT_POLL_MASK_IN_OUT);
1345 if (ret) {
1346 goto error;
1347 }
1348 } else if (ret < 0) {
1349 /* Generic error, disconnect the client. */
1350 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
1351 client->socket);
1352 ret = handle_notification_thread_client_disconnect(
1353 client->socket, state);
1354 if (ret) {
1355 goto error;
1356 }
1357 } else {
1358 /* No error and flushed the queue completely. */
1359 ret = lttng_dynamic_buffer_set_size(
1360 &client->communication.outbound.buffer, 0);
1361 if (ret) {
1362 goto error;
1363 }
1364 ret = lttng_poll_mod(&state->events, client->socket,
1365 CLIENT_POLL_MASK_IN);
1366 if (ret) {
1367 goto error;
1368 }
1369
1370 client->communication.outbound.queued_command_reply = false;
1371 client->communication.outbound.dropped_notification = false;
1372 }
1373
1374 return 0;
1375error:
1376 return -1;
1377}
1378
1379static
1380int client_send_command_reply(struct notification_client *client,
1381 struct notification_thread_state *state,
1382 enum lttng_notification_channel_status status)
1383{
1384 int ret;
1385 struct lttng_notification_channel_command_reply reply = {
1386 .status = (int8_t) status,
1387 };
1388 struct lttng_notification_channel_message msg = {
1389 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
1390 .size = sizeof(reply),
1391 };
1392 char buffer[sizeof(msg) + sizeof(reply)];
1393
1394 if (client->communication.outbound.queued_command_reply) {
1395 /* Protocol error. */
1396 goto error;
1397 }
1398
1399 memcpy(buffer, &msg, sizeof(msg));
1400 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
1401 DBG("[notification-thread] Send command reply (%i)", (int) status);
1402
1403 /* Enqueue buffer to outgoing queue and flush it. */
1404 ret = lttng_dynamic_buffer_append(
1405 &client->communication.outbound.buffer,
1406 buffer, sizeof(buffer));
1407 if (ret) {
1408 goto error;
1409 }
1410
1411 ret = client_flush_outgoing_queue(client, state);
1412 if (ret) {
1413 goto error;
1414 }
1415
1416 if (client->communication.outbound.buffer.size != 0) {
1417 /* Queue could not be emptied. */
1418 client->communication.outbound.queued_command_reply = true;
1419 }
1420
1421 return 0;
1422error:
1423 return -1;
1424}
1425
1426static
1427int client_dispatch_message(struct notification_client *client,
1428 struct notification_thread_state *state)
1429{
1430 int ret = 0;
1431
1432 if (client->communication.inbound.msg_type !=
1433 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
1434 client->communication.inbound.msg_type !=
1435 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
1436 !client->validated) {
1437 WARN("[notification-thread] client attempted a command before handshake");
1438 ret = -1;
1439 goto end;
1440 }
1441
1442 switch (client->communication.inbound.msg_type) {
1443 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
1444 {
1445 /*
1446 * Receiving message header. The function will be called again
1447 * once the rest of the message as been received and can be
1448 * interpreted.
1449 */
1450 const struct lttng_notification_channel_message *msg;
1451
1452 assert(sizeof(*msg) ==
1453 client->communication.inbound.buffer.size);
1454 msg = (const struct lttng_notification_channel_message *)
1455 client->communication.inbound.buffer.data;
1456
1457 if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
1458 ERR("[notification-thread] Invalid notification channel message: length = %u", msg->size);
1459 ret = -1;
1460 goto end;
1461 }
1462
1463 switch (msg->type) {
1464 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
1465 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
1466 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
1467 break;
1468 default:
1469 ret = -1;
1470 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
1471 goto end;
1472 }
1473
1474 client->communication.inbound.bytes_to_receive = msg->size;
1475 client->communication.inbound.msg_type =
1476 (enum lttng_notification_channel_message_type) msg->type;
1477 if (client->communication.inbound.msg_type ==
1478 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE) {
1479 client->communication.inbound.receive_creds = true;
1480 }
1481 ret = lttng_dynamic_buffer_set_size(
1482 &client->communication.inbound.buffer, 0);
1483 if (ret) {
1484 goto end;
1485 }
1486 break;
1487 }
1488 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
1489 {
1490 struct lttng_notification_channel_command_handshake *handshake_client;
1491 struct lttng_notification_channel_command_handshake handshake_reply = {
1492 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
1493 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
1494 };
1495 struct lttng_notification_channel_message msg_header = {
1496 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
1497 .size = sizeof(handshake_reply),
1498 };
1499 enum lttng_notification_channel_status status =
1500 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1501 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
1502
1503 memcpy(send_buffer, &msg_header, sizeof(msg_header));
1504 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
1505 sizeof(handshake_reply));
1506
1507 handshake_client =
1508 (struct lttng_notification_channel_command_handshake *)
1509 client->communication.inbound.buffer.data;
1510 client->major = handshake_client->major;
1511 client->minor = handshake_client->minor;
1512 if (!client->communication.inbound.creds_received) {
1513 ERR("[notification-thread] No credentials received from client");
1514 ret = -1;
1515 goto end;
1516 }
1517
1518 client->uid = LTTNG_SOCK_GET_UID_CRED(
1519 &client->communication.inbound.creds);
1520 client->gid = LTTNG_SOCK_GET_GID_CRED(
1521 &client->communication.inbound.creds);
1522 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
1523 client->uid, client->gid, (int) client->major,
1524 (int) client->minor);
1525
1526 if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
1527 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
1528 }
1529
1530 ret = lttng_dynamic_buffer_append(&client->communication.outbound.buffer,
1531 send_buffer, sizeof(send_buffer));
1532 if (ret) {
1533 ERR("[notification-thread] Failed to send protocol version to notification channel client");
1534 goto end;
1535 }
1536
1537 ret = client_flush_outgoing_queue(client, state);
1538 if (ret) {
1539 goto end;
1540 }
1541
1542 ret = client_send_command_reply(client, state, status);
1543 if (ret) {
1544 ERR("[notification-thread] Failed to send reply to notification channel client");
1545 goto end;
1546 }
1547
1548 /* Set reception state to receive the next message header. */
1549 client_reset_inbound_state(client);
1550 client->validated = true;
1551 break;
1552 }
1553 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
1554 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
1555 {
1556 struct lttng_condition *condition;
1557 enum lttng_notification_channel_status status =
1558 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1559 const struct lttng_buffer_view condition_view =
1560 lttng_buffer_view_from_dynamic_buffer(
1561 &client->communication.inbound.buffer,
1562 0, -1);
1563 size_t expected_condition_size =
1564 client->communication.inbound.buffer.size;
1565
1566 ret = lttng_condition_create_from_buffer(&condition_view,
1567 &condition);
1568 if (ret != expected_condition_size) {
1569 ERR("[notification-thread] Malformed condition received from client");
1570 goto end;
1571 }
1572
1573 if (client->communication.inbound.msg_type ==
1574 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
1575 /*
1576 * FIXME The current state should be evaluated on
1577 * subscription.
1578 */
1579 ret = notification_thread_client_subscribe(client,
1580 condition, state, &status);
1581 } else {
1582 ret = notification_thread_client_unsubscribe(client,
1583 condition, state, &status);
1584 }
1585 if (ret) {
1586 goto end;
1587 }
1588
1589 ret = client_send_command_reply(client, state, status);
1590 if (ret) {
1591 ERR("[notification-thread] Failed to send reply to notification channel client");
1592 goto end;
1593 }
1594
1595 /* Set reception state to receive the next message header. */
1596 client_reset_inbound_state(client);
1597 break;
1598 }
1599 default:
1600 abort();
1601 }
1602end:
1603 return ret;
1604}
1605
1606/* Incoming data from client. */
1607int handle_notification_thread_client_in(
1608 struct notification_thread_state *state, int socket)
1609{
1610 int ret;
1611 struct notification_client *client;
1612 ssize_t recv_ret;
1613 size_t offset;
1614
1615 client = get_client_from_socket(socket, state);
1616 if (!client) {
1617 /* Internal error, abort. */
1618 ret = -1;
1619 goto end;
1620 }
1621
1622 offset = client->communication.inbound.buffer.size;
1623 ret = lttng_dynamic_buffer_set_size(
1624 &client->communication.inbound.buffer,
1625 client->communication.inbound.bytes_to_receive);
1626 if (ret) {
1627 goto end;
1628 }
1629
1630 if (client->communication.inbound.receive_creds) {
1631 recv_ret = lttcomm_recv_creds_unix_sock(socket,
1632 client->communication.inbound.buffer.data + offset,
1633 client->communication.inbound.bytes_to_receive,
1634 &client->communication.inbound.creds);
1635 if (recv_ret > 0) {
1636 client->communication.inbound.receive_creds = false;
1637 client->communication.inbound.creds_received = true;
1638 }
1639 } else {
1640 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
1641 client->communication.inbound.buffer.data + offset,
1642 client->communication.inbound.bytes_to_receive);
1643 }
1644 if (recv_ret < 0) {
1645 goto error_disconnect_client;
1646 }
1647
1648 client->communication.inbound.bytes_to_receive -= recv_ret;
1649 ret = lttng_dynamic_buffer_set_size(
1650 &client->communication.inbound.buffer,
1651 client->communication.inbound.buffer.size -
1652 client->communication.inbound.bytes_to_receive);
1653 if (ret) {
1654 goto end;
1655 }
1656
1657 if (client->communication.inbound.bytes_to_receive == 0) {
1658 ret = client_dispatch_message(client, state);
1659 if (ret) {
1660 /*
1661 * Only returns an error if this client must be
1662 * disconnected.
1663 */
1664 goto error_disconnect_client;
1665 }
1666 } else {
1667 goto end;
1668 }
1669end:
1670 return ret;
1671error_disconnect_client:
1672 ret = handle_notification_thread_client_disconnect(socket, state);
1673 return ret;
1674}
1675
/*
 * Client ready to receive outgoing data.
 *
 * Look up the client associated with `socket` and flush its outgoing queue.
 * Returns -1 if the client cannot be found, otherwise the result of the
 * flush.
 */
int handle_notification_thread_client_out(
		struct notification_thread_state *state, int socket)
{
	struct notification_client *client =
			get_client_from_socket(socket, state);

	if (!client) {
		/* Internal error, abort. */
		return -1;
	}

	return client_flush_outgoing_queue(client, state);
}
1697
1698static
1699bool evaluate_buffer_usage_condition(struct lttng_condition *condition,
1700 struct channel_state_sample *sample, uint64_t buffer_capacity)
1701{
1702 bool result = false;
1703 uint64_t threshold;
1704 enum lttng_condition_type condition_type;
1705 struct lttng_condition_buffer_usage *use_condition = container_of(
1706 condition, struct lttng_condition_buffer_usage,
1707 parent);
1708
1709 if (!sample) {
1710 goto end;
1711 }
1712
1713 if (use_condition->threshold_bytes.set) {
1714 threshold = use_condition->threshold_bytes.value;
1715 } else {
1716 /*
1717 * Threshold was expressed as a ratio.
1718 *
1719 * TODO the threshold (in bytes) of conditions expressed
1720 * as a ratio of total buffer size could be cached to
1721 * forego this double-multiplication or it could be performed
1722 * as fixed-point math.
1723 *
1724 * Note that caching should accomodate the case where the
1725 * condition applies to multiple channels (i.e. don't assume
1726 * that all channels matching my_chann* have the same size...)
1727 */
1728 threshold = (uint64_t) (use_condition->threshold_ratio.value *
1729 (double) buffer_capacity);
1730 }
1731
1732 condition_type = lttng_condition_get_type(condition);
1733 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
1734 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
1735 threshold, sample->highest_usage);
1736
1737 /*
1738 * The low condition should only be triggered once _all_ of the
1739 * streams in a channel have gone below the "low" threshold.
1740 */
1741 if (sample->highest_usage <= threshold) {
1742 result = true;
1743 }
1744 } else {
1745 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
1746 threshold, sample->highest_usage);
1747
1748 /*
1749 * For high buffer usage scenarios, we want to trigger whenever
1750 * _any_ of the streams has reached the "high" threshold.
1751 */
1752 if (sample->highest_usage >= threshold) {
1753 result = true;
1754 }
1755 }
1756end:
1757 return result;
1758}
1759
1760static
1761int evaluate_condition(struct lttng_condition *condition,
1762 struct lttng_evaluation **evaluation,
1763 struct notification_thread_state *state,
1764 struct channel_state_sample *previous_sample,
1765 struct channel_state_sample *latest_sample,
1766 uint64_t buffer_capacity)
1767{
1768 int ret = 0;
1769 enum lttng_condition_type condition_type;
1770 bool previous_sample_result;
1771 bool latest_sample_result;
1772
1773 condition_type = lttng_condition_get_type(condition);
1774 /* No other condition type supported for the moment. */
1775 assert(condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW ||
1776 condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH);
1777
1778 previous_sample_result = evaluate_buffer_usage_condition(condition,
1779 previous_sample, buffer_capacity);
1780 latest_sample_result = evaluate_buffer_usage_condition(condition,
1781 latest_sample, buffer_capacity);
1782
1783 if (!latest_sample_result ||
1784 (previous_sample_result == latest_sample_result)) {
1785 /*
1786 * Only trigger on a condition evaluation transition.
1787 *
1788 * NOTE: This edge-triggered logic may not be appropriate for
1789 * future condition types.
1790 */
1791 goto end;
1792 }
1793
1794 if (evaluation && latest_sample_result) {
1795 *evaluation = lttng_evaluation_buffer_usage_create(
1796 condition_type,
1797 latest_sample->highest_usage,
1798 buffer_capacity);
1799 if (!*evaluation) {
1800 ret = -1;
1801 goto end;
1802 }
1803 }
1804end:
1805 return ret;
1806}
1807
1808static
1809int client_enqueue_dropped_notification(struct notification_client *client,
1810 struct notification_thread_state *state)
1811{
1812 int ret;
1813 struct lttng_notification_channel_message msg = {
1814 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
1815 .size = 0,
1816 };
1817
1818 ret = lttng_dynamic_buffer_append(
1819 &client->communication.outbound.buffer, &msg,
1820 sizeof(msg));
1821 return ret;
1822}
1823
1824static
1825int send_evaluation_to_clients(struct lttng_trigger *trigger,
1826 struct lttng_evaluation *evaluation,
1827 struct notification_client_list* client_list,
1828 struct notification_thread_state *state,
1829 uid_t channel_uid, gid_t channel_gid)
1830{
1831 int ret = 0;
1832 struct lttng_dynamic_buffer msg_buffer;
1833 struct notification_client_list_element *client_list_element, *tmp;
1834 struct lttng_notification *notification;
1835 struct lttng_condition *condition;
1836 ssize_t expected_notification_size, notification_size;
1837 struct lttng_notification_channel_message msg;
1838
1839 lttng_dynamic_buffer_init(&msg_buffer);
1840
1841 condition = lttng_trigger_get_condition(trigger);
1842 assert(condition);
1843
1844 notification = lttng_notification_create(condition, evaluation);
1845 if (!notification) {
1846 ret = -1;
1847 goto end;
1848 }
1849
1850 expected_notification_size = lttng_notification_serialize(notification,
1851 NULL);
1852 if (expected_notification_size < 0) {
1853 ERR("[notification-thread] Failed to get size of serialized notification");
1854 ret = -1;
1855 goto end;
1856 }
1857
1858 msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION;
1859 msg.size = (uint32_t) expected_notification_size;
1860 ret = lttng_dynamic_buffer_append(&msg_buffer, &msg, sizeof(msg));
1861 if (ret) {
1862 goto end;
1863 }
1864
1865 ret = lttng_dynamic_buffer_set_size(&msg_buffer,
1866 msg_buffer.size + expected_notification_size);
1867 if (ret) {
1868 goto end;
1869 }
1870
1871 notification_size = lttng_notification_serialize(notification,
1872 msg_buffer.data + sizeof(msg));
1873 if (notification_size != expected_notification_size) {
1874 ERR("[notification-thread] Failed to serialize notification");
1875 ret = -1;
1876 goto end;
1877 }
1878
1879 cds_list_for_each_entry_safe(client_list_element, tmp,
1880 &client_list->list, node) {
1881 struct notification_client *client =
1882 client_list_element->client;
1883
1884 if (client->uid != channel_uid && client->gid != channel_gid &&
1885 client->uid != 0) {
1886 /* Client is not allowed to monitor this channel. */
1887 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this channel");
1888 continue;
1889 }
1890
1891 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
1892 client->socket, msg_buffer.size);
1893 if (client->communication.outbound.buffer.size) {
1894 /*
1895 * Outgoing data is already buffered for this client;
1896 * drop the notification and enqueue a "dropped
1897 * notification" message if this is the first dropped
1898 * notification since the socket spilled-over to the
1899 * queue.
1900 */
1901 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
1902 client->socket);
1903 if (!client->communication.outbound.dropped_notification) {
1904 client->communication.outbound.dropped_notification = true;
1905 ret = client_enqueue_dropped_notification(
1906 client, state);
1907 if (ret) {
1908 goto end;
1909 }
1910 }
1911 continue;
1912 }
1913
1914 ret = lttng_dynamic_buffer_append_buffer(
1915 &client->communication.outbound.buffer,
1916 &msg_buffer);
1917 if (ret) {
1918 goto end;
1919 }
1920
1921 ret = client_flush_outgoing_queue(client, state);
1922 if (ret) {
1923 goto end;
1924 }
1925 }
1926 ret = 0;
1927end:
1928 lttng_notification_destroy(notification);
1929 lttng_dynamic_buffer_reset(&msg_buffer);
1930 return ret;
1931}
1932
1933int handle_notification_thread_channel_sample(
1934 struct notification_thread_state *state, int pipe,
1935 enum lttng_domain_type domain)
1936{
1937 int ret = 0;
1938 struct lttcomm_consumer_channel_monitor_msg sample_msg;
1939 struct channel_state_sample previous_sample, latest_sample;
1940 struct channel_info *channel_info;
1941 struct cds_lfht_node *node;
1942 struct cds_lfht_iter iter;
1943 struct lttng_channel_trigger_list *trigger_list;
1944 struct lttng_trigger_list_element *trigger_list_element;
1945 bool previous_sample_available = false;
1946
1947 /*
1948 * The monitoring pipe only holds messages smaller than PIPE_BUF,
1949 * ensuring that read/write of sampling messages are atomic.
1950 */
1951 do {
1952 ret = read(pipe, &sample_msg, sizeof(sample_msg));
1953 } while (ret == -1 && errno == EINTR);
1954 if (ret != sizeof(sample_msg)) {
1955 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
1956 pipe);
1957 ret = -1;
1958 goto end;
1959 }
1960
1961 ret = 0;
1962 latest_sample.key.key = sample_msg.key;
1963 latest_sample.key.domain = domain;
1964 latest_sample.highest_usage = sample_msg.highest;
1965 latest_sample.lowest_usage = sample_msg.lowest;
1966
1967 rcu_read_lock();
1968
1969 /* Retrieve the channel's informations */
1970 cds_lfht_lookup(state->channels_ht,
1971 hash_channel_key(&latest_sample.key),
1972 match_channel_info,
1973 &latest_sample.key,
1974 &iter);
1975 node = cds_lfht_iter_get_node(&iter);
1976 if (!node) {
1977 /*
1978 * Not an error since the consumer can push a sample to the pipe
1979 * and the rest of the session daemon could notify us of the
1980 * channel's destruction before we get a chance to process that
1981 * sample.
1982 */
1983 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
1984 latest_sample.key.key,
1985 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
1986 "user space");
1987 goto end_unlock;
1988 }
1989 channel_info = caa_container_of(node, struct channel_info,
1990 channels_ht_node);
1991 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64")",
1992 channel_info->channel_name,
1993 latest_sample.key.key,
1994 channel_info->session_name,
1995 latest_sample.highest_usage,
1996 latest_sample.lowest_usage);
1997
1998 /* Retrieve the channel's last sample, if it exists, and update it. */
1999 cds_lfht_lookup(state->channel_state_ht,
2000 hash_channel_key(&latest_sample.key),
2001 match_channel_state_sample,
2002 &latest_sample.key,
2003 &iter);
2004 node = cds_lfht_iter_get_node(&iter);
2005 if (node) {
2006 struct channel_state_sample *stored_sample;
2007
2008 /* Update the sample stored. */
2009 stored_sample = caa_container_of(node,
2010 struct channel_state_sample,
2011 channel_state_ht_node);
2012 memcpy(&previous_sample, stored_sample,
2013 sizeof(previous_sample));
2014 stored_sample->highest_usage = latest_sample.highest_usage;
2015 stored_sample->lowest_usage = latest_sample.lowest_usage;
2016 previous_sample_available = true;
2017 } else {
2018 /*
2019 * This is the channel's first sample, allocate space for and
2020 * store the new sample.
2021 */
2022 struct channel_state_sample *stored_sample;
2023
2024 stored_sample = zmalloc(sizeof(*stored_sample));
2025 if (!stored_sample) {
2026 ret = -1;
2027 goto end_unlock;
2028 }
2029
2030 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
2031 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
2032 cds_lfht_add(state->channel_state_ht,
2033 hash_channel_key(&stored_sample->key),
2034 &stored_sample->channel_state_ht_node);
2035 }
2036
2037 /* Find triggers associated with this channel. */
2038 cds_lfht_lookup(state->channel_triggers_ht,
2039 hash_channel_key(&latest_sample.key),
2040 match_channel_trigger_list,
2041 &latest_sample.key,
2042 &iter);
2043 node = cds_lfht_iter_get_node(&iter);
2044 if (!node) {
2045 goto end_unlock;
2046 }
2047
2048 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
2049 channel_triggers_ht_node);
2050 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
2051 node) {
2052 struct lttng_condition *condition;
2053 struct lttng_action *action;
2054 struct lttng_trigger *trigger;
2055 struct notification_client_list *client_list;
2056 struct lttng_evaluation *evaluation = NULL;
2057
2058 trigger = trigger_list_element->trigger;
2059 condition = lttng_trigger_get_condition(trigger);
2060 assert(condition);
2061 action = lttng_trigger_get_action(trigger);
2062
2063 /* Notify actions are the only type currently supported. */
2064 assert(lttng_action_get_type(action) ==
2065 LTTNG_ACTION_TYPE_NOTIFY);
2066
2067 /*
2068 * Check if any client is subscribed to the result of this
2069 * evaluation.
2070 */
2071 cds_lfht_lookup(state->notification_trigger_clients_ht,
2072 lttng_condition_hash(condition),
2073 match_client_list,
2074 trigger,
2075 &iter);
2076 node = cds_lfht_iter_get_node(&iter);
2077 assert(node);
2078
2079 client_list = caa_container_of(node,
2080 struct notification_client_list,
2081 notification_trigger_ht_node);
2082 if (cds_list_empty(&client_list->list)) {
2083 /*
2084 * No clients interested in the evaluation's result,
2085 * skip it.
2086 */
2087 continue;
2088 }
2089
2090 ret = evaluate_condition(condition, &evaluation, state,
2091 previous_sample_available ? &previous_sample : NULL,
2092 &latest_sample, channel_info->capacity);
2093 if (ret) {
2094 goto end_unlock;
2095 }
2096
2097 if (!evaluation) {
2098 continue;
2099 }
2100
2101 /* Dispatch evaluation result to all clients. */
2102 ret = send_evaluation_to_clients(trigger_list_element->trigger,
2103 evaluation, client_list, state,
2104 channel_info->uid, channel_info->gid);
2105 if (ret) {
2106 goto end_unlock;
2107 }
2108 }
2109end_unlock:
2110 rcu_read_unlock();
2111end:
2112 return ret;
2113}
This page took 0.169288 seconds and 4 git commands to generate.