2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 #include <urcu/rculfhash.h>
22 #include <common/defaults.h>
23 #include <common/error.h>
24 #include <common/futex.h>
25 #include <common/unix.h>
26 #include <common/dynamic-buffer.h>
27 #include <common/hashtable/utils.h>
28 #include <common/sessiond-comm/sessiond-comm.h>
29 #include <common/macros.h>
30 #include <lttng/condition/condition.h>
31 #include <lttng/action/action-internal.h>
32 #include <lttng/notification/notification-internal.h>
33 #include <lttng/condition/condition-internal.h>
34 #include <lttng/condition/buffer-usage-internal.h>
35 #include <lttng/condition/session-consumed-size-internal.h>
36 #include <lttng/condition/session-rotation-internal.h>
37 #include <lttng/notification/channel-internal.h>
45 #include "notification-thread.h"
46 #include "notification-thread-events.h"
47 #include "notification-thread-commands.h"
48 #include "lttng-sessiond.h"
51 #define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
52 #define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
54 enum lttng_object_type
{
55 LTTNG_OBJECT_TYPE_UNKNOWN
,
56 LTTNG_OBJECT_TYPE_NONE
,
57 LTTNG_OBJECT_TYPE_CHANNEL
,
58 LTTNG_OBJECT_TYPE_SESSION
,
61 struct lttng_trigger_list_element
{
62 /* No ownership of the trigger object is assumed. */
63 const struct lttng_trigger
*trigger
;
64 struct cds_list_head node
;
67 struct lttng_channel_trigger_list
{
68 struct channel_key channel_key
;
69 /* List of struct lttng_trigger_list_element. */
70 struct cds_list_head list
;
71 /* Node in the channel_triggers_ht */
72 struct cds_lfht_node channel_triggers_ht_node
;
76 * List of triggers applying to a given session.
79 * - lttng_session_trigger_list_create()
80 * - lttng_session_trigger_list_build()
81 * - lttng_session_trigger_list_destroy()
82 * - lttng_session_trigger_list_add()
84 struct lttng_session_trigger_list
{
86 * Not owned by this; points to the session_info structure's
89 const char *session_name
;
90 /* List of struct lttng_trigger_list_element. */
91 struct cds_list_head list
;
92 /* Node in the session_triggers_ht */
93 struct cds_lfht_node session_triggers_ht_node
;
95 * Weak reference to the notification system's session triggers
98 * The session trigger list structure structure is owned by
99 * the session's session_info.
101 * The session_info is kept alive the the channel_infos holding a
102 * reference to it (reference counting). When those channels are
103 * destroyed (at runtime or on teardown), the reference they hold
104 * to the session_info are released. On destruction of session_info,
105 * session_info_destroy() will remove the list of triggers applying
106 * to this session from the notification system's state.
108 * This implies that the session_triggers_ht must be destroyed
109 * after the channels.
111 struct cds_lfht
*session_triggers_ht
;
112 /* Used for delayed RCU reclaim. */
113 struct rcu_head rcu_node
;
116 struct lttng_trigger_ht_element
{
117 struct lttng_trigger
*trigger
;
118 struct cds_lfht_node node
;
121 struct lttng_condition_list_element
{
122 struct lttng_condition
*condition
;
123 struct cds_list_head node
;
126 struct notification_client_list_element
{
127 struct notification_client
*client
;
128 struct cds_list_head node
;
131 struct notification_client_list
{
132 struct lttng_trigger
*trigger
;
133 struct cds_list_head list
;
134 struct cds_lfht_node notification_trigger_ht_node
;
137 struct notification_client
{
139 /* Client protocol version. */
140 uint8_t major
, minor
;
144 * Indicates if the credentials and versions of the client have been
149 * Conditions to which the client's notification channel is subscribed.
150 * List of struct lttng_condition_list_node. The condition member is
151 * owned by the client.
153 struct cds_list_head condition_list
;
154 struct cds_lfht_node client_socket_ht_node
;
158 * During the reception of a message, the reception
159 * buffers' "size" is set to contain the current
160 * message's complete payload.
162 struct lttng_dynamic_buffer buffer
;
163 /* Bytes left to receive for the current message. */
164 size_t bytes_to_receive
;
165 /* Type of the message being received. */
166 enum lttng_notification_channel_message_type msg_type
;
168 * Indicates whether or not credentials are expected
173 * Indicates whether or not credentials were received
177 /* Only used during credentials reception. */
178 lttng_sock_cred creds
;
182 * Indicates whether or not a notification addressed to
183 * this client was dropped because a command reply was
186 * A notification is dropped whenever the buffer is not
189 bool dropped_notification
;
191 * Indicates whether or not a command reply is already
192 * buffered. In this case, it means that the client is
193 * not consuming command replies before emitting a new
194 * one. This could be caused by a protocol error or a
195 * misbehaving/malicious client.
197 bool queued_command_reply
;
198 struct lttng_dynamic_buffer buffer
;
203 struct channel_state_sample
{
204 struct channel_key key
;
205 struct cds_lfht_node channel_state_ht_node
;
206 uint64_t highest_usage
;
207 uint64_t lowest_usage
;
208 uint64_t channel_total_consumed
;
211 static unsigned long hash_channel_key(struct channel_key
*key
);
212 static int evaluate_buffer_condition(const struct lttng_condition
*condition
,
213 struct lttng_evaluation
**evaluation
,
214 const struct notification_thread_state
*state
,
215 const struct channel_state_sample
*previous_sample
,
216 const struct channel_state_sample
*latest_sample
,
217 uint64_t previous_session_consumed_total
,
218 uint64_t latest_session_consumed_total
,
219 struct channel_info
*channel_info
);
221 int send_evaluation_to_clients(const struct lttng_trigger
*trigger
,
222 const struct lttng_evaluation
*evaluation
,
223 struct notification_client_list
*client_list
,
224 struct notification_thread_state
*state
,
225 uid_t channel_uid
, gid_t channel_gid
);
228 /* session_info API */
230 void session_info_destroy(void *_data
);
232 void session_info_get(struct session_info
*session_info
);
234 void session_info_put(struct session_info
*session_info
);
236 struct session_info
*session_info_create(const char *name
,
237 uid_t uid
, gid_t gid
,
238 struct lttng_session_trigger_list
*trigger_list
,
239 struct cds_lfht
*sessions_ht
);
241 void session_info_add_channel(struct session_info
*session_info
,
242 struct channel_info
*channel_info
);
244 void session_info_remove_channel(struct session_info
*session_info
,
245 struct channel_info
*channel_info
);
247 /* lttng_session_trigger_list API */
249 struct lttng_session_trigger_list
*lttng_session_trigger_list_create(
250 const char *session_name
,
251 struct cds_lfht
*session_triggers_ht
);
253 struct lttng_session_trigger_list
*lttng_session_trigger_list_build(
254 const struct notification_thread_state
*state
,
255 const char *session_name
);
257 void lttng_session_trigger_list_destroy(
258 struct lttng_session_trigger_list
*list
);
260 int lttng_session_trigger_list_add(struct lttng_session_trigger_list
*list
,
261 const struct lttng_trigger
*trigger
);
265 int match_client(struct cds_lfht_node
*node
, const void *key
)
267 /* This double-cast is intended to supress pointer-to-cast warning. */
268 int socket
= (int) (intptr_t) key
;
269 struct notification_client
*client
;
271 client
= caa_container_of(node
, struct notification_client
,
272 client_socket_ht_node
);
274 return !!(client
->socket
== socket
);
278 int match_channel_trigger_list(struct cds_lfht_node
*node
, const void *key
)
280 struct channel_key
*channel_key
= (struct channel_key
*) key
;
281 struct lttng_channel_trigger_list
*trigger_list
;
283 trigger_list
= caa_container_of(node
, struct lttng_channel_trigger_list
,
284 channel_triggers_ht_node
);
286 return !!((channel_key
->key
== trigger_list
->channel_key
.key
) &&
287 (channel_key
->domain
== trigger_list
->channel_key
.domain
));
291 int match_session_trigger_list(struct cds_lfht_node
*node
, const void *key
)
293 const char *session_name
= (const char *) key
;
294 struct lttng_session_trigger_list
*trigger_list
;
296 trigger_list
= caa_container_of(node
, struct lttng_session_trigger_list
,
297 session_triggers_ht_node
);
299 return !!(strcmp(trigger_list
->session_name
, session_name
) == 0);
303 int match_channel_state_sample(struct cds_lfht_node
*node
, const void *key
)
305 struct channel_key
*channel_key
= (struct channel_key
*) key
;
306 struct channel_state_sample
*sample
;
308 sample
= caa_container_of(node
, struct channel_state_sample
,
309 channel_state_ht_node
);
311 return !!((channel_key
->key
== sample
->key
.key
) &&
312 (channel_key
->domain
== sample
->key
.domain
));
316 int match_channel_info(struct cds_lfht_node
*node
, const void *key
)
318 struct channel_key
*channel_key
= (struct channel_key
*) key
;
319 struct channel_info
*channel_info
;
321 channel_info
= caa_container_of(node
, struct channel_info
,
324 return !!((channel_key
->key
== channel_info
->key
.key
) &&
325 (channel_key
->domain
== channel_info
->key
.domain
));
329 int match_condition(struct cds_lfht_node
*node
, const void *key
)
331 struct lttng_condition
*condition_key
= (struct lttng_condition
*) key
;
332 struct lttng_trigger_ht_element
*trigger
;
333 struct lttng_condition
*condition
;
335 trigger
= caa_container_of(node
, struct lttng_trigger_ht_element
,
337 condition
= lttng_trigger_get_condition(trigger
->trigger
);
340 return !!lttng_condition_is_equal(condition_key
, condition
);
344 int match_client_list_condition(struct cds_lfht_node
*node
, const void *key
)
346 struct lttng_condition
*condition_key
= (struct lttng_condition
*) key
;
347 struct notification_client_list
*client_list
;
348 struct lttng_condition
*condition
;
350 assert(condition_key
);
352 client_list
= caa_container_of(node
, struct notification_client_list
,
353 notification_trigger_ht_node
);
354 condition
= lttng_trigger_get_condition(client_list
->trigger
);
356 return !!lttng_condition_is_equal(condition_key
, condition
);
360 unsigned long lttng_condition_buffer_usage_hash(
361 const struct lttng_condition
*_condition
)
364 unsigned long condition_type
;
365 struct lttng_condition_buffer_usage
*condition
;
367 condition
= container_of(_condition
,
368 struct lttng_condition_buffer_usage
, parent
);
370 condition_type
= (unsigned long) condition
->parent
.type
;
371 hash
= hash_key_ulong((void *) condition_type
, lttng_ht_seed
);
372 if (condition
->session_name
) {
373 hash
^= hash_key_str(condition
->session_name
, lttng_ht_seed
);
375 if (condition
->channel_name
) {
376 hash
^= hash_key_str(condition
->channel_name
, lttng_ht_seed
);
378 if (condition
->domain
.set
) {
379 hash
^= hash_key_ulong(
380 (void *) condition
->domain
.type
,
383 if (condition
->threshold_ratio
.set
) {
386 val
= condition
->threshold_ratio
.value
* (double) UINT32_MAX
;
387 hash
^= hash_key_u64(&val
, lttng_ht_seed
);
388 } else if (condition
->threshold_bytes
.set
) {
391 val
= condition
->threshold_bytes
.value
;
392 hash
^= hash_key_u64(&val
, lttng_ht_seed
);
398 unsigned long lttng_condition_session_consumed_size_hash(
399 const struct lttng_condition
*_condition
)
402 unsigned long condition_type
=
403 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
;
404 struct lttng_condition_session_consumed_size
*condition
;
407 condition
= container_of(_condition
,
408 struct lttng_condition_session_consumed_size
, parent
);
410 hash
= hash_key_ulong((void *) condition_type
, lttng_ht_seed
);
411 if (condition
->session_name
) {
412 hash
^= hash_key_str(condition
->session_name
, lttng_ht_seed
);
414 val
= condition
->consumed_threshold_bytes
.value
;
415 hash
^= hash_key_u64(&val
, lttng_ht_seed
);
420 unsigned long lttng_condition_session_rotation_hash(
421 const struct lttng_condition
*_condition
)
423 unsigned long hash
, condition_type
;
424 struct lttng_condition_session_rotation
*condition
;
426 condition
= container_of(_condition
,
427 struct lttng_condition_session_rotation
, parent
);
428 condition_type
= (unsigned long) condition
->parent
.type
;
429 hash
= hash_key_ulong((void *) condition_type
, lttng_ht_seed
);
430 assert(condition
->session_name
);
431 hash
^= hash_key_str(condition
->session_name
, lttng_ht_seed
);
436 * The lttng_condition hashing code is kept in this file (rather than
437 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
438 * don't want to link in liblttng-ctl.
441 unsigned long lttng_condition_hash(const struct lttng_condition
*condition
)
443 switch (condition
->type
) {
444 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
445 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
446 return lttng_condition_buffer_usage_hash(condition
);
447 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
448 return lttng_condition_session_consumed_size_hash(condition
);
449 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
450 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
451 return lttng_condition_session_rotation_hash(condition
);
453 ERR("[notification-thread] Unexpected condition type caught");
459 unsigned long hash_channel_key(struct channel_key
*key
)
461 unsigned long key_hash
= hash_key_u64(&key
->key
, lttng_ht_seed
);
462 unsigned long domain_hash
= hash_key_ulong(
463 (void *) (unsigned long) key
->domain
, lttng_ht_seed
);
465 return key_hash
^ domain_hash
;
469 void channel_info_destroy(struct channel_info
*channel_info
)
475 if (channel_info
->session_info
) {
476 session_info_remove_channel(channel_info
->session_info
,
478 session_info_put(channel_info
->session_info
);
480 if (channel_info
->name
) {
481 free(channel_info
->name
);
486 /* Don't call directly, use the ref-counting mechanism. */
488 void session_info_destroy(void *_data
)
490 struct session_info
*session_info
= _data
;
493 assert(session_info
);
494 if (session_info
->channel_infos_ht
) {
495 ret
= cds_lfht_destroy(session_info
->channel_infos_ht
, NULL
);
497 ERR("[notification-thread] Failed to destroy channel information hash table");
500 lttng_session_trigger_list_destroy(session_info
->trigger_list
);
503 cds_lfht_del(session_info
->sessions_ht
,
504 &session_info
->sessions_ht_node
);
506 free(session_info
->name
);
511 void session_info_get(struct session_info
*session_info
)
516 lttng_ref_get(&session_info
->ref
);
520 void session_info_put(struct session_info
*session_info
)
525 lttng_ref_put(&session_info
->ref
);
529 struct session_info
*session_info_create(const char *name
, uid_t uid
, gid_t gid
,
530 struct lttng_session_trigger_list
*trigger_list
,
531 struct cds_lfht
*sessions_ht
)
533 struct session_info
*session_info
;
537 session_info
= zmalloc(sizeof(*session_info
));
541 lttng_ref_init(&session_info
->ref
, session_info_destroy
);
543 session_info
->channel_infos_ht
= cds_lfht_new(DEFAULT_HT_SIZE
,
544 1, 0, CDS_LFHT_AUTO_RESIZE
| CDS_LFHT_ACCOUNTING
, NULL
);
545 if (!session_info
->channel_infos_ht
) {
549 cds_lfht_node_init(&session_info
->sessions_ht_node
);
550 session_info
->name
= strdup(name
);
551 if (!session_info
->name
) {
554 session_info
->uid
= uid
;
555 session_info
->gid
= gid
;
556 session_info
->trigger_list
= trigger_list
;
557 session_info
->sessions_ht
= sessions_ht
;
561 session_info_put(session_info
);
566 void session_info_add_channel(struct session_info
*session_info
,
567 struct channel_info
*channel_info
)
570 cds_lfht_add(session_info
->channel_infos_ht
,
571 hash_channel_key(&channel_info
->key
),
572 &channel_info
->session_info_channels_ht_node
);
577 void session_info_remove_channel(struct session_info
*session_info
,
578 struct channel_info
*channel_info
)
581 cds_lfht_del(session_info
->channel_infos_ht
,
582 &channel_info
->session_info_channels_ht_node
);
587 struct channel_info
*channel_info_create(const char *channel_name
,
588 struct channel_key
*channel_key
, uint64_t channel_capacity
,
589 struct session_info
*session_info
)
591 struct channel_info
*channel_info
= zmalloc(sizeof(*channel_info
));
597 cds_lfht_node_init(&channel_info
->channels_ht_node
);
598 cds_lfht_node_init(&channel_info
->session_info_channels_ht_node
);
599 memcpy(&channel_info
->key
, channel_key
, sizeof(*channel_key
));
600 channel_info
->capacity
= channel_capacity
;
602 channel_info
->name
= strdup(channel_name
);
603 if (!channel_info
->name
) {
608 * Set the references between session and channel infos:
609 * - channel_info holds a strong reference to session_info
610 * - session_info holds a weak reference to channel_info
612 session_info_get(session_info
);
613 session_info_add_channel(session_info
, channel_info
);
614 channel_info
->session_info
= session_info
;
618 channel_info_destroy(channel_info
);
622 /* RCU read lock must be held by the caller. */
624 struct notification_client_list
*get_client_list_from_condition(
625 struct notification_thread_state
*state
,
626 const struct lttng_condition
*condition
)
628 struct cds_lfht_node
*node
;
629 struct cds_lfht_iter iter
;
631 cds_lfht_lookup(state
->notification_trigger_clients_ht
,
632 lttng_condition_hash(condition
),
633 match_client_list_condition
,
636 node
= cds_lfht_iter_get_node(&iter
);
638 return node
? caa_container_of(node
,
639 struct notification_client_list
,
640 notification_trigger_ht_node
) : NULL
;
643 /* This function must be called with the RCU read lock held. */
645 int evaluate_condition_for_client(struct lttng_trigger
*trigger
,
646 struct lttng_condition
*condition
,
647 struct notification_client
*client
,
648 struct notification_thread_state
*state
)
651 struct cds_lfht_iter iter
;
652 struct cds_lfht_node
*node
;
653 struct channel_info
*channel_info
= NULL
;
654 struct channel_key
*channel_key
= NULL
;
655 struct channel_state_sample
*last_sample
= NULL
;
656 struct lttng_channel_trigger_list
*channel_trigger_list
= NULL
;
657 struct lttng_evaluation
*evaluation
= NULL
;
658 struct notification_client_list client_list
= { 0 };
659 struct notification_client_list_element client_list_element
= { 0 };
666 /* Find the channel associated with the trigger. */
667 cds_lfht_for_each_entry(state
->channel_triggers_ht
, &iter
,
668 channel_trigger_list
, channel_triggers_ht_node
) {
669 struct lttng_trigger_list_element
*element
;
671 cds_list_for_each_entry(element
, &channel_trigger_list
->list
, node
) {
672 const struct lttng_condition
*current_condition
=
673 lttng_trigger_get_const_condition(
676 assert(current_condition
);
677 if (!lttng_condition_is_equal(condition
,
678 current_condition
)) {
682 /* Found the trigger, save the channel key. */
683 channel_key
= &channel_trigger_list
->channel_key
;
687 /* The channel key was found stop iteration. */
693 /* No channel found; normal exit. */
694 DBG("[notification-thread] No channel associated with newly subscribed-to condition");
699 /* Fetch channel info for the matching channel. */
700 cds_lfht_lookup(state
->channels_ht
,
701 hash_channel_key(channel_key
),
705 node
= cds_lfht_iter_get_node(&iter
);
707 channel_info
= caa_container_of(node
, struct channel_info
,
710 /* Retrieve the channel's last sample, if it exists. */
711 cds_lfht_lookup(state
->channel_state_ht
,
712 hash_channel_key(channel_key
),
713 match_channel_state_sample
,
716 node
= cds_lfht_iter_get_node(&iter
);
718 last_sample
= caa_container_of(node
,
719 struct channel_state_sample
,
720 channel_state_ht_node
);
722 /* Nothing to evaluate, no sample was ever taken. Normal exit */
723 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
728 ret
= evaluate_buffer_condition(condition
, &evaluation
, state
,
730 0, channel_info
->session_info
->consumed_data_size
,
733 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
738 /* Evaluation yielded nothing. Normal exit. */
739 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
745 * Create a temporary client list with the client currently
748 cds_lfht_node_init(&client_list
.notification_trigger_ht_node
);
749 CDS_INIT_LIST_HEAD(&client_list
.list
);
750 client_list
.trigger
= trigger
;
752 CDS_INIT_LIST_HEAD(&client_list_element
.node
);
753 client_list_element
.client
= client
;
754 cds_list_add(&client_list_element
.node
, &client_list
.list
);
756 /* Send evaluation result to the newly-subscribed client. */
757 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
758 ret
= send_evaluation_to_clients(trigger
, evaluation
, &client_list
,
759 state
, channel_info
->session_info
->uid
,
760 channel_info
->session_info
->gid
);
767 int notification_thread_client_subscribe(struct notification_client
*client
,
768 struct lttng_condition
*condition
,
769 struct notification_thread_state
*state
,
770 enum lttng_notification_channel_status
*_status
)
773 struct notification_client_list
*client_list
;
774 struct lttng_condition_list_element
*condition_list_element
= NULL
;
775 struct notification_client_list_element
*client_list_element
= NULL
;
776 enum lttng_notification_channel_status status
=
777 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
780 * Ensure that the client has not already subscribed to this condition
783 cds_list_for_each_entry(condition_list_element
, &client
->condition_list
, node
) {
784 if (lttng_condition_is_equal(condition_list_element
->condition
,
786 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED
;
791 condition_list_element
= zmalloc(sizeof(*condition_list_element
));
792 if (!condition_list_element
) {
796 client_list_element
= zmalloc(sizeof(*client_list_element
));
797 if (!client_list_element
) {
805 * Add the newly-subscribed condition to the client's subscription list.
807 CDS_INIT_LIST_HEAD(&condition_list_element
->node
);
808 condition_list_element
->condition
= condition
;
809 cds_list_add(&condition_list_element
->node
, &client
->condition_list
);
811 client_list
= get_client_list_from_condition(state
, condition
);
814 * No notification-emiting trigger registered with this
815 * condition. We don't evaluate the condition right away
816 * since this trigger is not registered yet.
818 free(client_list_element
);
823 * The condition to which the client just subscribed is evaluated
824 * at this point so that conditions that are already TRUE result
825 * in a notification being sent out.
827 if (evaluate_condition_for_client(client_list
->trigger
, condition
,
829 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
831 free(client_list_element
);
836 * Add the client to the list of clients interested in a given trigger
837 * if a "notification" trigger with a corresponding condition was
840 client_list_element
->client
= client
;
841 CDS_INIT_LIST_HEAD(&client_list_element
->node
);
842 cds_list_add(&client_list_element
->node
, &client_list
->list
);
851 free(condition_list_element
);
852 free(client_list_element
);
857 int notification_thread_client_unsubscribe(
858 struct notification_client
*client
,
859 struct lttng_condition
*condition
,
860 struct notification_thread_state
*state
,
861 enum lttng_notification_channel_status
*_status
)
863 struct notification_client_list
*client_list
;
864 struct lttng_condition_list_element
*condition_list_element
,
866 struct notification_client_list_element
*client_list_element
,
868 bool condition_found
= false;
869 enum lttng_notification_channel_status status
=
870 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
872 /* Remove the condition from the client's condition list. */
873 cds_list_for_each_entry_safe(condition_list_element
, condition_tmp
,
874 &client
->condition_list
, node
) {
875 if (!lttng_condition_is_equal(condition_list_element
->condition
,
880 cds_list_del(&condition_list_element
->node
);
882 * The caller may be iterating on the client's conditions to
883 * tear down a client's connection. In this case, the condition
884 * will be destroyed at the end.
886 if (condition
!= condition_list_element
->condition
) {
887 lttng_condition_destroy(
888 condition_list_element
->condition
);
890 free(condition_list_element
);
891 condition_found
= true;
895 if (!condition_found
) {
896 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION
;
901 * Remove the client from the list of clients interested the trigger
902 * matching the condition.
905 client_list
= get_client_list_from_condition(state
, condition
);
910 cds_list_for_each_entry_safe(client_list_element
, client_tmp
,
911 &client_list
->list
, node
) {
912 if (client_list_element
->client
->socket
!= client
->socket
) {
915 cds_list_del(&client_list_element
->node
);
916 free(client_list_element
);
922 lttng_condition_destroy(condition
);
930 void notification_client_destroy(struct notification_client
*client
,
931 struct notification_thread_state
*state
)
933 struct lttng_condition_list_element
*condition_list_element
, *tmp
;
939 /* Release all conditions to which the client was subscribed. */
940 cds_list_for_each_entry_safe(condition_list_element
, tmp
,
941 &client
->condition_list
, node
) {
942 (void) notification_thread_client_unsubscribe(client
,
943 condition_list_element
->condition
, state
, NULL
);
946 if (client
->socket
>= 0) {
947 (void) lttcomm_close_unix_sock(client
->socket
);
949 lttng_dynamic_buffer_reset(&client
->communication
.inbound
.buffer
);
950 lttng_dynamic_buffer_reset(&client
->communication
.outbound
.buffer
);
955 * Call with rcu_read_lock held (and hold for the lifetime of the returned
959 struct notification_client
*get_client_from_socket(int socket
,
960 struct notification_thread_state
*state
)
962 struct cds_lfht_iter iter
;
963 struct cds_lfht_node
*node
;
964 struct notification_client
*client
= NULL
;
966 cds_lfht_lookup(state
->client_socket_ht
,
967 hash_key_ulong((void *) (unsigned long) socket
, lttng_ht_seed
),
969 (void *) (unsigned long) socket
,
971 node
= cds_lfht_iter_get_node(&iter
);
976 client
= caa_container_of(node
, struct notification_client
,
977 client_socket_ht_node
);
983 bool buffer_usage_condition_applies_to_channel(
984 const struct lttng_condition
*condition
,
985 const struct channel_info
*channel_info
)
987 enum lttng_condition_status status
;
988 enum lttng_domain_type condition_domain
;
989 const char *condition_session_name
= NULL
;
990 const char *condition_channel_name
= NULL
;
992 status
= lttng_condition_buffer_usage_get_domain_type(condition
,
994 assert(status
== LTTNG_CONDITION_STATUS_OK
);
995 if (channel_info
->key
.domain
!= condition_domain
) {
999 status
= lttng_condition_buffer_usage_get_session_name(
1000 condition
, &condition_session_name
);
1001 assert((status
== LTTNG_CONDITION_STATUS_OK
) && condition_session_name
);
1003 status
= lttng_condition_buffer_usage_get_channel_name(
1004 condition
, &condition_channel_name
);
1005 assert((status
== LTTNG_CONDITION_STATUS_OK
) && condition_channel_name
);
1007 if (strcmp(channel_info
->session_info
->name
, condition_session_name
)) {
1010 if (strcmp(channel_info
->name
, condition_channel_name
)) {
1020 bool session_consumed_size_condition_applies_to_channel(
1021 const struct lttng_condition
*condition
,
1022 const struct channel_info
*channel_info
)
1024 enum lttng_condition_status status
;
1025 const char *condition_session_name
= NULL
;
1027 status
= lttng_condition_session_consumed_size_get_session_name(
1028 condition
, &condition_session_name
);
1029 assert((status
== LTTNG_CONDITION_STATUS_OK
) && condition_session_name
);
1031 if (strcmp(channel_info
->session_info
->name
, condition_session_name
)) {
1041 * Get the type of object to which a given trigger applies. Binding lets
1042 * the notification system evaluate a trigger's condition when a given
1043 * object's state is updated.
1045 * For instance, a condition bound to a channel will be evaluated everytime
1046 * the channel's state is changed by a channel monitoring sample.
1049 enum lttng_object_type
get_trigger_binding_object(
1050 const struct lttng_trigger
*trigger
)
1052 const struct lttng_condition
*condition
;
1054 condition
= lttng_trigger_get_const_condition(trigger
);
1055 switch (lttng_condition_get_type(condition
)) {
1056 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
1057 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
1058 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
1059 return LTTNG_OBJECT_TYPE_CHANNEL
;
1060 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
1061 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
1062 return LTTNG_OBJECT_TYPE_SESSION
;
1064 return LTTNG_OBJECT_TYPE_UNKNOWN
;
1069 bool trigger_applies_to_channel(const struct lttng_trigger
*trigger
,
1070 const struct channel_info
*channel_info
)
1072 const struct lttng_condition
*condition
;
1073 bool trigger_applies
;
1075 condition
= lttng_trigger_get_const_condition(trigger
);
1080 switch (lttng_condition_get_type(condition
)) {
1081 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
1082 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
1083 trigger_applies
= buffer_usage_condition_applies_to_channel(
1084 condition
, channel_info
);
1086 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
1087 trigger_applies
= session_consumed_size_condition_applies_to_channel(
1088 condition
, channel_info
);
1094 return trigger_applies
;
1100 bool trigger_applies_to_client(struct lttng_trigger
*trigger
,
1101 struct notification_client
*client
)
1103 bool applies
= false;
1104 struct lttng_condition_list_element
*condition_list_element
;
1106 cds_list_for_each_entry(condition_list_element
, &client
->condition_list
,
1108 applies
= lttng_condition_is_equal(
1109 condition_list_element
->condition
,
1110 lttng_trigger_get_condition(trigger
));
1119 int match_session(struct cds_lfht_node
*node
, const void *key
)
1121 const char *name
= key
;
1122 struct session_info
*session_info
= caa_container_of(
1123 node
, struct session_info
, sessions_ht_node
);
1125 return !strcmp(session_info
->name
, name
);
1128 /* Must be called with RCU read lock held. */
1130 struct lttng_session_trigger_list
*get_session_trigger_list(
1131 struct notification_thread_state
*state
,
1132 const char *session_name
)
1134 struct lttng_session_trigger_list
*list
= NULL
;
1135 struct cds_lfht_node
*node
;
1136 struct cds_lfht_iter iter
;
1138 cds_lfht_lookup(state
->session_triggers_ht
,
1139 hash_key_str(session_name
, lttng_ht_seed
),
1140 match_session_trigger_list
,
1143 node
= cds_lfht_iter_get_node(&iter
);
1146 * Not an error, the list of triggers applying to that session
1147 * will be initialized when the session is created.
1149 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1154 list
= caa_container_of(node
,
1155 struct lttng_session_trigger_list
,
1156 session_triggers_ht_node
);
1162 * Allocate an empty lttng_session_trigger_list for the session named
1165 * No ownership of 'session_name' is assumed by the session trigger list.
1166 * It is the caller's responsability to ensure the session name is alive
1167 * for as long as this list is.
1170 struct lttng_session_trigger_list
*lttng_session_trigger_list_create(
1171 const char *session_name
,
1172 struct cds_lfht
*session_triggers_ht
)
1174 struct lttng_session_trigger_list
*list
;
1176 list
= zmalloc(sizeof(*list
));
1180 list
->session_name
= session_name
;
1181 CDS_INIT_LIST_HEAD(&list
->list
);
1182 cds_lfht_node_init(&list
->session_triggers_ht_node
);
1183 list
->session_triggers_ht
= session_triggers_ht
;
1186 /* Publish the list through the session_triggers_ht. */
1187 cds_lfht_add(session_triggers_ht
,
1188 hash_key_str(session_name
, lttng_ht_seed
),
1189 &list
->session_triggers_ht_node
);
1196 void free_session_trigger_list_rcu(struct rcu_head
*node
)
1198 free(caa_container_of(node
, struct lttng_session_trigger_list
,
1203 void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list
*list
)
1205 struct lttng_trigger_list_element
*trigger_list_element
, *tmp
;
1207 /* Empty the list element by element, and then free the list itself. */
1208 cds_list_for_each_entry_safe(trigger_list_element
, tmp
,
1209 &list
->list
, node
) {
1210 cds_list_del(&trigger_list_element
->node
);
1211 free(trigger_list_element
);
1214 /* Unpublish the list from the session_triggers_ht. */
1215 cds_lfht_del(list
->session_triggers_ht
,
1216 &list
->session_triggers_ht_node
);
1218 call_rcu(&list
->rcu_node
, free_session_trigger_list_rcu
);
1222 int lttng_session_trigger_list_add(struct lttng_session_trigger_list
*list
,
1223 const struct lttng_trigger
*trigger
)
1226 struct lttng_trigger_list_element
*new_element
=
1227 zmalloc(sizeof(*new_element
));
1233 CDS_INIT_LIST_HEAD(&new_element
->node
);
1234 new_element
->trigger
= trigger
;
1235 cds_list_add(&new_element
->node
, &list
->list
);
1241 bool trigger_applies_to_session(const struct lttng_trigger
*trigger
,
1242 const char *session_name
)
1244 bool applies
= false;
1245 const struct lttng_condition
*condition
;
1247 condition
= lttng_trigger_get_const_condition(trigger
);
1248 switch (lttng_condition_get_type(condition
)) {
1249 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
1250 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
1252 enum lttng_condition_status condition_status
;
1253 const char *condition_session_name
;
1255 condition_status
= lttng_condition_session_rotation_get_session_name(
1256 condition
, &condition_session_name
);
1257 if (condition_status
!= LTTNG_CONDITION_STATUS_OK
) {
1258 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1262 assert(condition_session_name
);
1263 applies
= !strcmp(condition_session_name
, session_name
);
1274 * Allocate and initialize an lttng_session_trigger_list which contains
1275 * all triggers that apply to the session named 'session_name'.
1277 * No ownership of 'session_name' is assumed by the session trigger list.
1278 * It is the caller's responsability to ensure the session name is alive
1279 * for as long as this list is.
1282 struct lttng_session_trigger_list
*lttng_session_trigger_list_build(
1283 const struct notification_thread_state
*state
,
1284 const char *session_name
)
1286 int trigger_count
= 0;
1287 struct lttng_session_trigger_list
*session_trigger_list
= NULL
;
1288 struct lttng_trigger_ht_element
*trigger_ht_element
= NULL
;
1289 struct cds_lfht_iter iter
;
1291 session_trigger_list
= lttng_session_trigger_list_create(session_name
,
1292 state
->session_triggers_ht
);
1294 /* Add all triggers applying to the session named 'session_name'. */
1295 cds_lfht_for_each_entry(state
->triggers_ht
, &iter
, trigger_ht_element
,
1299 if (!trigger_applies_to_session(trigger_ht_element
->trigger
,
1304 ret
= lttng_session_trigger_list_add(session_trigger_list
,
1305 trigger_ht_element
->trigger
);
1313 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1315 return session_trigger_list
;
1317 lttng_session_trigger_list_destroy(session_trigger_list
);
1322 struct session_info
*find_or_create_session_info(
1323 struct notification_thread_state
*state
,
1324 const char *name
, uid_t uid
, gid_t gid
)
1326 struct session_info
*session
= NULL
;
1327 struct cds_lfht_node
*node
;
1328 struct cds_lfht_iter iter
;
1329 struct lttng_session_trigger_list
*trigger_list
;
1332 cds_lfht_lookup(state
->sessions_ht
,
1333 hash_key_str(name
, lttng_ht_seed
),
1337 node
= cds_lfht_iter_get_node(&iter
);
1339 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1341 session
= caa_container_of(node
, struct session_info
,
1343 assert(session
->uid
== uid
);
1344 assert(session
->gid
== gid
);
1345 session_info_get(session
);
1349 trigger_list
= lttng_session_trigger_list_build(state
, name
);
1350 if (!trigger_list
) {
1354 session
= session_info_create(name
, uid
, gid
, trigger_list
,
1355 state
->sessions_ht
);
1357 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1361 trigger_list
= NULL
;
1363 cds_lfht_add(state
->sessions_ht
, hash_key_str(name
, lttng_ht_seed
),
1364 &session
->sessions_ht_node
);
1370 session_info_put(session
);
1375 int handle_notification_thread_command_add_channel(
1376 struct notification_thread_state
*state
,
1377 const char *session_name
, uid_t session_uid
, gid_t session_gid
,
1378 const char *channel_name
, enum lttng_domain_type channel_domain
,
1379 uint64_t channel_key_int
, uint64_t channel_capacity
,
1380 enum lttng_error_code
*cmd_result
)
1382 struct cds_list_head trigger_list
;
1383 struct channel_info
*new_channel_info
= NULL
;
1384 struct channel_key channel_key
= {
1385 .key
= channel_key_int
,
1386 .domain
= channel_domain
,
1388 struct lttng_channel_trigger_list
*channel_trigger_list
= NULL
;
1389 struct lttng_trigger_ht_element
*trigger_ht_element
= NULL
;
1390 int trigger_count
= 0;
1391 struct cds_lfht_iter iter
;
1392 struct session_info
*session_info
= NULL
;
1394 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64
" in %s domain",
1395 channel_name
, session_name
, channel_key_int
,
1396 channel_domain
== LTTNG_DOMAIN_KERNEL
? "kernel" : "user space");
1398 CDS_INIT_LIST_HEAD(&trigger_list
);
1400 session_info
= find_or_create_session_info(state
, session_name
,
1401 session_uid
, session_gid
);
1402 if (!session_info
) {
1403 /* Allocation error or an internal error occured. */
1407 new_channel_info
= channel_info_create(channel_name
, &channel_key
,
1408 channel_capacity
, session_info
);
1409 if (!new_channel_info
) {
1413 /* Build a list of all triggers applying to the new channel. */
1414 cds_lfht_for_each_entry(state
->triggers_ht
, &iter
, trigger_ht_element
,
1416 struct lttng_trigger_list_element
*new_element
;
1418 if (!trigger_applies_to_channel(trigger_ht_element
->trigger
,
1419 new_channel_info
)) {
1423 new_element
= zmalloc(sizeof(*new_element
));
1427 CDS_INIT_LIST_HEAD(&new_element
->node
);
1428 new_element
->trigger
= trigger_ht_element
->trigger
;
1429 cds_list_add(&new_element
->node
, &trigger_list
);
1433 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1435 channel_trigger_list
= zmalloc(sizeof(*channel_trigger_list
));
1436 if (!channel_trigger_list
) {
1439 channel_trigger_list
->channel_key
= new_channel_info
->key
;
1440 CDS_INIT_LIST_HEAD(&channel_trigger_list
->list
);
1441 cds_lfht_node_init(&channel_trigger_list
->channel_triggers_ht_node
);
1442 cds_list_splice(&trigger_list
, &channel_trigger_list
->list
);
1445 /* Add channel to the channel_ht which owns the channel_infos. */
1446 cds_lfht_add(state
->channels_ht
,
1447 hash_channel_key(&new_channel_info
->key
),
1448 &new_channel_info
->channels_ht_node
);
1450 * Add the list of triggers associated with this channel to the
1451 * channel_triggers_ht.
1453 cds_lfht_add(state
->channel_triggers_ht
,
1454 hash_channel_key(&new_channel_info
->key
),
1455 &channel_trigger_list
->channel_triggers_ht_node
);
1457 session_info_put(session_info
);
1458 *cmd_result
= LTTNG_OK
;
1461 channel_info_destroy(new_channel_info
);
1462 session_info_put(session_info
);
1467 int handle_notification_thread_command_remove_channel(
1468 struct notification_thread_state
*state
,
1469 uint64_t channel_key
, enum lttng_domain_type domain
,
1470 enum lttng_error_code
*cmd_result
)
1472 struct cds_lfht_node
*node
;
1473 struct cds_lfht_iter iter
;
1474 struct lttng_channel_trigger_list
*trigger_list
;
1475 struct lttng_trigger_list_element
*trigger_list_element
, *tmp
;
1476 struct channel_key key
= { .key
= channel_key
, .domain
= domain
};
1477 struct channel_info
*channel_info
;
1479 DBG("[notification-thread] Removing channel key = %" PRIu64
" in %s domain",
1480 channel_key
, domain
== LTTNG_DOMAIN_KERNEL
? "kernel" : "user space");
1484 cds_lfht_lookup(state
->channel_triggers_ht
,
1485 hash_channel_key(&key
),
1486 match_channel_trigger_list
,
1489 node
= cds_lfht_iter_get_node(&iter
);
1491 * There is a severe internal error if we are being asked to remove a
1492 * channel that doesn't exist.
1495 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1499 /* Free the list of triggers associated with this channel. */
1500 trigger_list
= caa_container_of(node
, struct lttng_channel_trigger_list
,
1501 channel_triggers_ht_node
);
1502 cds_list_for_each_entry_safe(trigger_list_element
, tmp
,
1503 &trigger_list
->list
, node
) {
1504 cds_list_del(&trigger_list_element
->node
);
1505 free(trigger_list_element
);
1507 cds_lfht_del(state
->channel_triggers_ht
, node
);
1510 /* Free sampled channel state. */
1511 cds_lfht_lookup(state
->channel_state_ht
,
1512 hash_channel_key(&key
),
1513 match_channel_state_sample
,
1516 node
= cds_lfht_iter_get_node(&iter
);
1518 * This is expected to be NULL if the channel is destroyed before we
1519 * received a sample.
1522 struct channel_state_sample
*sample
= caa_container_of(node
,
1523 struct channel_state_sample
,
1524 channel_state_ht_node
);
1526 cds_lfht_del(state
->channel_state_ht
, node
);
1530 /* Remove the channel from the channels_ht and free it. */
1531 cds_lfht_lookup(state
->channels_ht
,
1532 hash_channel_key(&key
),
1536 node
= cds_lfht_iter_get_node(&iter
);
1538 channel_info
= caa_container_of(node
, struct channel_info
,
1540 cds_lfht_del(state
->channels_ht
, node
);
1541 channel_info_destroy(channel_info
);
1544 *cmd_result
= LTTNG_OK
;
1549 int handle_notification_thread_command_session_rotation(
1550 struct notification_thread_state
*state
,
1551 enum notification_thread_command_type cmd_type
,
1552 const char *session_name
, uid_t session_uid
, gid_t session_gid
,
1553 uint64_t trace_archive_chunk_id
,
1554 struct lttng_trace_archive_location
*location
,
1555 enum lttng_error_code
*_cmd_result
)
1558 enum lttng_error_code cmd_result
= LTTNG_OK
;
1559 struct lttng_session_trigger_list
*trigger_list
;
1560 struct lttng_trigger_list_element
*trigger_list_element
;
1561 struct session_info
*session_info
;
1565 session_info
= find_or_create_session_info(state
, session_name
,
1566 session_uid
, session_gid
);
1567 if (!session_info
) {
1568 /* Allocation error or an internal error occured. */
1570 cmd_result
= LTTNG_ERR_NOMEM
;
1574 session_info
->rotation
.ongoing
=
1575 cmd_type
== NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
;
1576 session_info
->rotation
.id
= trace_archive_chunk_id
;
1577 trigger_list
= get_session_trigger_list(state
, session_name
);
1578 if (!trigger_list
) {
1579 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1584 cds_list_for_each_entry(trigger_list_element
, &trigger_list
->list
,
1586 const struct lttng_condition
*condition
;
1587 const struct lttng_action
*action
;
1588 const struct lttng_trigger
*trigger
;
1589 struct notification_client_list
*client_list
;
1590 struct lttng_evaluation
*evaluation
= NULL
;
1591 enum lttng_condition_type condition_type
;
1593 trigger
= trigger_list_element
->trigger
;
1594 condition
= lttng_trigger_get_const_condition(trigger
);
1596 condition_type
= lttng_condition_get_type(condition
);
1598 if (condition_type
== LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
&&
1599 cmd_type
!= NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
) {
1601 } else if (condition_type
== LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
&&
1602 cmd_type
!= NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED
) {
1606 action
= lttng_trigger_get_const_action(trigger
);
1608 /* Notify actions are the only type currently supported. */
1609 assert(lttng_action_get_type_const(action
) ==
1610 LTTNG_ACTION_TYPE_NOTIFY
);
1612 client_list
= get_client_list_from_condition(state
, condition
);
1613 assert(client_list
);
1615 if (cds_list_empty(&client_list
->list
)) {
1617 * No clients interested in the evaluation's result,
1623 if (cmd_type
== NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
) {
1624 evaluation
= lttng_evaluation_session_rotation_ongoing_create(
1625 trace_archive_chunk_id
);
1627 evaluation
= lttng_evaluation_session_rotation_completed_create(
1628 trace_archive_chunk_id
, location
);
1632 /* Internal error */
1634 cmd_result
= LTTNG_ERR_UNK
;
1638 /* Dispatch evaluation result to all clients. */
1639 ret
= send_evaluation_to_clients(trigger_list_element
->trigger
,
1640 evaluation
, client_list
, state
,
1643 lttng_evaluation_destroy(evaluation
);
1644 if (caa_unlikely(ret
)) {
1649 session_info_put(session_info
);
1650 *_cmd_result
= cmd_result
;
1656 int condition_is_supported(struct lttng_condition
*condition
)
1660 switch (lttng_condition_get_type(condition
)) {
1661 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
1662 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
1664 enum lttng_domain_type domain
;
1666 ret
= lttng_condition_buffer_usage_get_domain_type(condition
,
1673 if (domain
!= LTTNG_DOMAIN_KERNEL
) {
1679 * Older kernel tracers don't expose the API to monitor their
1680 * buffers. Therefore, we reject triggers that require that
1681 * mechanism to be available to be evaluated.
1683 ret
= kernel_supports_ring_buffer_snapshot_sample_positions(
1694 /* Must be called with RCU read lock held. */
1696 int bind_trigger_to_matching_session(const struct lttng_trigger
*trigger
,
1697 struct notification_thread_state
*state
)
1700 const struct lttng_condition
*condition
;
1701 const char *session_name
;
1702 struct lttng_session_trigger_list
*trigger_list
;
1704 condition
= lttng_trigger_get_const_condition(trigger
);
1705 switch (lttng_condition_get_type(condition
)) {
1706 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
1707 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
1709 enum lttng_condition_status status
;
1711 status
= lttng_condition_session_rotation_get_session_name(
1712 condition
, &session_name
);
1713 if (status
!= LTTNG_CONDITION_STATUS_OK
) {
1714 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
1725 trigger_list
= get_session_trigger_list(state
, session_name
);
1726 if (!trigger_list
) {
1727 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
1733 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
1735 ret
= lttng_session_trigger_list_add(trigger_list
, trigger
);
1740 /* Must be called with RCU read lock held. */
1742 int bind_trigger_to_matching_channels(const struct lttng_trigger
*trigger
,
1743 struct notification_thread_state
*state
)
1746 struct cds_lfht_node
*node
;
1747 struct cds_lfht_iter iter
;
1748 struct channel_info
*channel
;
1750 cds_lfht_for_each_entry(state
->channels_ht
, &iter
, channel
,
1752 struct lttng_trigger_list_element
*trigger_list_element
;
1753 struct lttng_channel_trigger_list
*trigger_list
;
1755 if (!trigger_applies_to_channel(trigger
, channel
)) {
1759 cds_lfht_lookup(state
->channel_triggers_ht
,
1760 hash_channel_key(&channel
->key
),
1761 match_channel_trigger_list
,
1764 node
= cds_lfht_iter_get_node(&iter
);
1766 trigger_list
= caa_container_of(node
,
1767 struct lttng_channel_trigger_list
,
1768 channel_triggers_ht_node
);
1770 trigger_list_element
= zmalloc(sizeof(*trigger_list_element
));
1771 if (!trigger_list_element
) {
1775 CDS_INIT_LIST_HEAD(&trigger_list_element
->node
);
1776 trigger_list_element
->trigger
= trigger
;
1777 cds_list_add(&trigger_list_element
->node
, &trigger_list
->list
);
1778 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
1786 * FIXME A client's credentials are not checked when registering a trigger, nor
1787 * are they stored alongside with the trigger.
1789 * The effects of this are benign since:
1790 * - The client will succeed in registering the trigger, as it is valid,
1791 * - The trigger will, internally, be bound to the channel/session,
1792 * - The notifications will not be sent since the client's credentials
1793 * are checked against the channel at that moment.
1795 * If this function returns a non-zero value, it means something is
1796 * fundamentally broken and the whole subsystem/thread will be torn down.
1798 * If a non-fatal error occurs, just set the cmd_result to the appropriate
1802 int handle_notification_thread_command_register_trigger(
1803 struct notification_thread_state
*state
,
1804 struct lttng_trigger
*trigger
,
1805 enum lttng_error_code
*cmd_result
)
1808 struct lttng_condition
*condition
;
1809 struct notification_client
*client
;
1810 struct notification_client_list
*client_list
= NULL
;
1811 struct lttng_trigger_ht_element
*trigger_ht_element
= NULL
;
1812 struct notification_client_list_element
*client_list_element
, *tmp
;
1813 struct cds_lfht_node
*node
;
1814 struct cds_lfht_iter iter
;
1815 bool free_trigger
= true;
1819 condition
= lttng_trigger_get_condition(trigger
);
1822 ret
= condition_is_supported(condition
);
1825 } else if (ret
== 0) {
1826 *cmd_result
= LTTNG_ERR_NOT_SUPPORTED
;
1829 /* Feature is supported, continue. */
1833 trigger_ht_element
= zmalloc(sizeof(*trigger_ht_element
));
1834 if (!trigger_ht_element
) {
1839 /* Add trigger to the trigger_ht. */
1840 cds_lfht_node_init(&trigger_ht_element
->node
);
1841 trigger_ht_element
->trigger
= trigger
;
1843 node
= cds_lfht_add_unique(state
->triggers_ht
,
1844 lttng_condition_hash(condition
),
1847 &trigger_ht_element
->node
);
1848 if (node
!= &trigger_ht_element
->node
) {
1849 /* Not a fatal error, simply report it to the client. */
1850 *cmd_result
= LTTNG_ERR_TRIGGER_EXISTS
;
1851 goto error_free_ht_element
;
1855 * Ownership of the trigger and of its wrapper was transfered to
1858 trigger_ht_element
= NULL
;
1859 free_trigger
= false;
1862 * The rest only applies to triggers that have a "notify" action.
1863 * It is not skipped as this is the only action type currently
1866 client_list
= zmalloc(sizeof(*client_list
));
1869 goto error_free_ht_element
;
1871 cds_lfht_node_init(&client_list
->notification_trigger_ht_node
);
1872 CDS_INIT_LIST_HEAD(&client_list
->list
);
1873 client_list
->trigger
= trigger
;
1875 /* Build a list of clients to which this new trigger applies. */
1876 cds_lfht_for_each_entry(state
->client_socket_ht
, &iter
, client
,
1877 client_socket_ht_node
) {
1878 if (!trigger_applies_to_client(trigger
, client
)) {
1882 client_list_element
= zmalloc(sizeof(*client_list_element
));
1883 if (!client_list_element
) {
1885 goto error_free_client_list
;
1887 CDS_INIT_LIST_HEAD(&client_list_element
->node
);
1888 client_list_element
->client
= client
;
1889 cds_list_add(&client_list_element
->node
, &client_list
->list
);
1892 cds_lfht_add(state
->notification_trigger_clients_ht
,
1893 lttng_condition_hash(condition
),
1894 &client_list
->notification_trigger_ht_node
);
1896 switch (get_trigger_binding_object(trigger
)) {
1897 case LTTNG_OBJECT_TYPE_SESSION
:
1898 /* Add the trigger to the list if it matches a known session. */
1899 ret
= bind_trigger_to_matching_session(trigger
, state
);
1901 goto error_free_client_list
;
1903 case LTTNG_OBJECT_TYPE_CHANNEL
:
1905 * Add the trigger to list of triggers bound to the channels
1908 ret
= bind_trigger_to_matching_channels(trigger
, state
);
1910 goto error_free_client_list
;
1913 case LTTNG_OBJECT_TYPE_NONE
:
1916 ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered");
1918 goto error_free_client_list
;
1922 * Since there is nothing preventing clients from subscribing to a
1923 * condition before the corresponding trigger is registered, we have
1924 * to evaluate this new condition right away.
1926 * At some point, we were waiting for the next "evaluation" (e.g. on
1927 * reception of a channel sample) to evaluate this new condition, but
1930 * The reason it was broken is that waiting for the next sample
1931 * does not allow us to properly handle transitions for edge-triggered
1934 * Consider this example: when we handle a new channel sample, we
1935 * evaluate each conditions twice: once with the previous state, and
1936 * again with the newest state. We then use those two results to
1937 * determine whether a state change happened: a condition was false and
1938 * became true. If a state change happened, we have to notify clients.
1940 * Now, if a client subscribes to a given notification and registers
1941 * a trigger *after* that subscription, we have to make sure the
1942 * condition is evaluated at this point while considering only the
1943 * current state. Otherwise, the next evaluation cycle may only see
1944 * that the evaluations remain the same (true for samples n-1 and n) and
1945 * the client will never know that the condition has been met.
1947 cds_list_for_each_entry_safe(client_list_element
, tmp
,
1948 &client_list
->list
, node
) {
1949 ret
= evaluate_condition_for_client(trigger
, condition
,
1950 client_list_element
->client
, state
);
1952 goto error_free_client_list
;
1957 * Client list ownership transferred to the
1958 * notification_trigger_clients_ht.
1962 *cmd_result
= LTTNG_OK
;
1963 error_free_client_list
:
1965 cds_list_for_each_entry_safe(client_list_element
, tmp
,
1966 &client_list
->list
, node
) {
1967 free(client_list_element
);
1971 error_free_ht_element
:
1972 free(trigger_ht_element
);
1975 struct lttng_action
*action
= lttng_trigger_get_action(trigger
);
1977 lttng_condition_destroy(condition
);
1978 lttng_action_destroy(action
);
1979 lttng_trigger_destroy(trigger
);
1986 int handle_notification_thread_command_unregister_trigger(
1987 struct notification_thread_state
*state
,
1988 struct lttng_trigger
*trigger
,
1989 enum lttng_error_code
*_cmd_reply
)
1991 struct cds_lfht_iter iter
;
1992 struct cds_lfht_node
*triggers_ht_node
;
1993 struct lttng_channel_trigger_list
*trigger_list
;
1994 struct notification_client_list
*client_list
;
1995 struct notification_client_list_element
*client_list_element
, *tmp
;
1996 struct lttng_trigger_ht_element
*trigger_ht_element
= NULL
;
1997 struct lttng_condition
*condition
= lttng_trigger_get_condition(
1999 struct lttng_action
*action
;
2000 enum lttng_error_code cmd_reply
;
2004 cds_lfht_lookup(state
->triggers_ht
,
2005 lttng_condition_hash(condition
),
2009 triggers_ht_node
= cds_lfht_iter_get_node(&iter
);
2010 if (!triggers_ht_node
) {
2011 cmd_reply
= LTTNG_ERR_TRIGGER_NOT_FOUND
;
2014 cmd_reply
= LTTNG_OK
;
2017 /* Remove trigger from channel_triggers_ht. */
2018 cds_lfht_for_each_entry(state
->channel_triggers_ht
, &iter
, trigger_list
,
2019 channel_triggers_ht_node
) {
2020 struct lttng_trigger_list_element
*trigger_element
, *tmp
;
2022 cds_list_for_each_entry_safe(trigger_element
, tmp
,
2023 &trigger_list
->list
, node
) {
2024 const struct lttng_condition
*current_condition
=
2025 lttng_trigger_get_const_condition(
2026 trigger_element
->trigger
);
2028 assert(current_condition
);
2029 if (!lttng_condition_is_equal(condition
,
2030 current_condition
)) {
2034 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2035 cds_list_del(&trigger_element
->node
);
2036 /* A trigger can only appear once per channel */
2042 * Remove and release the client list from
2043 * notification_trigger_clients_ht.
2045 client_list
= get_client_list_from_condition(state
, condition
);
2046 assert(client_list
);
2048 cds_list_for_each_entry_safe(client_list_element
, tmp
,
2049 &client_list
->list
, node
) {
2050 free(client_list_element
);
2052 cds_lfht_del(state
->notification_trigger_clients_ht
,
2053 &client_list
->notification_trigger_ht_node
);
2056 /* Remove trigger from triggers_ht. */
2057 trigger_ht_element
= caa_container_of(triggers_ht_node
,
2058 struct lttng_trigger_ht_element
, node
);
2059 cds_lfht_del(state
->triggers_ht
, triggers_ht_node
);
2061 condition
= lttng_trigger_get_condition(trigger_ht_element
->trigger
);
2062 lttng_condition_destroy(condition
);
2063 action
= lttng_trigger_get_action(trigger_ht_element
->trigger
);
2064 lttng_action_destroy(action
);
2065 lttng_trigger_destroy(trigger_ht_element
->trigger
);
2066 free(trigger_ht_element
);
2070 *_cmd_reply
= cmd_reply
;
2075 /* Returns 0 on success, 1 on exit requested, negative value on error. */
2076 int handle_notification_thread_command(
2077 struct notification_thread_handle
*handle
,
2078 struct notification_thread_state
*state
)
2082 struct notification_thread_command
*cmd
;
2084 /* Read the event pipe to put it back into a quiescent state. */
2085 ret
= read(lttng_pipe_get_readfd(handle
->cmd_queue
.event_pipe
), &counter
,
2091 pthread_mutex_lock(&handle
->cmd_queue
.lock
);
2092 cmd
= cds_list_first_entry(&handle
->cmd_queue
.list
,
2093 struct notification_thread_command
, cmd_list_node
);
2094 switch (cmd
->type
) {
2095 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER
:
2096 DBG("[notification-thread] Received register trigger command");
2097 ret
= handle_notification_thread_command_register_trigger(
2098 state
, cmd
->parameters
.trigger
,
2101 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER
:
2102 DBG("[notification-thread] Received unregister trigger command");
2103 ret
= handle_notification_thread_command_unregister_trigger(
2104 state
, cmd
->parameters
.trigger
,
2107 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL
:
2108 DBG("[notification-thread] Received add channel command");
2109 ret
= handle_notification_thread_command_add_channel(
2111 cmd
->parameters
.add_channel
.session
.name
,
2112 cmd
->parameters
.add_channel
.session
.uid
,
2113 cmd
->parameters
.add_channel
.session
.gid
,
2114 cmd
->parameters
.add_channel
.channel
.name
,
2115 cmd
->parameters
.add_channel
.channel
.domain
,
2116 cmd
->parameters
.add_channel
.channel
.key
,
2117 cmd
->parameters
.add_channel
.channel
.capacity
,
2120 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL
:
2121 DBG("[notification-thread] Received remove channel command");
2122 ret
= handle_notification_thread_command_remove_channel(
2123 state
, cmd
->parameters
.remove_channel
.key
,
2124 cmd
->parameters
.remove_channel
.domain
,
2127 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
:
2128 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED
:
2129 DBG("[notification-thread] Received session rotation %s command",
2130 cmd
->type
== NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
?
2131 "ongoing" : "completed");
2132 ret
= handle_notification_thread_command_session_rotation(
2135 cmd
->parameters
.session_rotation
.session_name
,
2136 cmd
->parameters
.session_rotation
.uid
,
2137 cmd
->parameters
.session_rotation
.gid
,
2138 cmd
->parameters
.session_rotation
.trace_archive_chunk_id
,
2139 cmd
->parameters
.session_rotation
.location
,
2142 case NOTIFICATION_COMMAND_TYPE_QUIT
:
2143 DBG("[notification-thread] Received quit command");
2144 cmd
->reply_code
= LTTNG_OK
;
2148 ERR("[notification-thread] Unknown internal command received");
2156 cds_list_del(&cmd
->cmd_list_node
);
2157 lttng_waiter_wake_up(&cmd
->reply_waiter
);
2158 pthread_mutex_unlock(&handle
->cmd_queue
.lock
);
2161 /* Wake-up and return a fatal error to the calling thread. */
2162 lttng_waiter_wake_up(&cmd
->reply_waiter
);
2163 pthread_mutex_unlock(&handle
->cmd_queue
.lock
);
2164 cmd
->reply_code
= LTTNG_ERR_FATAL
;
2166 /* Indicate a fatal error to the caller. */
2171 unsigned long hash_client_socket(int socket
)
2173 return hash_key_ulong((void *) (unsigned long) socket
, lttng_ht_seed
);
2177 int socket_set_non_blocking(int socket
)
2181 /* Set the pipe as non-blocking. */
2182 ret
= fcntl(socket
, F_GETFL
, 0);
2184 PERROR("fcntl get socket flags");
2189 ret
= fcntl(socket
, F_SETFL
, flags
| O_NONBLOCK
);
2191 PERROR("fcntl set O_NONBLOCK socket flag");
2194 DBG("Client socket (fd = %i) set as non-blocking", socket
);
2200 int client_reset_inbound_state(struct notification_client
*client
)
2204 ret
= lttng_dynamic_buffer_set_size(
2205 &client
->communication
.inbound
.buffer
, 0);
2208 client
->communication
.inbound
.bytes_to_receive
=
2209 sizeof(struct lttng_notification_channel_message
);
2210 client
->communication
.inbound
.msg_type
=
2211 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN
;
2212 LTTNG_SOCK_SET_UID_CRED(&client
->communication
.inbound
.creds
, -1);
2213 LTTNG_SOCK_SET_GID_CRED(&client
->communication
.inbound
.creds
, -1);
2214 ret
= lttng_dynamic_buffer_set_size(
2215 &client
->communication
.inbound
.buffer
,
2216 client
->communication
.inbound
.bytes_to_receive
);
2220 int handle_notification_thread_client_connect(
2221 struct notification_thread_state
*state
)
2224 struct notification_client
*client
;
2226 DBG("[notification-thread] Handling new notification channel client connection");
2228 client
= zmalloc(sizeof(*client
));
2234 CDS_INIT_LIST_HEAD(&client
->condition_list
);
2235 lttng_dynamic_buffer_init(&client
->communication
.inbound
.buffer
);
2236 lttng_dynamic_buffer_init(&client
->communication
.outbound
.buffer
);
2237 client
->communication
.inbound
.expect_creds
= true;
2238 ret
= client_reset_inbound_state(client
);
2240 ERR("[notification-thread] Failed to reset client communication's inbound state");
2245 ret
= lttcomm_accept_unix_sock(state
->notification_channel_socket
);
2247 ERR("[notification-thread] Failed to accept new notification channel client connection");
2252 client
->socket
= ret
;
2254 ret
= socket_set_non_blocking(client
->socket
);
2256 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
2260 ret
= lttcomm_setsockopt_creds_unix_sock(client
->socket
);
2262 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
2267 ret
= lttng_poll_add(&state
->events
, client
->socket
,
2268 LPOLLIN
| LPOLLERR
|
2269 LPOLLHUP
| LPOLLRDHUP
);
2271 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
2275 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
2279 cds_lfht_add(state
->client_socket_ht
,
2280 hash_client_socket(client
->socket
),
2281 &client
->client_socket_ht_node
);
2286 notification_client_destroy(client
, state
);
2290 int handle_notification_thread_client_disconnect(
2292 struct notification_thread_state
*state
)
2295 struct notification_client
*client
;
2298 DBG("[notification-thread] Closing client connection (socket fd = %i)",
2300 client
= get_client_from_socket(client_socket
, state
);
2302 /* Internal state corruption, fatal error. */
2303 ERR("[notification-thread] Unable to find client (socket fd = %i)",
2309 ret
= lttng_poll_del(&state
->events
, client_socket
);
2311 ERR("[notification-thread] Failed to remove client socket from poll set");
2313 cds_lfht_del(state
->client_socket_ht
,
2314 &client
->client_socket_ht_node
);
2315 notification_client_destroy(client
, state
);
2321 int handle_notification_thread_client_disconnect_all(
2322 struct notification_thread_state
*state
)
2324 struct cds_lfht_iter iter
;
2325 struct notification_client
*client
;
2326 bool error_encoutered
= false;
2329 DBG("[notification-thread] Closing all client connections");
2330 cds_lfht_for_each_entry(state
->client_socket_ht
, &iter
, client
,
2331 client_socket_ht_node
) {
2334 ret
= handle_notification_thread_client_disconnect(
2335 client
->socket
, state
);
2337 error_encoutered
= true;
2341 return error_encoutered
? 1 : 0;
2344 int handle_notification_thread_trigger_unregister_all(
2345 struct notification_thread_state
*state
)
2347 bool error_occurred
= false;
2348 struct cds_lfht_iter iter
;
2349 struct lttng_trigger_ht_element
*trigger_ht_element
;
2351 cds_lfht_for_each_entry(state
->triggers_ht
, &iter
, trigger_ht_element
,
2353 int ret
= handle_notification_thread_command_unregister_trigger(
2354 state
, trigger_ht_element
->trigger
, NULL
);
2356 error_occurred
= true;
2359 return error_occurred
? -1 : 0;
2363 int client_flush_outgoing_queue(struct notification_client
*client
,
2364 struct notification_thread_state
*state
)
2367 size_t to_send_count
;
2369 assert(client
->communication
.outbound
.buffer
.size
!= 0);
2370 to_send_count
= client
->communication
.outbound
.buffer
.size
;
2371 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
2374 ret
= lttcomm_send_unix_sock_non_block(client
->socket
,
2375 client
->communication
.outbound
.buffer
.data
,
2377 if ((ret
< 0 && (errno
== EAGAIN
|| errno
== EWOULDBLOCK
)) ||
2378 (ret
> 0 && ret
< to_send_count
)) {
2379 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
2381 to_send_count
-= max(ret
, 0);
2383 memcpy(client
->communication
.outbound
.buffer
.data
,
2384 client
->communication
.outbound
.buffer
.data
+
2385 client
->communication
.outbound
.buffer
.size
- to_send_count
,
2387 ret
= lttng_dynamic_buffer_set_size(
2388 &client
->communication
.outbound
.buffer
,
2395 * We want to be notified whenever there is buffer space
2396 * available to send the rest of the payload.
2398 ret
= lttng_poll_mod(&state
->events
, client
->socket
,
2399 CLIENT_POLL_MASK_IN_OUT
);
2403 } else if (ret
< 0) {
2404 /* Generic error, disconnect the client. */
2405 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
2407 ret
= handle_notification_thread_client_disconnect(
2408 client
->socket
, state
);
2413 /* No error and flushed the queue completely. */
2414 ret
= lttng_dynamic_buffer_set_size(
2415 &client
->communication
.outbound
.buffer
, 0);
2419 ret
= lttng_poll_mod(&state
->events
, client
->socket
,
2420 CLIENT_POLL_MASK_IN
);
2425 client
->communication
.outbound
.queued_command_reply
= false;
2426 client
->communication
.outbound
.dropped_notification
= false;
2435 int client_send_command_reply(struct notification_client
*client
,
2436 struct notification_thread_state
*state
,
2437 enum lttng_notification_channel_status status
)
2440 struct lttng_notification_channel_command_reply reply
= {
2441 .status
= (int8_t) status
,
2443 struct lttng_notification_channel_message msg
= {
2444 .type
= (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY
,
2445 .size
= sizeof(reply
),
2447 char buffer
[sizeof(msg
) + sizeof(reply
)];
2449 if (client
->communication
.outbound
.queued_command_reply
) {
2450 /* Protocol error. */
2454 memcpy(buffer
, &msg
, sizeof(msg
));
2455 memcpy(buffer
+ sizeof(msg
), &reply
, sizeof(reply
));
2456 DBG("[notification-thread] Send command reply (%i)", (int) status
);
2458 /* Enqueue buffer to outgoing queue and flush it. */
2459 ret
= lttng_dynamic_buffer_append(
2460 &client
->communication
.outbound
.buffer
,
2461 buffer
, sizeof(buffer
));
2466 ret
= client_flush_outgoing_queue(client
, state
);
2471 if (client
->communication
.outbound
.buffer
.size
!= 0) {
2472 /* Queue could not be emptied. */
2473 client
->communication
.outbound
.queued_command_reply
= true;
2482 int client_dispatch_message(struct notification_client
*client
,
2483 struct notification_thread_state
*state
)
2487 if (client
->communication
.inbound
.msg_type
!=
2488 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
&&
2489 client
->communication
.inbound
.msg_type
!=
2490 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN
&&
2491 !client
->validated
) {
2492 WARN("[notification-thread] client attempted a command before handshake");
2497 switch (client
->communication
.inbound
.msg_type
) {
2498 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN
:
2501 * Receiving message header. The function will be called again
2502 * once the rest of the message as been received and can be
2505 const struct lttng_notification_channel_message
*msg
;
2507 assert(sizeof(*msg
) ==
2508 client
->communication
.inbound
.buffer
.size
);
2509 msg
= (const struct lttng_notification_channel_message
*)
2510 client
->communication
.inbound
.buffer
.data
;
2512 if (msg
->size
== 0 || msg
->size
> DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE
) {
2513 ERR("[notification-thread] Invalid notification channel message: length = %u", msg
->size
);
2518 switch (msg
->type
) {
2519 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
:
2520 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
:
2521 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
:
2525 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
2529 client
->communication
.inbound
.bytes_to_receive
= msg
->size
;
2530 client
->communication
.inbound
.msg_type
=
2531 (enum lttng_notification_channel_message_type
) msg
->type
;
2532 ret
= lttng_dynamic_buffer_set_size(
2533 &client
->communication
.inbound
.buffer
, msg
->size
);
2539 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
:
2541 struct lttng_notification_channel_command_handshake
*handshake_client
;
2542 struct lttng_notification_channel_command_handshake handshake_reply
= {
2543 .major
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
,
2544 .minor
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR
,
2546 struct lttng_notification_channel_message msg_header
= {
2547 .type
= LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
,
2548 .size
= sizeof(handshake_reply
),
2550 enum lttng_notification_channel_status status
=
2551 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
2552 char send_buffer
[sizeof(msg_header
) + sizeof(handshake_reply
)];
2554 memcpy(send_buffer
, &msg_header
, sizeof(msg_header
));
2555 memcpy(send_buffer
+ sizeof(msg_header
), &handshake_reply
,
2556 sizeof(handshake_reply
));
2559 (struct lttng_notification_channel_command_handshake
*)
2560 client
->communication
.inbound
.buffer
.data
;
2561 client
->major
= handshake_client
->major
;
2562 client
->minor
= handshake_client
->minor
;
2563 if (!client
->communication
.inbound
.creds_received
) {
2564 ERR("[notification-thread] No credentials received from client");
2569 client
->uid
= LTTNG_SOCK_GET_UID_CRED(
2570 &client
->communication
.inbound
.creds
);
2571 client
->gid
= LTTNG_SOCK_GET_GID_CRED(
2572 &client
->communication
.inbound
.creds
);
2573 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
2574 client
->uid
, client
->gid
, (int) client
->major
,
2575 (int) client
->minor
);
2577 if (handshake_client
->major
!= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
) {
2578 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION
;
2581 ret
= lttng_dynamic_buffer_append(&client
->communication
.outbound
.buffer
,
2582 send_buffer
, sizeof(send_buffer
));
2584 ERR("[notification-thread] Failed to send protocol version to notification channel client");
2588 ret
= client_flush_outgoing_queue(client
, state
);
2593 ret
= client_send_command_reply(client
, state
, status
);
2595 ERR("[notification-thread] Failed to send reply to notification channel client");
2599 /* Set reception state to receive the next message header. */
2600 ret
= client_reset_inbound_state(client
);
2602 ERR("[notification-thread] Failed to reset client communication's inbound state");
2605 client
->validated
= true;
2608 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
:
2609 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
:
2611 struct lttng_condition
*condition
;
2612 enum lttng_notification_channel_status status
=
2613 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
2614 const struct lttng_buffer_view condition_view
=
2615 lttng_buffer_view_from_dynamic_buffer(
2616 &client
->communication
.inbound
.buffer
,
2618 size_t expected_condition_size
=
2619 client
->communication
.inbound
.buffer
.size
;
2621 ret
= lttng_condition_create_from_buffer(&condition_view
,
2623 if (ret
!= expected_condition_size
) {
2624 ERR("[notification-thread] Malformed condition received from client");
2628 if (client
->communication
.inbound
.msg_type
==
2629 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
) {
2630 ret
= notification_thread_client_subscribe(client
,
2631 condition
, state
, &status
);
2633 ret
= notification_thread_client_unsubscribe(client
,
2634 condition
, state
, &status
);
2640 ret
= client_send_command_reply(client
, state
, status
);
2642 ERR("[notification-thread] Failed to send reply to notification channel client");
2646 /* Set reception state to receive the next message header. */
2647 ret
= client_reset_inbound_state(client
);
2649 ERR("[notification-thread] Failed to reset client communication's inbound state");
2661 /* Incoming data from client. */
2662 int handle_notification_thread_client_in(
2663 struct notification_thread_state
*state
, int socket
)
2666 struct notification_client
*client
;
2670 client
= get_client_from_socket(socket
, state
);
2672 /* Internal error, abort. */
2677 offset
= client
->communication
.inbound
.buffer
.size
-
2678 client
->communication
.inbound
.bytes_to_receive
;
2679 if (client
->communication
.inbound
.expect_creds
) {
2680 recv_ret
= lttcomm_recv_creds_unix_sock(socket
,
2681 client
->communication
.inbound
.buffer
.data
+ offset
,
2682 client
->communication
.inbound
.bytes_to_receive
,
2683 &client
->communication
.inbound
.creds
);
2685 client
->communication
.inbound
.expect_creds
= false;
2686 client
->communication
.inbound
.creds_received
= true;
2689 recv_ret
= lttcomm_recv_unix_sock_non_block(socket
,
2690 client
->communication
.inbound
.buffer
.data
+ offset
,
2691 client
->communication
.inbound
.bytes_to_receive
);
2694 goto error_disconnect_client
;
2697 client
->communication
.inbound
.bytes_to_receive
-= recv_ret
;
2698 if (client
->communication
.inbound
.bytes_to_receive
== 0) {
2699 ret
= client_dispatch_message(client
, state
);
2702 * Only returns an error if this client must be
2705 goto error_disconnect_client
;
2712 error_disconnect_client
:
2713 ret
= handle_notification_thread_client_disconnect(socket
, state
);
2717 /* Client ready to receive outgoing data. */
2718 int handle_notification_thread_client_out(
2719 struct notification_thread_state
*state
, int socket
)
2722 struct notification_client
*client
;
2724 client
= get_client_from_socket(socket
, state
);
2726 /* Internal error, abort. */
2731 ret
= client_flush_outgoing_queue(client
, state
);
2740 bool evaluate_buffer_usage_condition(const struct lttng_condition
*condition
,
2741 const struct channel_state_sample
*sample
,
2742 uint64_t buffer_capacity
)
2744 bool result
= false;
2746 enum lttng_condition_type condition_type
;
2747 const struct lttng_condition_buffer_usage
*use_condition
= container_of(
2748 condition
, struct lttng_condition_buffer_usage
,
2751 if (use_condition
->threshold_bytes
.set
) {
2752 threshold
= use_condition
->threshold_bytes
.value
;
2755 * Threshold was expressed as a ratio.
2757 * TODO the threshold (in bytes) of conditions expressed
2758 * as a ratio of total buffer size could be cached to
2759 * forego this double-multiplication or it could be performed
2760 * as fixed-point math.
2762 * Note that caching should accomodate the case where the
2763 * condition applies to multiple channels (i.e. don't assume
2764 * that all channels matching my_chann* have the same size...)
2766 threshold
= (uint64_t) (use_condition
->threshold_ratio
.value
*
2767 (double) buffer_capacity
);
2770 condition_type
= lttng_condition_get_type(condition
);
2771 if (condition_type
== LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
) {
2772 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64
", highest usage = %" PRIu64
,
2773 threshold
, sample
->highest_usage
);
2776 * The low condition should only be triggered once _all_ of the
2777 * streams in a channel have gone below the "low" threshold.
2779 if (sample
->highest_usage
<= threshold
) {
2783 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64
", highest usage = %" PRIu64
,
2784 threshold
, sample
->highest_usage
);
2787 * For high buffer usage scenarios, we want to trigger whenever
2788 * _any_ of the streams has reached the "high" threshold.
2790 if (sample
->highest_usage
>= threshold
) {
2799 bool evaluate_session_consumed_size_condition(
2800 const struct lttng_condition
*condition
,
2801 uint64_t session_consumed_size
)
2804 const struct lttng_condition_session_consumed_size
*size_condition
=
2805 container_of(condition
,
2806 struct lttng_condition_session_consumed_size
,
2809 threshold
= size_condition
->consumed_threshold_bytes
.value
;
2810 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64
", current size = %" PRIu64
,
2811 threshold
, session_consumed_size
);
2812 return session_consumed_size
>= threshold
;
2816 int evaluate_buffer_condition(const struct lttng_condition
*condition
,
2817 struct lttng_evaluation
**evaluation
,
2818 const struct notification_thread_state
*state
,
2819 const struct channel_state_sample
*previous_sample
,
2820 const struct channel_state_sample
*latest_sample
,
2821 uint64_t previous_session_consumed_total
,
2822 uint64_t latest_session_consumed_total
,
2823 struct channel_info
*channel_info
)
2826 enum lttng_condition_type condition_type
;
2827 const bool previous_sample_available
= !!previous_sample
;
2828 bool previous_sample_result
= false;
2829 bool latest_sample_result
;
2831 condition_type
= lttng_condition_get_type(condition
);
2833 switch (condition_type
) {
2834 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
2835 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
2836 if (caa_likely(previous_sample_available
)) {
2837 previous_sample_result
=
2838 evaluate_buffer_usage_condition(condition
,
2839 previous_sample
, channel_info
->capacity
);
2841 latest_sample_result
= evaluate_buffer_usage_condition(
2842 condition
, latest_sample
,
2843 channel_info
->capacity
);
2845 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
2846 if (caa_likely(previous_sample_available
)) {
2847 previous_sample_result
=
2848 evaluate_session_consumed_size_condition(
2850 previous_session_consumed_total
);
2852 latest_sample_result
=
2853 evaluate_session_consumed_size_condition(
2855 latest_session_consumed_total
);
2858 /* Unknown condition type; internal error. */
2862 if (!latest_sample_result
||
2863 (previous_sample_result
== latest_sample_result
)) {
2865 * Only trigger on a condition evaluation transition.
2867 * NOTE: This edge-triggered logic may not be appropriate for
2868 * future condition types.
2873 if (!evaluation
|| !latest_sample_result
) {
2877 switch (condition_type
) {
2878 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
2879 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
2880 *evaluation
= lttng_evaluation_buffer_usage_create(
2882 latest_sample
->highest_usage
,
2883 channel_info
->capacity
);
2885 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
2886 *evaluation
= lttng_evaluation_session_consumed_size_create(
2887 latest_session_consumed_total
);
2902 int client_enqueue_dropped_notification(struct notification_client
*client
,
2903 struct notification_thread_state
*state
)
2906 struct lttng_notification_channel_message msg
= {
2907 .type
= (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
,
2911 ret
= lttng_dynamic_buffer_append(
2912 &client
->communication
.outbound
.buffer
, &msg
,
2918 int send_evaluation_to_clients(const struct lttng_trigger
*trigger
,
2919 const struct lttng_evaluation
*evaluation
,
2920 struct notification_client_list
* client_list
,
2921 struct notification_thread_state
*state
,
2922 uid_t channel_uid
, gid_t channel_gid
)
2925 struct lttng_dynamic_buffer msg_buffer
;
2926 struct notification_client_list_element
*client_list_element
, *tmp
;
2927 const struct lttng_notification notification
= {
2928 .condition
= (struct lttng_condition
*) lttng_trigger_get_const_condition(trigger
),
2929 .evaluation
= (struct lttng_evaluation
*) evaluation
,
2931 struct lttng_notification_channel_message msg_header
= {
2932 .type
= (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
,
2935 lttng_dynamic_buffer_init(&msg_buffer
);
2937 ret
= lttng_dynamic_buffer_append(&msg_buffer
, &msg_header
,
2938 sizeof(msg_header
));
2943 ret
= lttng_notification_serialize(¬ification
, &msg_buffer
);
2945 ERR("[notification-thread] Failed to serialize notification");
2950 /* Update payload size. */
2951 ((struct lttng_notification_channel_message
* ) msg_buffer
.data
)->size
=
2952 (uint32_t) (msg_buffer
.size
- sizeof(msg_header
));
2954 cds_list_for_each_entry_safe(client_list_element
, tmp
,
2955 &client_list
->list
, node
) {
2956 struct notification_client
*client
=
2957 client_list_element
->client
;
2959 if (client
->uid
!= channel_uid
&& client
->gid
!= channel_gid
&&
2961 /* Client is not allowed to monitor this channel. */
2962 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this channel");
2966 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
2967 client
->socket
, msg_buffer
.size
);
2968 if (client
->communication
.outbound
.buffer
.size
) {
2970 * Outgoing data is already buffered for this client;
2971 * drop the notification and enqueue a "dropped
2972 * notification" message if this is the first dropped
2973 * notification since the socket spilled-over to the
2976 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
2978 if (!client
->communication
.outbound
.dropped_notification
) {
2979 client
->communication
.outbound
.dropped_notification
= true;
2980 ret
= client_enqueue_dropped_notification(
2989 ret
= lttng_dynamic_buffer_append_buffer(
2990 &client
->communication
.outbound
.buffer
,
2996 ret
= client_flush_outgoing_queue(client
, state
);
3003 lttng_dynamic_buffer_reset(&msg_buffer
);
3007 int handle_notification_thread_channel_sample(
3008 struct notification_thread_state
*state
, int pipe
,
3009 enum lttng_domain_type domain
)
3012 struct lttcomm_consumer_channel_monitor_msg sample_msg
;
3013 struct channel_info
*channel_info
;
3014 struct cds_lfht_node
*node
;
3015 struct cds_lfht_iter iter
;
3016 struct lttng_channel_trigger_list
*trigger_list
;
3017 struct lttng_trigger_list_element
*trigger_list_element
;
3018 bool previous_sample_available
= false;
3019 struct channel_state_sample previous_sample
, latest_sample
;
3020 uint64_t previous_session_consumed_total
, latest_session_consumed_total
;
3023 * The monitoring pipe only holds messages smaller than PIPE_BUF,
3024 * ensuring that read/write of sampling messages are atomic.
3026 ret
= lttng_read(pipe
, &sample_msg
, sizeof(sample_msg
));
3027 if (ret
!= sizeof(sample_msg
)) {
3028 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
3035 latest_sample
.key
.key
= sample_msg
.key
;
3036 latest_sample
.key
.domain
= domain
;
3037 latest_sample
.highest_usage
= sample_msg
.highest
;
3038 latest_sample
.lowest_usage
= sample_msg
.lowest
;
3039 latest_sample
.channel_total_consumed
= sample_msg
.total_consumed
;
3043 /* Retrieve the channel's informations */
3044 cds_lfht_lookup(state
->channels_ht
,
3045 hash_channel_key(&latest_sample
.key
),
3049 node
= cds_lfht_iter_get_node(&iter
);
3050 if (caa_unlikely(!node
)) {
3052 * Not an error since the consumer can push a sample to the pipe
3053 * and the rest of the session daemon could notify us of the
3054 * channel's destruction before we get a chance to process that
3057 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64
" in %s domain",
3058 latest_sample
.key
.key
,
3059 domain
== LTTNG_DOMAIN_KERNEL
? "kernel" :
3063 channel_info
= caa_container_of(node
, struct channel_info
,
3065 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64
") in session %s (highest usage = %" PRIu64
", lowest usage = %" PRIu64
", total consumed = %" PRIu64
")",
3067 latest_sample
.key
.key
,
3068 channel_info
->session_info
->name
,
3069 latest_sample
.highest_usage
,
3070 latest_sample
.lowest_usage
,
3071 latest_sample
.channel_total_consumed
);
3073 previous_session_consumed_total
=
3074 channel_info
->session_info
->consumed_data_size
;
3076 /* Retrieve the channel's last sample, if it exists, and update it. */
3077 cds_lfht_lookup(state
->channel_state_ht
,
3078 hash_channel_key(&latest_sample
.key
),
3079 match_channel_state_sample
,
3082 node
= cds_lfht_iter_get_node(&iter
);
3083 if (caa_likely(node
)) {
3084 struct channel_state_sample
*stored_sample
;
3086 /* Update the sample stored. */
3087 stored_sample
= caa_container_of(node
,
3088 struct channel_state_sample
,
3089 channel_state_ht_node
);
3091 memcpy(&previous_sample
, stored_sample
,
3092 sizeof(previous_sample
));
3093 stored_sample
->highest_usage
= latest_sample
.highest_usage
;
3094 stored_sample
->lowest_usage
= latest_sample
.lowest_usage
;
3095 stored_sample
->channel_total_consumed
= latest_sample
.channel_total_consumed
;
3096 previous_sample_available
= true;
3098 latest_session_consumed_total
=
3099 previous_session_consumed_total
+
3100 (latest_sample
.channel_total_consumed
- previous_sample
.channel_total_consumed
);
3103 * This is the channel's first sample, allocate space for and
3104 * store the new sample.
3106 struct channel_state_sample
*stored_sample
;
3108 stored_sample
= zmalloc(sizeof(*stored_sample
));
3109 if (!stored_sample
) {
3114 memcpy(stored_sample
, &latest_sample
, sizeof(*stored_sample
));
3115 cds_lfht_node_init(&stored_sample
->channel_state_ht_node
);
3116 cds_lfht_add(state
->channel_state_ht
,
3117 hash_channel_key(&stored_sample
->key
),
3118 &stored_sample
->channel_state_ht_node
);
3120 latest_session_consumed_total
=
3121 previous_session_consumed_total
+
3122 latest_sample
.channel_total_consumed
;
3125 channel_info
->session_info
->consumed_data_size
=
3126 latest_session_consumed_total
;
3128 /* Find triggers associated with this channel. */
3129 cds_lfht_lookup(state
->channel_triggers_ht
,
3130 hash_channel_key(&latest_sample
.key
),
3131 match_channel_trigger_list
,
3134 node
= cds_lfht_iter_get_node(&iter
);
3135 if (caa_likely(!node
)) {
3139 trigger_list
= caa_container_of(node
, struct lttng_channel_trigger_list
,
3140 channel_triggers_ht_node
);
3141 cds_list_for_each_entry(trigger_list_element
, &trigger_list
->list
,
3143 const struct lttng_condition
*condition
;
3144 const struct lttng_action
*action
;
3145 const struct lttng_trigger
*trigger
;
3146 struct notification_client_list
*client_list
;
3147 struct lttng_evaluation
*evaluation
= NULL
;
3149 trigger
= trigger_list_element
->trigger
;
3150 condition
= lttng_trigger_get_const_condition(trigger
);
3152 action
= lttng_trigger_get_const_action(trigger
);
3154 /* Notify actions are the only type currently supported. */
3155 assert(lttng_action_get_type_const(action
) ==
3156 LTTNG_ACTION_TYPE_NOTIFY
);
3159 * Check if any client is subscribed to the result of this
3162 client_list
= get_client_list_from_condition(state
, condition
);
3163 assert(client_list
);
3164 if (cds_list_empty(&client_list
->list
)) {
3166 * No clients interested in the evaluation's result,
3172 ret
= evaluate_buffer_condition(condition
, &evaluation
, state
,
3173 previous_sample_available
? &previous_sample
: NULL
,
3175 previous_session_consumed_total
,
3176 latest_session_consumed_total
,
3178 if (caa_unlikely(ret
)) {
3182 if (caa_likely(!evaluation
)) {
3186 /* Dispatch evaluation result to all clients. */
3187 ret
= send_evaluation_to_clients(trigger_list_element
->trigger
,
3188 evaluation
, client_list
, state
,
3189 channel_info
->session_info
->uid
,
3190 channel_info
->session_info
->gid
);
3191 lttng_evaluation_destroy(evaluation
);
3192 if (caa_unlikely(ret
)) {