Fix: NULL pointer dereference in lttng_condition_serialize
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
CommitLineData
ab0ee2ca
JG
1/*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _LGPL_SOURCE
19#include <urcu.h>
20#include <urcu/rculfhash.h>
21
22#include "notification-thread.h"
23#include "notification-thread-events.h"
24#include "notification-thread-commands.h"
25#include <common/defaults.h>
26#include <common/error.h>
27#include <common/futex.h>
28#include <common/unix.h>
29#include <common/dynamic-buffer.h>
30#include <common/hashtable/utils.h>
31#include <common/sessiond-comm/sessiond-comm.h>
32#include <common/macros.h>
33#include <lttng/condition/condition.h>
34#include <lttng/action/action.h>
35#include <lttng/notification/notification-internal.h>
36#include <lttng/condition/condition-internal.h>
37#include <lttng/condition/buffer-usage-internal.h>
38#include <lttng/notification/channel-internal.h>
39#include <time.h>
40#include <unistd.h>
41#include <assert.h>
42#include <inttypes.h>
43
44#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
45#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
46
47struct lttng_trigger_list_element {
48 struct lttng_trigger *trigger;
49 struct cds_list_head node;
50};
51
52struct lttng_channel_trigger_list {
53 struct channel_key channel_key;
54 struct cds_list_head list;
55 struct cds_lfht_node channel_triggers_ht_node;
56};
57
58struct lttng_trigger_ht_element {
59 struct lttng_trigger *trigger;
60 struct cds_lfht_node node;
61};
62
63struct lttng_condition_list_element {
64 struct lttng_condition *condition;
65 struct cds_list_head node;
66};
67
68struct notification_client_list_element {
69 struct notification_client *client;
70 struct cds_list_head node;
71};
72
73struct notification_client_list {
74 struct lttng_trigger *trigger;
75 struct cds_list_head list;
76 struct cds_lfht_node notification_trigger_ht_node;
77};
78
79struct notification_client {
80 int socket;
81 /* Client protocol version. */
82 uint8_t major, minor;
83 uid_t uid;
84 gid_t gid;
85 /*
86 * Indicates if the credentials and versions of the client has been
87 * checked.
88 */
89 bool validated;
90 /*
91 * Conditions to which the client's notification channel is subscribed.
92 * List of struct lttng_condition_list_node. The condition member is
93 * owned by the client.
94 */
95 struct cds_list_head condition_list;
96 struct cds_lfht_node client_socket_ht_node;
97 struct {
98 struct {
99 struct lttng_dynamic_buffer buffer;
100 /* Bytes left to receive for the current message. */
101 size_t bytes_to_receive;
102 /* Type of the message being received. */
103 enum lttng_notification_channel_message_type msg_type;
104 /*
105 * Indicates whether or not credentials are expected
106 * from the client.
107 */
108 bool receive_creds;
109 /*
110 * Indicates whether or not credentials were received
111 * from the client.
112 */
113 bool creds_received;
114 lttng_sock_cred creds;
115 } inbound;
116 struct {
117 /*
118 * Indicates whether or not a notification addressed to
119 * this client was dropped because a command reply was
120 * already buffered.
121 *
122 * A notification is dropped whenever the buffer is not
123 * empty.
124 */
125 bool dropped_notification;
126 /*
127 * Indicates whether or not a command reply is already
128 * buffered. In this case, it means that the client is
129 * not consuming command replies before emitting a new
130 * one. This could be caused by a protocol error or a
131 * misbehaving/malicious client.
132 */
133 bool queued_command_reply;
134 struct lttng_dynamic_buffer buffer;
135 } outbound;
136 } communication;
137};
138
139struct channel_state_sample {
140 struct channel_key key;
141 struct cds_lfht_node channel_state_ht_node;
142 uint64_t highest_usage;
143 uint64_t lowest_usage;
144};
145
146static
147int match_client(struct cds_lfht_node *node, const void *key)
148{
149 /* This double-cast is intended to supress pointer-to-cast warning. */
150 int socket = (int) (intptr_t) key;
151 struct notification_client *client;
152
153 client = caa_container_of(node, struct notification_client,
154 client_socket_ht_node);
155
156 return !!(client->socket == socket);
157}
158
159static
160int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
161{
162 struct channel_key *channel_key = (struct channel_key *) key;
163 struct lttng_channel_trigger_list *trigger_list;
164
165 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
166 channel_triggers_ht_node);
167
168 return !!((channel_key->key == trigger_list->channel_key.key) &&
169 (channel_key->domain == trigger_list->channel_key.domain));
170}
171
172static
173int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
174{
175 struct channel_key *channel_key = (struct channel_key *) key;
176 struct channel_state_sample *sample;
177
178 sample = caa_container_of(node, struct channel_state_sample,
179 channel_state_ht_node);
180
181 return !!((channel_key->key == sample->key.key) &&
182 (channel_key->domain == sample->key.domain));
183}
184
185static
186int match_channel_info(struct cds_lfht_node *node, const void *key)
187{
188 struct channel_key *channel_key = (struct channel_key *) key;
189 struct channel_info *channel_info;
190
191 channel_info = caa_container_of(node, struct channel_info,
192 channels_ht_node);
193
194 return !!((channel_key->key == channel_info->key.key) &&
195 (channel_key->domain == channel_info->key.domain));
196}
197
198static
199int match_condition(struct cds_lfht_node *node, const void *key)
200{
201 struct lttng_condition *condition_key = (struct lttng_condition *) key;
202 struct lttng_trigger_ht_element *trigger;
203 struct lttng_condition *condition;
204
205 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
206 node);
207 condition = lttng_trigger_get_condition(trigger->trigger);
208 assert(condition);
209
210 return !!lttng_condition_is_equal(condition_key, condition);
211}
212
213static
214int match_client_list(struct cds_lfht_node *node, const void *key)
215{
216 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
217 struct notification_client_list *client_list;
218 struct lttng_condition *condition;
219 struct lttng_condition *condition_key = lttng_trigger_get_condition(
220 trigger_key);
221
222 assert(condition_key);
223
224 client_list = caa_container_of(node, struct notification_client_list,
225 notification_trigger_ht_node);
226 condition = lttng_trigger_get_condition(client_list->trigger);
227
228 return !!lttng_condition_is_equal(condition_key, condition);
229}
230
231static
232int match_client_list_condition(struct cds_lfht_node *node, const void *key)
233{
234 struct lttng_condition *condition_key = (struct lttng_condition *) key;
235 struct notification_client_list *client_list;
236 struct lttng_condition *condition;
237
238 assert(condition_key);
239
240 client_list = caa_container_of(node, struct notification_client_list,
241 notification_trigger_ht_node);
242 condition = lttng_trigger_get_condition(client_list->trigger);
243
244 return !!lttng_condition_is_equal(condition_key, condition);
245}
246
247static
248unsigned long lttng_condition_buffer_usage_hash(
249 struct lttng_condition *_condition)
250{
251 unsigned long hash = 0;
252 struct lttng_condition_buffer_usage *condition;
253
254 condition = container_of(_condition,
255 struct lttng_condition_buffer_usage, parent);
256
257 if (condition->session_name) {
258 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
259 }
260 if (condition->channel_name) {
8f56701f 261 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
ab0ee2ca
JG
262 }
263 if (condition->domain.set) {
264 hash ^= hash_key_ulong(
265 (void *) condition->domain.type,
266 lttng_ht_seed);
267 }
268 if (condition->threshold_ratio.set) {
269 uint64_t val;
270
271 val = condition->threshold_ratio.value * (double) UINT32_MAX;
272 hash ^= hash_key_u64(&val, lttng_ht_seed);
273 } else if (condition->threshold_ratio.set) {
274 uint64_t val;
275
276 val = condition->threshold_bytes.value;
277 hash ^= hash_key_u64(&val, lttng_ht_seed);
278 }
279 return hash;
280}
281
282/*
283 * The lttng_condition hashing code is kept in this file (rather than
284 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
285 * don't want to link in liblttng-ctl.
286 */
287static
288unsigned long lttng_condition_hash(struct lttng_condition *condition)
289{
290 switch (condition->type) {
291 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
292 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
293 return lttng_condition_buffer_usage_hash(condition);
294 default:
295 ERR("[notification-thread] Unexpected condition type caught");
296 abort();
297 }
298}
299
300static
301void channel_info_destroy(struct channel_info *channel_info)
302{
303 if (!channel_info) {
304 return;
305 }
306
307 if (channel_info->session_name) {
308 free(channel_info->session_name);
309 }
310 if (channel_info->channel_name) {
311 free(channel_info->channel_name);
312 }
313 free(channel_info);
314}
315
316static
317struct channel_info *channel_info_copy(struct channel_info *channel_info)
318{
319 struct channel_info *copy = zmalloc(sizeof(*channel_info));
320
321 assert(channel_info);
322 assert(channel_info->session_name);
323 assert(channel_info->channel_name);
324
325 if (!copy) {
326 goto end;
327 }
328
329 memcpy(copy, channel_info, sizeof(*channel_info));
330 copy->session_name = NULL;
331 copy->channel_name = NULL;
332
333 copy->session_name = strdup(channel_info->session_name);
334 if (!copy->session_name) {
335 goto error;
336 }
337 copy->channel_name = strdup(channel_info->channel_name);
338 if (!copy->channel_name) {
339 goto error;
340 }
341 cds_lfht_node_init(&channel_info->channels_ht_node);
342end:
343 return copy;
344error:
345 channel_info_destroy(copy);
346 return NULL;
347}
348
349static
350int notification_thread_client_subscribe(struct notification_client *client,
351 struct lttng_condition *condition,
352 struct notification_thread_state *state,
353 enum lttng_notification_channel_status *_status)
354{
355 int ret = 0;
356 struct cds_lfht_iter iter;
357 struct cds_lfht_node *node;
358 struct notification_client_list *client_list;
359 struct lttng_condition_list_element *condition_list_element = NULL;
360 struct notification_client_list_element *client_list_element = NULL;
361 enum lttng_notification_channel_status status =
362 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
363
364 /*
365 * Ensure that the client has not already subscribed to this condition
366 * before.
367 */
368 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
369 if (lttng_condition_is_equal(condition_list_element->condition,
370 condition)) {
371 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
372 goto end;
373 }
374 }
375
376 condition_list_element = zmalloc(sizeof(*condition_list_element));
377 if (!condition_list_element) {
378 ret = -1;
379 goto error;
380 }
381 client_list_element = zmalloc(sizeof(*client_list_element));
382 if (!client_list_element) {
383 ret = -1;
384 goto error;
385 }
386
387 rcu_read_lock();
388
389 /*
390 * Add the newly-subscribed condition to the client's subscription list.
391 */
392 CDS_INIT_LIST_HEAD(&condition_list_element->node);
393 condition_list_element->condition = condition;
394 cds_list_add(&condition_list_element->node, &client->condition_list);
395
396 /*
397 * Add the client to the list of clients interested in a given trigger
398 * if a "notification" trigger with a corresponding condition was
399 * added prior.
400 */
401 cds_lfht_lookup(state->notification_trigger_clients_ht,
402 lttng_condition_hash(condition),
403 match_client_list_condition,
404 condition,
405 &iter);
406 node = cds_lfht_iter_get_node(&iter);
407 if (!node) {
4fb43b68 408 free(client_list_element);
ab0ee2ca
JG
409 goto end_unlock;
410 }
411
412 client_list = caa_container_of(node, struct notification_client_list,
413 notification_trigger_ht_node);
414 client_list_element->client = client;
415 CDS_INIT_LIST_HEAD(&client_list_element->node);
416 cds_list_add(&client_list_element->node, &client_list->list);
417end_unlock:
418 rcu_read_unlock();
419end:
420 if (_status) {
421 *_status = status;
422 }
423 return ret;
424error:
425 free(condition_list_element);
426 free(client_list_element);
427 return ret;
428}
429
430static
431int notification_thread_client_unsubscribe(
432 struct notification_client *client,
433 struct lttng_condition *condition,
434 struct notification_thread_state *state,
435 enum lttng_notification_channel_status *_status)
436{
437 struct cds_lfht_iter iter;
438 struct cds_lfht_node *node;
439 struct notification_client_list *client_list;
440 struct lttng_condition_list_element *condition_list_element,
441 *condition_tmp;
442 struct notification_client_list_element *client_list_element,
443 *client_tmp;
444 bool condition_found = false;
445 enum lttng_notification_channel_status status =
446 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
447
448 /* Remove the condition from the client's condition list. */
449 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
450 &client->condition_list, node) {
451 if (!lttng_condition_is_equal(condition_list_element->condition,
452 condition)) {
453 continue;
454 }
455
456 cds_list_del(&condition_list_element->node);
457 /*
458 * The caller may be iterating on the client's conditions to
459 * tear down a client's connection. In this case, the condition
460 * will be destroyed at the end.
461 */
462 if (condition != condition_list_element->condition) {
463 lttng_condition_destroy(
464 condition_list_element->condition);
465 }
466 free(condition_list_element);
467 condition_found = true;
468 break;
469 }
470
471 if (!condition_found) {
472 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
473 goto end;
474 }
475
476 /*
477 * Remove the client from the list of clients interested the trigger
478 * matching the condition.
479 */
480 rcu_read_lock();
481 cds_lfht_lookup(state->notification_trigger_clients_ht,
482 lttng_condition_hash(condition),
483 match_client_list_condition,
484 condition,
485 &iter);
486 node = cds_lfht_iter_get_node(&iter);
487 if (!node) {
488 goto end_unlock;
489 }
490
491 client_list = caa_container_of(node, struct notification_client_list,
492 notification_trigger_ht_node);
493 cds_list_for_each_entry_safe(client_list_element, client_tmp,
494 &client_list->list, node) {
495 if (client_list_element->client->socket != client->socket) {
496 continue;
497 }
498 cds_list_del(&client_list_element->node);
499 free(client_list_element);
500 break;
501 }
502end_unlock:
503 rcu_read_unlock();
504end:
505 lttng_condition_destroy(condition);
506 if (_status) {
507 *_status = status;
508 }
509 return 0;
510}
511
512static
513void notification_client_destroy(struct notification_client *client,
514 struct notification_thread_state *state)
515{
516 struct lttng_condition_list_element *condition_list_element, *tmp;
517
518 if (!client) {
519 return;
520 }
521
522 /* Release all conditions to which the client was subscribed. */
523 cds_list_for_each_entry_safe(condition_list_element, tmp,
524 &client->condition_list, node) {
525 (void) notification_thread_client_unsubscribe(client,
526 condition_list_element->condition, state, NULL);
527 }
528
529 if (client->socket >= 0) {
530 (void) lttcomm_close_unix_sock(client->socket);
531 }
532 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
533 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
534 free(client);
535}
536
537/*
538 * Call with rcu_read_lock held (and hold for the lifetime of the returned
539 * client pointer).
540 */
541static
542struct notification_client *get_client_from_socket(int socket,
543 struct notification_thread_state *state)
544{
545 struct cds_lfht_iter iter;
546 struct cds_lfht_node *node;
547 struct notification_client *client = NULL;
548
549 cds_lfht_lookup(state->client_socket_ht,
550 hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed),
551 match_client,
552 (void *) (unsigned long) socket,
553 &iter);
554 node = cds_lfht_iter_get_node(&iter);
555 if (!node) {
556 goto end;
557 }
558
559 client = caa_container_of(node, struct notification_client,
560 client_socket_ht_node);
561end:
562 return client;
563}
564
565static
566bool trigger_applies_to_channel(struct lttng_trigger *trigger,
567 struct channel_info *info)
568{
569 enum lttng_condition_status status;
570 struct lttng_condition *condition;
571 const char *trigger_session_name = NULL;
572 const char *trigger_channel_name = NULL;
573 enum lttng_domain_type trigger_domain;
574
575 condition = lttng_trigger_get_condition(trigger);
576 if (!condition) {
577 goto fail;
578 }
579
580 switch (lttng_condition_get_type(condition)) {
581 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
582 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
583 break;
584 default:
585 goto fail;
586 }
587
588 status = lttng_condition_buffer_usage_get_domain_type(condition,
589 &trigger_domain);
590 assert(status == LTTNG_CONDITION_STATUS_OK);
591 if (info->key.domain != trigger_domain) {
592 goto fail;
593 }
594
595 status = lttng_condition_buffer_usage_get_session_name(
596 condition, &trigger_session_name);
597 assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_session_name);
598
599 status = lttng_condition_buffer_usage_get_channel_name(
600 condition, &trigger_channel_name);
601 assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_channel_name);
602
603 if (strcmp(info->session_name, trigger_session_name)) {
604 goto fail;
605 }
606 if (strcmp(info->channel_name, trigger_channel_name)) {
607 goto fail;
608 }
609
610 return true;
611fail:
612 return false;
613}
614
615static
616bool trigger_applies_to_client(struct lttng_trigger *trigger,
617 struct notification_client *client)
618{
619 bool applies = false;
620 struct lttng_condition_list_element *condition_list_element;
621
622 cds_list_for_each_entry(condition_list_element, &client->condition_list,
623 node) {
624 applies = lttng_condition_is_equal(
625 condition_list_element->condition,
626 lttng_trigger_get_condition(trigger));
627 if (applies) {
628 break;
629 }
630 }
631 return applies;
632}
633
634static
635unsigned long hash_channel_key(struct channel_key *key)
636{
637 return hash_key_u64(&key->key, lttng_ht_seed) ^ hash_key_ulong(
638 (void *) (unsigned long) key->domain, lttng_ht_seed);
639}
640
641static
642int handle_notification_thread_command_add_channel(
643 struct notification_thread_state *state,
644 struct channel_info *channel_info,
645 enum lttng_error_code *cmd_result)
646{
647 struct cds_list_head trigger_list;
648 struct channel_info *new_channel_info;
649 struct channel_key *channel_key;
650 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
651 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
652 int trigger_count = 0;
653 struct cds_lfht_iter iter;
654
655 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
656 channel_info->channel_name, channel_info->session_name,
657 channel_info->key.key, channel_info->key.domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
658
659 CDS_INIT_LIST_HEAD(&trigger_list);
660
661 new_channel_info = channel_info_copy(channel_info);
662 if (!new_channel_info) {
663 goto error;
664 }
665
666 channel_key = &new_channel_info->key;
667
668 /* Build a list of all triggers applying to the new channel. */
669 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
670 node) {
671 struct lttng_trigger_list_element *new_element;
672
673 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
674 channel_info)) {
675 continue;
676 }
677
678 new_element = zmalloc(sizeof(*new_element));
679 if (!new_element) {
680 goto error;
681 }
682 CDS_INIT_LIST_HEAD(&new_element->node);
683 new_element->trigger = trigger_ht_element->trigger;
684 cds_list_add(&new_element->node, &trigger_list);
685 trigger_count++;
686 }
687
688 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
689 trigger_count);
690 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
691 if (!channel_trigger_list) {
692 goto error;
693 }
694 channel_trigger_list->channel_key = *channel_key;
695 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
696 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
697 cds_list_splice(&trigger_list, &channel_trigger_list->list);
698
699 rcu_read_lock();
700 /* Add channel to the channel_ht which owns the channel_infos. */
701 cds_lfht_add(state->channels_ht,
702 hash_channel_key(channel_key),
703 &new_channel_info->channels_ht_node);
704 /*
705 * Add the list of triggers associated with this channel to the
706 * channel_triggers_ht.
707 */
708 cds_lfht_add(state->channel_triggers_ht,
709 hash_channel_key(channel_key),
710 &channel_trigger_list->channel_triggers_ht_node);
711 rcu_read_unlock();
712 *cmd_result = LTTNG_OK;
713 return 0;
714error:
715 /* Empty trigger list */
716 channel_info_destroy(new_channel_info);
717 return 1;
718}
719
720static
721int handle_notification_thread_command_remove_channel(
722 struct notification_thread_state *state,
723 uint64_t channel_key, enum lttng_domain_type domain,
724 enum lttng_error_code *cmd_result)
725{
726 struct cds_lfht_node *node;
727 struct cds_lfht_iter iter;
728 struct lttng_channel_trigger_list *trigger_list;
729 struct lttng_trigger_list_element *trigger_list_element, *tmp;
730 struct channel_key key = { .key = channel_key, .domain = domain };
731 struct channel_info *channel_info;
732
733 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
734 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
735
736 rcu_read_lock();
737
738 cds_lfht_lookup(state->channel_triggers_ht,
739 hash_channel_key(&key),
740 match_channel_trigger_list,
741 &key,
742 &iter);
743 node = cds_lfht_iter_get_node(&iter);
744 /*
745 * There is a severe internal error if we are being asked to remove a
746 * channel that doesn't exist.
747 */
748 if (!node) {
749 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
750 goto end;
751 }
752
753 /* Free the list of triggers associated with this channel. */
754 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
755 channel_triggers_ht_node);
756 cds_list_for_each_entry_safe(trigger_list_element, tmp,
757 &trigger_list->list, node) {
758 cds_list_del(&trigger_list_element->node);
759 free(trigger_list_element);
760 }
761 cds_lfht_del(state->channel_triggers_ht, node);
762 free(trigger_list);
763
764 /* Free sampled channel state. */
765 cds_lfht_lookup(state->channel_state_ht,
766 hash_channel_key(&key),
767 match_channel_state_sample,
768 &key,
769 &iter);
770 node = cds_lfht_iter_get_node(&iter);
771 /*
772 * This is expected to be NULL if the channel is destroyed before we
773 * received a sample.
774 */
775 if (node) {
776 struct channel_state_sample *sample = caa_container_of(node,
777 struct channel_state_sample,
778 channel_state_ht_node);
779
780 cds_lfht_del(state->channel_state_ht, node);
781 free(sample);
782 }
783
784 /* Remove the channel from the channels_ht and free it. */
785 cds_lfht_lookup(state->channels_ht,
786 hash_channel_key(&key),
787 match_channel_info,
788 &key,
789 &iter);
790 node = cds_lfht_iter_get_node(&iter);
791 assert(node);
792 channel_info = caa_container_of(node, struct channel_info,
793 channels_ht_node);
794 cds_lfht_del(state->channels_ht, node);
795 channel_info_destroy(channel_info);
796end:
797 rcu_read_unlock();
798 *cmd_result = LTTNG_OK;
799 return 0;
800}
801
802/*
803 * FIXME A client's credentials are not checked when registering a trigger, nor
804 * are they stored alongside with the trigger.
805 *
806 * The effects of this are benign:
807 * - The client will succeed in registering the trigger, as it is valid,
808 * - The trigger will, internally, be bound to the channel,
809 * - The notifications will not be sent since the client's credentials
810 * are checked against the channel at that moment.
811 */
812static
813int handle_notification_thread_command_register_trigger(
814 struct notification_thread_state *state,
815 struct lttng_trigger *trigger,
816 enum lttng_error_code *cmd_result)
817{
818 int ret = 0;
819 struct lttng_condition *condition;
820 struct notification_client *client;
821 struct notification_client_list *client_list = NULL;
822 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
823 struct notification_client_list_element *client_list_element, *tmp;
824 struct cds_lfht_node *node;
825 struct cds_lfht_iter iter;
826 struct channel_info *channel;
827 bool free_trigger = true;
828
829 rcu_read_lock();
830
831 condition = lttng_trigger_get_condition(trigger);
832 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
833 if (!trigger_ht_element) {
834 ret = -1;
835 goto error;
836 }
837
838 /* Add trigger to the trigger_ht. */
839 cds_lfht_node_init(&trigger_ht_element->node);
840 trigger_ht_element->trigger = trigger;
841
842 node = cds_lfht_add_unique(state->triggers_ht,
843 lttng_condition_hash(condition),
844 match_condition,
845 condition,
846 &trigger_ht_element->node);
847 if (node != &trigger_ht_element->node) {
848 /* Not a fatal error, simply report it to the client. */
849 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
850 goto error_free_ht_element;
851 }
852
853 /*
854 * Ownership of the trigger and of its wrapper was transfered to
855 * the triggers_ht.
856 */
857 trigger_ht_element = NULL;
858 free_trigger = false;
859
860 /*
861 * The rest only applies to triggers that have a "notify" action.
862 * It is not skipped as this is the only action type currently
863 * supported.
864 */
865 client_list = zmalloc(sizeof(*client_list));
866 if (!client_list) {
867 ret = -1;
868 goto error_free_ht_element;
869 }
870 cds_lfht_node_init(&client_list->notification_trigger_ht_node);
871 CDS_INIT_LIST_HEAD(&client_list->list);
872 client_list->trigger = trigger;
873
874 /* Build a list of clients to which this new trigger applies. */
875 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
876 client_socket_ht_node) {
877 if (!trigger_applies_to_client(trigger, client)) {
878 continue;
879 }
880
881 client_list_element = zmalloc(sizeof(*client_list_element));
882 if (!client_list_element) {
883 ret = -1;
884 goto error_free_client_list;
885 }
886 CDS_INIT_LIST_HEAD(&client_list_element->node);
887 client_list_element->client = client;
888 cds_list_add(&client_list_element->node, &client_list->list);
889 }
890
891 cds_lfht_add(state->notification_trigger_clients_ht,
892 lttng_condition_hash(condition),
893 &client_list->notification_trigger_ht_node);
894 /*
895 * Client list ownership transferred to the
896 * notification_trigger_clients_ht.
897 */
898 client_list = NULL;
899
900 /*
901 * Add the trigger to list of triggers bound to the channels currently
902 * known.
903 */
904 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
905 channels_ht_node) {
906 struct lttng_trigger_list_element *trigger_list_element;
907 struct lttng_channel_trigger_list *trigger_list;
908
909 if (!trigger_applies_to_channel(trigger, channel)) {
910 continue;
911 }
912
913 cds_lfht_lookup(state->channel_triggers_ht,
914 hash_channel_key(&channel->key),
915 match_channel_trigger_list,
916 &channel->key,
917 &iter);
918 node = cds_lfht_iter_get_node(&iter);
919 assert(node);
920 /* Free the list of triggers associated with this channel. */
921 trigger_list = caa_container_of(node,
922 struct lttng_channel_trigger_list,
923 channel_triggers_ht_node);
924
925 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
926 if (!trigger_list_element) {
927 ret = -1;
928 goto error_free_client_list;
929 }
930 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
931 trigger_list_element->trigger = trigger;
932 cds_list_add(&trigger_list_element->node, &trigger_list->list);
933 /* A trigger can only apply to one channel. */
934 break;
935 }
936
937 *cmd_result = LTTNG_OK;
938error_free_client_list:
939 if (client_list) {
940 cds_list_for_each_entry_safe(client_list_element, tmp,
941 &client_list->list, node) {
942 free(client_list_element);
943 }
944 free(client_list);
945 }
946error_free_ht_element:
947 free(trigger_ht_element);
948error:
949 if (free_trigger) {
950 struct lttng_action *action = lttng_trigger_get_action(trigger);
951
952 lttng_condition_destroy(condition);
953 lttng_action_destroy(action);
954 lttng_trigger_destroy(trigger);
955 }
956 rcu_read_unlock();
957 return ret;
958}
959
960int handle_notification_thread_command_unregister_trigger(
961 struct notification_thread_state *state,
962 struct lttng_trigger *trigger,
963 enum lttng_error_code *_cmd_reply)
964{
965 struct cds_lfht_iter iter;
966 struct cds_lfht_node *node, *triggers_ht_node;
967 struct lttng_channel_trigger_list *trigger_list;
968 struct notification_client_list *client_list;
969 struct notification_client_list_element *client_list_element, *tmp;
970 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
971 struct lttng_condition *condition = lttng_trigger_get_condition(
972 trigger);
973 struct lttng_action *action;
974 enum lttng_error_code cmd_reply;
975
976 rcu_read_lock();
977
978 cds_lfht_lookup(state->triggers_ht,
979 lttng_condition_hash(condition),
980 match_condition,
981 condition,
982 &iter);
983 triggers_ht_node = cds_lfht_iter_get_node(&iter);
984 if (!triggers_ht_node) {
985 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
986 goto end;
987 } else {
988 cmd_reply = LTTNG_OK;
989 }
990
991 /* Remove trigger from channel_triggers_ht. */
992 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
993 channel_triggers_ht_node) {
994 struct lttng_trigger_list_element *trigger_element, *tmp;
995
996 cds_list_for_each_entry_safe(trigger_element, tmp,
997 &trigger_list->list, node) {
998 struct lttng_condition *current_condition =
999 lttng_trigger_get_condition(
1000 trigger_element->trigger);
1001
1002 assert(current_condition);
1003 if (!lttng_condition_is_equal(condition,
1004 current_condition)) {
1005 continue;
1006 }
1007
1008 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
1009 cds_list_del(&trigger_element->node);
1010 }
1011 }
1012
1013 /*
1014 * Remove and release the client list from
1015 * notification_trigger_clients_ht.
1016 */
1017 cds_lfht_lookup(state->notification_trigger_clients_ht,
1018 lttng_condition_hash(condition),
1019 match_client_list,
1020 trigger,
1021 &iter);
1022 node = cds_lfht_iter_get_node(&iter);
1023 assert(node);
1024 client_list = caa_container_of(node, struct notification_client_list,
1025 notification_trigger_ht_node);
1026 cds_list_for_each_entry_safe(client_list_element, tmp,
1027 &client_list->list, node) {
1028 free(client_list_element);
1029 }
1030 cds_lfht_del(state->notification_trigger_clients_ht, node);
1031 free(client_list);
1032
1033 /* Remove trigger from triggers_ht. */
1034 trigger_ht_element = caa_container_of(triggers_ht_node,
1035 struct lttng_trigger_ht_element, node);
1036 cds_lfht_del(state->triggers_ht, triggers_ht_node);
1037
1038 condition = lttng_trigger_get_condition(trigger_ht_element->trigger);
1039 lttng_condition_destroy(condition);
1040 action = lttng_trigger_get_action(trigger_ht_element->trigger);
1041 lttng_action_destroy(action);
1042 lttng_trigger_destroy(trigger_ht_element->trigger);
1043 free(trigger_ht_element);
1044end:
1045 rcu_read_unlock();
1046 if (_cmd_reply) {
1047 *_cmd_reply = cmd_reply;
1048 }
1049 return 0;
1050}
1051
1052/* Returns 0 on success, 1 on exit requested, negative value on error. */
1053int handle_notification_thread_command(
1054 struct notification_thread_handle *handle,
1055 struct notification_thread_state *state)
1056{
1057 int ret;
1058 uint64_t counter;
1059 struct notification_thread_command *cmd;
1060
1061 /* Read event_fd to put it back into a quiescent state. */
1062 ret = read(handle->cmd_queue.event_fd, &counter, sizeof(counter));
1063 if (ret == -1) {
1064 goto error;
1065 }
1066
1067 pthread_mutex_lock(&handle->cmd_queue.lock);
1068 cmd = cds_list_first_entry(&handle->cmd_queue.list,
1069 struct notification_thread_command, cmd_list_node);
1070 switch (cmd->type) {
1071 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
1072 DBG("[notification-thread] Received register trigger command");
1073 ret = handle_notification_thread_command_register_trigger(
1074 state, cmd->parameters.trigger,
1075 &cmd->reply_code);
1076 break;
1077 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
1078 DBG("[notification-thread] Received unregister trigger command");
1079 ret = handle_notification_thread_command_unregister_trigger(
1080 state, cmd->parameters.trigger,
1081 &cmd->reply_code);
1082 break;
1083 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
1084 DBG("[notification-thread] Received add channel command");
1085 ret = handle_notification_thread_command_add_channel(
1086 state, &cmd->parameters.add_channel,
1087 &cmd->reply_code);
1088 break;
1089 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
1090 DBG("[notification-thread] Received remove channel command");
1091 ret = handle_notification_thread_command_remove_channel(
1092 state, cmd->parameters.remove_channel.key,
1093 cmd->parameters.remove_channel.domain,
1094 &cmd->reply_code);
1095 break;
1096 case NOTIFICATION_COMMAND_TYPE_QUIT:
1097 DBG("[notification-thread] Received quit command");
1098 cmd->reply_code = LTTNG_OK;
1099 ret = 1;
1100 goto end;
1101 default:
1102 ERR("[notification-thread] Unknown internal command received");
1103 goto error_unlock;
1104 }
1105
1106 if (ret) {
1107 goto error_unlock;
1108 }
1109end:
1110 cds_list_del(&cmd->cmd_list_node);
1111 futex_nto1_wake(&cmd->reply_futex);
1112 pthread_mutex_unlock(&handle->cmd_queue.lock);
1113 return ret;
1114error_unlock:
1115 /* Wake-up and return a fatal error to the calling thread. */
1116 futex_nto1_wake(&cmd->reply_futex);
1117 pthread_mutex_unlock(&handle->cmd_queue.lock);
1118 cmd->reply_code = LTTNG_ERR_FATAL;
1119error:
1120 /* Indicate a fatal error to the caller. */
1121 return -1;
1122}
1123
1124static
1125unsigned long hash_client_socket(int socket)
1126{
1127 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
1128}
1129
/*
 * Add O_NONBLOCK to the file status flags of `socket`.
 *
 * Returns -1 if either fcntl() call fails, otherwise the value returned by
 * the F_SETFL call.
 */
static
int socket_set_non_blocking(int socket)
{
	int flags;
	int ret = fcntl(socket, F_GETFL, 0);

	if (ret == -1) {
		PERROR("fcntl get socket flags");
		return ret;
	}

	/* Preserve the flags that are already set on the socket. */
	flags = ret;
	ret = fcntl(socket, F_SETFL, flags | O_NONBLOCK);
	if (ret == -1) {
		PERROR("fcntl set O_NONBLOCK socket flag");
		return ret;
	}

	DBG("Client socket (fd = %i) set as non-blocking", socket);
	return ret;
}
1152
1153static
1154void client_reset_inbound_state(struct notification_client *client)
1155{
1156 int ret;
1157
1158 ret = lttng_dynamic_buffer_set_size(
1159 &client->communication.inbound.buffer, 0);
1160 assert(!ret);
1161
1162 client->communication.inbound.bytes_to_receive =
1163 sizeof(struct lttng_notification_channel_message);
1164 client->communication.inbound.msg_type =
1165 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
1166 client->communication.inbound.receive_creds = false;
1167 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
1168 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
1169}
1170
1171int handle_notification_thread_client_connect(
1172 struct notification_thread_state *state)
1173{
1174 int ret;
1175 struct notification_client *client;
1176
1177 DBG("[notification-thread] Handling new notification channel client connection");
1178
1179 client = zmalloc(sizeof(*client));
1180 if (!client) {
1181 /* Fatal error. */
1182 ret = -1;
1183 goto error;
1184 }
1185 CDS_INIT_LIST_HEAD(&client->condition_list);
1186 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
1187 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
1188 client_reset_inbound_state(client);
1189
1190 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
1191 if (ret < 0) {
1192 ERR("[notification-thread] Failed to accept new notification channel client connection");
1193 ret = 0;
1194 goto error;
1195 }
1196
1197 client->socket = ret;
1198
1199 ret = socket_set_non_blocking(client->socket);
1200 if (ret) {
1201 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
1202 goto error;
1203 }
1204
1205 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
1206 if (ret < 0) {
1207 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
1208 ret = 0;
1209 goto error;
1210 }
1211
1212 ret = lttng_poll_add(&state->events, client->socket,
1213 LPOLLIN | LPOLLERR |
1214 LPOLLHUP | LPOLLRDHUP);
1215 if (ret < 0) {
1216 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
1217 ret = 0;
1218 goto error;
1219 }
1220 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
1221 client->socket);
1222
1223 /* Add to ht. */
1224 rcu_read_lock();
1225 cds_lfht_add(state->client_socket_ht,
1226 hash_client_socket(client->socket),
1227 &client->client_socket_ht_node);
1228 rcu_read_unlock();
1229
1230 return ret;
1231error:
1232 notification_client_destroy(client, state);
1233 return ret;
1234}
1235
1236int handle_notification_thread_client_disconnect(
1237 int client_socket,
1238 struct notification_thread_state *state)
1239{
1240 int ret = 0;
1241 struct notification_client *client;
1242
1243 rcu_read_lock();
1244 DBG("[notification-thread] Closing client connection (socket fd = %i)",
1245 client_socket);
1246 client = get_client_from_socket(client_socket, state);
1247 if (!client) {
1248 /* Internal state corruption, fatal error. */
1249 ERR("[notification-thread] Unable to find client (socket fd = %i)",
1250 client_socket);
1251 ret = -1;
1252 goto end;
1253 }
1254
1255 ret = lttng_poll_del(&state->events, client_socket);
1256 if (ret) {
1257 ERR("[notification-thread] Failed to remove client socket from poll set");
1258 }
1259 cds_lfht_del(state->client_socket_ht,
1260 &client->client_socket_ht_node);
1261 notification_client_destroy(client, state);
1262end:
1263 rcu_read_unlock();
1264 return ret;
1265}
1266
1267int handle_notification_thread_client_disconnect_all(
1268 struct notification_thread_state *state)
1269{
1270 struct cds_lfht_iter iter;
1271 struct notification_client *client;
1272 bool error_encoutered = false;
1273
1274 rcu_read_lock();
1275 DBG("[notification-thread] Closing all client connections");
1276 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1277 client_socket_ht_node) {
1278 int ret;
1279
1280 ret = handle_notification_thread_client_disconnect(
1281 client->socket, state);
1282 if (ret) {
1283 error_encoutered = true;
1284 }
1285 }
1286 rcu_read_unlock();
1287 return error_encoutered ? 1 : 0;
1288}
1289
1290int handle_notification_thread_trigger_unregister_all(
1291 struct notification_thread_state *state)
1292{
1293 bool error_occured = false;
1294 struct cds_lfht_iter iter;
1295 struct lttng_trigger_ht_element *trigger_ht_element;
1296
1297 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1298 node) {
1299 int ret = handle_notification_thread_command_unregister_trigger(
1300 state, trigger_ht_element->trigger, NULL);
1301 if (ret) {
1302 error_occured = true;
1303 }
1304 }
1305 return error_occured ? -1 : 0;
1306}
1307
1308static
1309int client_flush_outgoing_queue(struct notification_client *client,
1310 struct notification_thread_state *state)
1311{
1312 ssize_t ret;
1313 size_t to_send_count;
1314
1315 assert(client->communication.outbound.buffer.size != 0);
1316 to_send_count = client->communication.outbound.buffer.size;
1317 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
1318 client->socket);
1319
1320 ret = lttcomm_send_unix_sock_non_block(client->socket,
1321 client->communication.outbound.buffer.data,
1322 to_send_count);
1323 if ((ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) ||
1324 (ret > 0 && ret < to_send_count)) {
1325 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
1326 client->socket);
1327 to_send_count -= max(ret, 0);
1328
1329 memcpy(client->communication.outbound.buffer.data,
1330 client->communication.outbound.buffer.data +
1331 client->communication.outbound.buffer.size - to_send_count,
1332 to_send_count);
1333 ret = lttng_dynamic_buffer_set_size(
1334 &client->communication.outbound.buffer,
1335 to_send_count);
1336 if (ret) {
1337 goto error;
1338 }
1339
1340 /*
1341 * We want to be notified whenever there is buffer space
1342 * available to send the rest of the payload.
1343 */
1344 ret = lttng_poll_mod(&state->events, client->socket,
1345 CLIENT_POLL_MASK_IN_OUT);
1346 if (ret) {
1347 goto error;
1348 }
1349 } else if (ret < 0) {
1350 /* Generic error, disconnect the client. */
1351 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
1352 client->socket);
1353 ret = handle_notification_thread_client_disconnect(
1354 client->socket, state);
1355 if (ret) {
1356 goto error;
1357 }
1358 } else {
1359 /* No error and flushed the queue completely. */
1360 ret = lttng_dynamic_buffer_set_size(
1361 &client->communication.outbound.buffer, 0);
1362 if (ret) {
1363 goto error;
1364 }
1365 ret = lttng_poll_mod(&state->events, client->socket,
1366 CLIENT_POLL_MASK_IN);
1367 if (ret) {
1368 goto error;
1369 }
1370
1371 client->communication.outbound.queued_command_reply = false;
1372 client->communication.outbound.dropped_notification = false;
1373 }
1374
1375 return 0;
1376error:
1377 return -1;
1378}
1379
1380static
1381int client_send_command_reply(struct notification_client *client,
1382 struct notification_thread_state *state,
1383 enum lttng_notification_channel_status status)
1384{
1385 int ret;
1386 struct lttng_notification_channel_command_reply reply = {
1387 .status = (int8_t) status,
1388 };
1389 struct lttng_notification_channel_message msg = {
1390 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
1391 .size = sizeof(reply),
1392 };
1393 char buffer[sizeof(msg) + sizeof(reply)];
1394
1395 if (client->communication.outbound.queued_command_reply) {
1396 /* Protocol error. */
1397 goto error;
1398 }
1399
1400 memcpy(buffer, &msg, sizeof(msg));
1401 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
1402 DBG("[notification-thread] Send command reply (%i)", (int) status);
1403
1404 /* Enqueue buffer to outgoing queue and flush it. */
1405 ret = lttng_dynamic_buffer_append(
1406 &client->communication.outbound.buffer,
1407 buffer, sizeof(buffer));
1408 if (ret) {
1409 goto error;
1410 }
1411
1412 ret = client_flush_outgoing_queue(client, state);
1413 if (ret) {
1414 goto error;
1415 }
1416
1417 if (client->communication.outbound.buffer.size != 0) {
1418 /* Queue could not be emptied. */
1419 client->communication.outbound.queued_command_reply = true;
1420 }
1421
1422 return 0;
1423error:
1424 return -1;
1425}
1426
1427static
1428int client_dispatch_message(struct notification_client *client,
1429 struct notification_thread_state *state)
1430{
1431 int ret = 0;
1432
1433 if (client->communication.inbound.msg_type !=
1434 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
1435 client->communication.inbound.msg_type !=
1436 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
1437 !client->validated) {
1438 WARN("[notification-thread] client attempted a command before handshake");
1439 ret = -1;
1440 goto end;
1441 }
1442
1443 switch (client->communication.inbound.msg_type) {
1444 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
1445 {
1446 /*
1447 * Receiving message header. The function will be called again
1448 * once the rest of the message as been received and can be
1449 * interpreted.
1450 */
1451 const struct lttng_notification_channel_message *msg;
1452
1453 assert(sizeof(*msg) ==
1454 client->communication.inbound.buffer.size);
1455 msg = (const struct lttng_notification_channel_message *)
1456 client->communication.inbound.buffer.data;
1457
1458 if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
1459 ERR("[notification-thread] Invalid notification channel message: length = %u", msg->size);
1460 ret = -1;
1461 goto end;
1462 }
1463
1464 switch (msg->type) {
1465 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
1466 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
1467 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
1468 break;
1469 default:
1470 ret = -1;
1471 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
1472 goto end;
1473 }
1474
1475 client->communication.inbound.bytes_to_receive = msg->size;
1476 client->communication.inbound.msg_type =
1477 (enum lttng_notification_channel_message_type) msg->type;
1478 if (client->communication.inbound.msg_type ==
1479 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE) {
1480 client->communication.inbound.receive_creds = true;
1481 }
1482 ret = lttng_dynamic_buffer_set_size(
1483 &client->communication.inbound.buffer, 0);
1484 if (ret) {
1485 goto end;
1486 }
1487 break;
1488 }
1489 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
1490 {
1491 struct lttng_notification_channel_command_handshake *handshake_client;
1492 struct lttng_notification_channel_command_handshake handshake_reply = {
1493 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
1494 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
1495 };
1496 struct lttng_notification_channel_message msg_header = {
1497 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
1498 .size = sizeof(handshake_reply),
1499 };
1500 enum lttng_notification_channel_status status =
1501 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1502 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
1503
1504 memcpy(send_buffer, &msg_header, sizeof(msg_header));
1505 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
1506 sizeof(handshake_reply));
1507
1508 handshake_client =
1509 (struct lttng_notification_channel_command_handshake *)
1510 client->communication.inbound.buffer.data;
1511 client->major = handshake_client->major;
1512 client->minor = handshake_client->minor;
1513 if (!client->communication.inbound.creds_received) {
1514 ERR("[notification-thread] No credentials received from client");
1515 ret = -1;
1516 goto end;
1517 }
1518
1519 client->uid = LTTNG_SOCK_GET_UID_CRED(
1520 &client->communication.inbound.creds);
1521 client->gid = LTTNG_SOCK_GET_GID_CRED(
1522 &client->communication.inbound.creds);
1523 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
1524 client->uid, client->gid, (int) client->major,
1525 (int) client->minor);
1526
1527 if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
1528 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
1529 }
1530
1531 ret = lttng_dynamic_buffer_append(&client->communication.outbound.buffer,
1532 send_buffer, sizeof(send_buffer));
1533 if (ret) {
1534 ERR("[notification-thread] Failed to send protocol version to notification channel client");
1535 goto end;
1536 }
1537
1538 ret = client_flush_outgoing_queue(client, state);
1539 if (ret) {
1540 goto end;
1541 }
1542
1543 ret = client_send_command_reply(client, state, status);
1544 if (ret) {
1545 ERR("[notification-thread] Failed to send reply to notification channel client");
1546 goto end;
1547 }
1548
1549 /* Set reception state to receive the next message header. */
1550 client_reset_inbound_state(client);
1551 client->validated = true;
1552 break;
1553 }
1554 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
1555 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
1556 {
1557 struct lttng_condition *condition;
1558 enum lttng_notification_channel_status status =
1559 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1560 const struct lttng_buffer_view condition_view =
1561 lttng_buffer_view_from_dynamic_buffer(
1562 &client->communication.inbound.buffer,
1563 0, -1);
1564 size_t expected_condition_size =
1565 client->communication.inbound.buffer.size;
1566
1567 ret = lttng_condition_create_from_buffer(&condition_view,
1568 &condition);
1569 if (ret != expected_condition_size) {
1570 ERR("[notification-thread] Malformed condition received from client");
1571 goto end;
1572 }
1573
1574 if (client->communication.inbound.msg_type ==
1575 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
1576 /*
1577 * FIXME The current state should be evaluated on
1578 * subscription.
1579 */
1580 ret = notification_thread_client_subscribe(client,
1581 condition, state, &status);
1582 } else {
1583 ret = notification_thread_client_unsubscribe(client,
1584 condition, state, &status);
1585 }
1586 if (ret) {
1587 goto end;
1588 }
1589
1590 ret = client_send_command_reply(client, state, status);
1591 if (ret) {
1592 ERR("[notification-thread] Failed to send reply to notification channel client");
1593 goto end;
1594 }
1595
1596 /* Set reception state to receive the next message header. */
1597 client_reset_inbound_state(client);
1598 break;
1599 }
1600 default:
1601 abort();
1602 }
1603end:
1604 return ret;
1605}
1606
1607/* Incoming data from client. */
1608int handle_notification_thread_client_in(
1609 struct notification_thread_state *state, int socket)
1610{
1611 int ret;
1612 struct notification_client *client;
1613 ssize_t recv_ret;
1614 size_t offset;
1615
1616 client = get_client_from_socket(socket, state);
1617 if (!client) {
1618 /* Internal error, abort. */
1619 ret = -1;
1620 goto end;
1621 }
1622
1623 offset = client->communication.inbound.buffer.size;
1624 ret = lttng_dynamic_buffer_set_size(
1625 &client->communication.inbound.buffer,
1626 client->communication.inbound.bytes_to_receive);
1627 if (ret) {
1628 goto end;
1629 }
1630
1631 if (client->communication.inbound.receive_creds) {
1632 recv_ret = lttcomm_recv_creds_unix_sock(socket,
1633 client->communication.inbound.buffer.data + offset,
1634 client->communication.inbound.bytes_to_receive,
1635 &client->communication.inbound.creds);
1636 if (recv_ret > 0) {
1637 client->communication.inbound.receive_creds = false;
1638 client->communication.inbound.creds_received = true;
1639 }
1640 } else {
1641 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
1642 client->communication.inbound.buffer.data + offset,
1643 client->communication.inbound.bytes_to_receive);
1644 }
1645 if (recv_ret < 0) {
1646 goto error_disconnect_client;
1647 }
1648
1649 client->communication.inbound.bytes_to_receive -= recv_ret;
1650 ret = lttng_dynamic_buffer_set_size(
1651 &client->communication.inbound.buffer,
1652 client->communication.inbound.buffer.size -
1653 client->communication.inbound.bytes_to_receive);
1654 if (ret) {
1655 goto end;
1656 }
1657
1658 if (client->communication.inbound.bytes_to_receive == 0) {
1659 ret = client_dispatch_message(client, state);
1660 if (ret) {
1661 /*
1662 * Only returns an error if this client must be
1663 * disconnected.
1664 */
1665 goto error_disconnect_client;
1666 }
1667 } else {
1668 goto end;
1669 }
1670end:
1671 return ret;
1672error_disconnect_client:
1673 ret = handle_notification_thread_client_disconnect(socket, state);
1674 return ret;
1675}
1676
/*
 * Client ready to receive outgoing data: flush the outbound buffer of the
 * client associated with `socket`.
 *
 * Returns -1 if no client is associated with `socket`, otherwise the result
 * of the flush.
 */
int handle_notification_thread_client_out(
		struct notification_thread_state *state, int socket)
{
	struct notification_client *client =
			get_client_from_socket(socket, state);

	if (!client) {
		/* Internal error, abort. */
		return -1;
	}

	return client_flush_outgoing_queue(client, state);
}
1698
1699static
1700bool evaluate_buffer_usage_condition(struct lttng_condition *condition,
1701 struct channel_state_sample *sample, uint64_t buffer_capacity)
1702{
1703 bool result = false;
1704 uint64_t threshold;
1705 enum lttng_condition_type condition_type;
1706 struct lttng_condition_buffer_usage *use_condition = container_of(
1707 condition, struct lttng_condition_buffer_usage,
1708 parent);
1709
1710 if (!sample) {
1711 goto end;
1712 }
1713
1714 if (use_condition->threshold_bytes.set) {
1715 threshold = use_condition->threshold_bytes.value;
1716 } else {
1717 /*
1718 * Threshold was expressed as a ratio.
1719 *
1720 * TODO the threshold (in bytes) of conditions expressed
1721 * as a ratio of total buffer size could be cached to
1722 * forego this double-multiplication or it could be performed
1723 * as fixed-point math.
1724 *
1725 * Note that caching should accomodate the case where the
1726 * condition applies to multiple channels (i.e. don't assume
1727 * that all channels matching my_chann* have the same size...)
1728 */
1729 threshold = (uint64_t) (use_condition->threshold_ratio.value *
1730 (double) buffer_capacity);
1731 }
1732
1733 condition_type = lttng_condition_get_type(condition);
1734 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
1735 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
1736 threshold, sample->highest_usage);
1737
1738 /*
1739 * The low condition should only be triggered once _all_ of the
1740 * streams in a channel have gone below the "low" threshold.
1741 */
1742 if (sample->highest_usage <= threshold) {
1743 result = true;
1744 }
1745 } else {
1746 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
1747 threshold, sample->highest_usage);
1748
1749 /*
1750 * For high buffer usage scenarios, we want to trigger whenever
1751 * _any_ of the streams has reached the "high" threshold.
1752 */
1753 if (sample->highest_usage >= threshold) {
1754 result = true;
1755 }
1756 }
1757end:
1758 return result;
1759}
1760
1761static
1762int evaluate_condition(struct lttng_condition *condition,
1763 struct lttng_evaluation **evaluation,
1764 struct notification_thread_state *state,
1765 struct channel_state_sample *previous_sample,
1766 struct channel_state_sample *latest_sample,
1767 uint64_t buffer_capacity)
1768{
1769 int ret = 0;
1770 enum lttng_condition_type condition_type;
1771 bool previous_sample_result;
1772 bool latest_sample_result;
1773
1774 condition_type = lttng_condition_get_type(condition);
1775 /* No other condition type supported for the moment. */
1776 assert(condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW ||
1777 condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH);
1778
1779 previous_sample_result = evaluate_buffer_usage_condition(condition,
1780 previous_sample, buffer_capacity);
1781 latest_sample_result = evaluate_buffer_usage_condition(condition,
1782 latest_sample, buffer_capacity);
1783
1784 if (!latest_sample_result ||
1785 (previous_sample_result == latest_sample_result)) {
1786 /*
1787 * Only trigger on a condition evaluation transition.
1788 *
1789 * NOTE: This edge-triggered logic may not be appropriate for
1790 * future condition types.
1791 */
1792 goto end;
1793 }
1794
1795 if (evaluation && latest_sample_result) {
1796 *evaluation = lttng_evaluation_buffer_usage_create(
1797 condition_type,
1798 latest_sample->highest_usage,
1799 buffer_capacity);
1800 if (!*evaluation) {
1801 ret = -1;
1802 goto end;
1803 }
1804 }
1805end:
1806 return ret;
1807}
1808
1809static
1810int client_enqueue_dropped_notification(struct notification_client *client,
1811 struct notification_thread_state *state)
1812{
1813 int ret;
1814 struct lttng_notification_channel_message msg = {
1815 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
1816 .size = 0,
1817 };
1818
1819 ret = lttng_dynamic_buffer_append(
1820 &client->communication.outbound.buffer, &msg,
1821 sizeof(msg));
1822 return ret;
1823}
1824
1825static
1826int send_evaluation_to_clients(struct lttng_trigger *trigger,
1827 struct lttng_evaluation *evaluation,
1828 struct notification_client_list* client_list,
1829 struct notification_thread_state *state,
1830 uid_t channel_uid, gid_t channel_gid)
1831{
1832 int ret = 0;
1833 struct lttng_dynamic_buffer msg_buffer;
1834 struct notification_client_list_element *client_list_element, *tmp;
1835 struct lttng_notification *notification;
1836 struct lttng_condition *condition;
1837 ssize_t expected_notification_size, notification_size;
1838 struct lttng_notification_channel_message msg;
1839
1840 lttng_dynamic_buffer_init(&msg_buffer);
1841
1842 condition = lttng_trigger_get_condition(trigger);
1843 assert(condition);
1844
1845 notification = lttng_notification_create(condition, evaluation);
1846 if (!notification) {
1847 ret = -1;
1848 goto end;
1849 }
1850
1851 expected_notification_size = lttng_notification_serialize(notification,
1852 NULL);
1853 if (expected_notification_size < 0) {
1854 ERR("[notification-thread] Failed to get size of serialized notification");
1855 ret = -1;
1856 goto end;
1857 }
1858
1859 msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION;
1860 msg.size = (uint32_t) expected_notification_size;
1861 ret = lttng_dynamic_buffer_append(&msg_buffer, &msg, sizeof(msg));
1862 if (ret) {
1863 goto end;
1864 }
1865
1866 ret = lttng_dynamic_buffer_set_size(&msg_buffer,
1867 msg_buffer.size + expected_notification_size);
1868 if (ret) {
1869 goto end;
1870 }
1871
1872 notification_size = lttng_notification_serialize(notification,
1873 msg_buffer.data + sizeof(msg));
1874 if (notification_size != expected_notification_size) {
1875 ERR("[notification-thread] Failed to serialize notification");
1876 ret = -1;
1877 goto end;
1878 }
1879
1880 cds_list_for_each_entry_safe(client_list_element, tmp,
1881 &client_list->list, node) {
1882 struct notification_client *client =
1883 client_list_element->client;
1884
1885 if (client->uid != channel_uid && client->gid != channel_gid &&
1886 client->uid != 0) {
1887 /* Client is not allowed to monitor this channel. */
1888 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this channel");
1889 continue;
1890 }
1891
1892 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
1893 client->socket, msg_buffer.size);
1894 if (client->communication.outbound.buffer.size) {
1895 /*
1896 * Outgoing data is already buffered for this client;
1897 * drop the notification and enqueue a "dropped
1898 * notification" message if this is the first dropped
1899 * notification since the socket spilled-over to the
1900 * queue.
1901 */
1902 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
1903 client->socket);
1904 if (!client->communication.outbound.dropped_notification) {
1905 client->communication.outbound.dropped_notification = true;
1906 ret = client_enqueue_dropped_notification(
1907 client, state);
1908 if (ret) {
1909 goto end;
1910 }
1911 }
1912 continue;
1913 }
1914
1915 ret = lttng_dynamic_buffer_append_buffer(
1916 &client->communication.outbound.buffer,
1917 &msg_buffer);
1918 if (ret) {
1919 goto end;
1920 }
1921
1922 ret = client_flush_outgoing_queue(client, state);
1923 if (ret) {
1924 goto end;
1925 }
1926 }
1927 ret = 0;
1928end:
1929 lttng_notification_destroy(notification);
1930 lttng_dynamic_buffer_reset(&msg_buffer);
1931 return ret;
1932}
1933
1934int handle_notification_thread_channel_sample(
1935 struct notification_thread_state *state, int pipe,
1936 enum lttng_domain_type domain)
1937{
1938 int ret = 0;
1939 struct lttcomm_consumer_channel_monitor_msg sample_msg;
1940 struct channel_state_sample previous_sample, latest_sample;
1941 struct channel_info *channel_info;
1942 struct cds_lfht_node *node;
1943 struct cds_lfht_iter iter;
1944 struct lttng_channel_trigger_list *trigger_list;
1945 struct lttng_trigger_list_element *trigger_list_element;
1946 bool previous_sample_available = false;
1947
1948 /*
1949 * The monitoring pipe only holds messages smaller than PIPE_BUF,
1950 * ensuring that read/write of sampling messages are atomic.
1951 */
1952 do {
1953 ret = read(pipe, &sample_msg, sizeof(sample_msg));
1954 } while (ret == -1 && errno == EINTR);
1955 if (ret != sizeof(sample_msg)) {
1956 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
1957 pipe);
1958 ret = -1;
1959 goto end;
1960 }
1961
1962 ret = 0;
1963 latest_sample.key.key = sample_msg.key;
1964 latest_sample.key.domain = domain;
1965 latest_sample.highest_usage = sample_msg.highest;
1966 latest_sample.lowest_usage = sample_msg.lowest;
1967
1968 rcu_read_lock();
1969
1970 /* Retrieve the channel's informations */
1971 cds_lfht_lookup(state->channels_ht,
1972 hash_channel_key(&latest_sample.key),
1973 match_channel_info,
1974 &latest_sample.key,
1975 &iter);
1976 node = cds_lfht_iter_get_node(&iter);
1977 if (!node) {
1978 /*
1979 * Not an error since the consumer can push a sample to the pipe
1980 * and the rest of the session daemon could notify us of the
1981 * channel's destruction before we get a chance to process that
1982 * sample.
1983 */
1984 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
1985 latest_sample.key.key,
1986 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
1987 "user space");
1988 goto end_unlock;
1989 }
1990 channel_info = caa_container_of(node, struct channel_info,
1991 channels_ht_node);
1992 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64")",
1993 channel_info->channel_name,
1994 latest_sample.key.key,
1995 channel_info->session_name,
1996 latest_sample.highest_usage,
1997 latest_sample.lowest_usage);
1998
1999 /* Retrieve the channel's last sample, if it exists, and update it. */
2000 cds_lfht_lookup(state->channel_state_ht,
2001 hash_channel_key(&latest_sample.key),
2002 match_channel_state_sample,
2003 &latest_sample.key,
2004 &iter);
2005 node = cds_lfht_iter_get_node(&iter);
2006 if (node) {
2007 struct channel_state_sample *stored_sample;
2008
2009 /* Update the sample stored. */
2010 stored_sample = caa_container_of(node,
2011 struct channel_state_sample,
2012 channel_state_ht_node);
2013 memcpy(&previous_sample, stored_sample,
2014 sizeof(previous_sample));
2015 stored_sample->highest_usage = latest_sample.highest_usage;
2016 stored_sample->lowest_usage = latest_sample.lowest_usage;
2017 previous_sample_available = true;
2018 } else {
2019 /*
2020 * This is the channel's first sample, allocate space for and
2021 * store the new sample.
2022 */
2023 struct channel_state_sample *stored_sample;
2024
2025 stored_sample = zmalloc(sizeof(*stored_sample));
2026 if (!stored_sample) {
2027 ret = -1;
2028 goto end_unlock;
2029 }
2030
2031 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
2032 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
2033 cds_lfht_add(state->channel_state_ht,
2034 hash_channel_key(&stored_sample->key),
2035 &stored_sample->channel_state_ht_node);
2036 }
2037
2038 /* Find triggers associated with this channel. */
2039 cds_lfht_lookup(state->channel_triggers_ht,
2040 hash_channel_key(&latest_sample.key),
2041 match_channel_trigger_list,
2042 &latest_sample.key,
2043 &iter);
2044 node = cds_lfht_iter_get_node(&iter);
2045 if (!node) {
2046 goto end_unlock;
2047 }
2048
2049 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
2050 channel_triggers_ht_node);
2051 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
2052 node) {
2053 struct lttng_condition *condition;
2054 struct lttng_action *action;
2055 struct lttng_trigger *trigger;
2056 struct notification_client_list *client_list;
2057 struct lttng_evaluation *evaluation = NULL;
2058
2059 trigger = trigger_list_element->trigger;
2060 condition = lttng_trigger_get_condition(trigger);
2061 assert(condition);
2062 action = lttng_trigger_get_action(trigger);
2063
2064 /* Notify actions are the only type currently supported. */
2065 assert(lttng_action_get_type(action) ==
2066 LTTNG_ACTION_TYPE_NOTIFY);
2067
2068 /*
2069 * Check if any client is subscribed to the result of this
2070 * evaluation.
2071 */
2072 cds_lfht_lookup(state->notification_trigger_clients_ht,
2073 lttng_condition_hash(condition),
2074 match_client_list,
2075 trigger,
2076 &iter);
2077 node = cds_lfht_iter_get_node(&iter);
2078 assert(node);
2079
2080 client_list = caa_container_of(node,
2081 struct notification_client_list,
2082 notification_trigger_ht_node);
2083 if (cds_list_empty(&client_list->list)) {
2084 /*
2085 * No clients interested in the evaluation's result,
2086 * skip it.
2087 */
2088 continue;
2089 }
2090
2091 ret = evaluate_condition(condition, &evaluation, state,
2092 previous_sample_available ? &previous_sample : NULL,
2093 &latest_sample, channel_info->capacity);
2094 if (ret) {
2095 goto end_unlock;
2096 }
2097
2098 if (!evaluation) {
2099 continue;
2100 }
2101
2102 /* Dispatch evaluation result to all clients. */
2103 ret = send_evaluation_to_clients(trigger_list_element->trigger,
2104 evaluation, client_list, state,
2105 channel_info->uid, channel_info->gid);
2106 if (ret) {
2107 goto end_unlock;
2108 }
2109 }
2110end_unlock:
2111 rcu_read_unlock();
2112end:
2113 return ret;
2114}
This page took 0.105882 seconds and 4 git commands to generate.