2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 * SPDX-License-Identifier: GPL-2.0-only
10 #include <urcu/rculfhash.h>
12 #include <common/defaults.h>
13 #include <common/error.h>
14 #include <common/futex.h>
15 #include <common/unix.h>
16 #include <common/dynamic-buffer.h>
17 #include <common/hashtable/utils.h>
18 #include <common/sessiond-comm/sessiond-comm.h>
19 #include <common/macros.h>
20 #include <lttng/condition/condition.h>
21 #include <lttng/action/action-internal.h>
22 #include <lttng/notification/notification-internal.h>
23 #include <lttng/condition/condition-internal.h>
24 #include <lttng/condition/buffer-usage-internal.h>
25 #include <lttng/condition/session-consumed-size-internal.h>
26 #include <lttng/condition/session-rotation-internal.h>
27 #include <lttng/notification/channel-internal.h>
35 #include "notification-thread.h"
36 #include "notification-thread-events.h"
37 #include "notification-thread-commands.h"
38 #include "lttng-sessiond.h"
41 #define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
42 #define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
44 enum lttng_object_type
{
45 LTTNG_OBJECT_TYPE_UNKNOWN
,
46 LTTNG_OBJECT_TYPE_NONE
,
47 LTTNG_OBJECT_TYPE_CHANNEL
,
48 LTTNG_OBJECT_TYPE_SESSION
,
51 struct lttng_trigger_list_element
{
52 /* No ownership of the trigger object is assumed. */
53 const struct lttng_trigger
*trigger
;
54 struct cds_list_head node
;
57 struct lttng_channel_trigger_list
{
58 struct channel_key channel_key
;
59 /* List of struct lttng_trigger_list_element. */
60 struct cds_list_head list
;
61 /* Node in the channel_triggers_ht */
62 struct cds_lfht_node channel_triggers_ht_node
;
63 /* call_rcu delayed reclaim. */
64 struct rcu_head rcu_node
;
68 * List of triggers applying to a given session.
71 * - lttng_session_trigger_list_create()
72 * - lttng_session_trigger_list_build()
73 * - lttng_session_trigger_list_destroy()
74 * - lttng_session_trigger_list_add()
76 struct lttng_session_trigger_list
{
78 * Not owned by this; points to the session_info structure's
81 const char *session_name
;
82 /* List of struct lttng_trigger_list_element. */
83 struct cds_list_head list
;
84 /* Node in the session_triggers_ht */
85 struct cds_lfht_node session_triggers_ht_node
;
87 * Weak reference to the notification system's session triggers
90 * The session trigger list structure structure is owned by
91 * the session's session_info.
93 * The session_info is kept alive the the channel_infos holding a
94 * reference to it (reference counting). When those channels are
95 * destroyed (at runtime or on teardown), the reference they hold
96 * to the session_info are released. On destruction of session_info,
97 * session_info_destroy() will remove the list of triggers applying
98 * to this session from the notification system's state.
100 * This implies that the session_triggers_ht must be destroyed
101 * after the channels.
103 struct cds_lfht
*session_triggers_ht
;
104 /* Used for delayed RCU reclaim. */
105 struct rcu_head rcu_node
;
108 struct lttng_trigger_ht_element
{
109 struct lttng_trigger
*trigger
;
110 struct cds_lfht_node node
;
111 /* call_rcu delayed reclaim. */
112 struct rcu_head rcu_node
;
115 struct lttng_condition_list_element
{
116 struct lttng_condition
*condition
;
117 struct cds_list_head node
;
120 struct notification_client_list_element
{
121 struct notification_client
*client
;
122 struct cds_list_head node
;
125 struct notification_client_list
{
126 const struct lttng_trigger
*trigger
;
127 struct cds_list_head list
;
128 struct cds_lfht_node notification_trigger_ht_node
;
129 /* call_rcu delayed reclaim. */
130 struct rcu_head rcu_node
;
133 struct notification_client
{
134 notification_client_id id
;
136 /* Client protocol version. */
137 uint8_t major
, minor
;
141 * Indicates if the credentials and versions of the client have been
146 * Conditions to which the client's notification channel is subscribed.
147 * List of struct lttng_condition_list_node. The condition member is
148 * owned by the client.
150 struct cds_list_head condition_list
;
151 struct cds_lfht_node client_socket_ht_node
;
152 struct cds_lfht_node client_id_ht_node
;
155 * If a client's communication is inactive, it means a fatal
156 * error (either a protocol error or the socket API returned
157 * a fatal error). No further communication should be attempted;
158 * the client is queued for clean-up.
163 * During the reception of a message, the reception
164 * buffers' "size" is set to contain the current
165 * message's complete payload.
167 struct lttng_dynamic_buffer buffer
;
168 /* Bytes left to receive for the current message. */
169 size_t bytes_to_receive
;
170 /* Type of the message being received. */
171 enum lttng_notification_channel_message_type msg_type
;
173 * Indicates whether or not credentials are expected
178 * Indicates whether or not credentials were received
182 /* Only used during credentials reception. */
183 lttng_sock_cred creds
;
187 * Indicates whether or not a notification addressed to
188 * this client was dropped because a command reply was
191 * A notification is dropped whenever the buffer is not
194 bool dropped_notification
;
196 * Indicates whether or not a command reply is already
197 * buffered. In this case, it means that the client is
198 * not consuming command replies before emitting a new
199 * one. This could be caused by a protocol error or a
200 * misbehaving/malicious client.
202 bool queued_command_reply
;
203 struct lttng_dynamic_buffer buffer
;
206 /* call_rcu delayed reclaim. */
207 struct rcu_head rcu_node
;
210 struct channel_state_sample
{
211 struct channel_key key
;
212 struct cds_lfht_node channel_state_ht_node
;
213 uint64_t highest_usage
;
214 uint64_t lowest_usage
;
215 uint64_t channel_total_consumed
;
216 /* call_rcu delayed reclaim. */
217 struct rcu_head rcu_node
;
220 static unsigned long hash_channel_key(struct channel_key
*key
);
221 static int evaluate_buffer_condition(const struct lttng_condition
*condition
,
222 struct lttng_evaluation
**evaluation
,
223 const struct notification_thread_state
*state
,
224 const struct channel_state_sample
*previous_sample
,
225 const struct channel_state_sample
*latest_sample
,
226 uint64_t previous_session_consumed_total
,
227 uint64_t latest_session_consumed_total
,
228 struct channel_info
*channel_info
);
230 int send_evaluation_to_clients(const struct lttng_trigger
*trigger
,
231 const struct lttng_evaluation
*evaluation
,
232 struct notification_client_list
*client_list
,
233 struct notification_thread_state
*state
,
234 uid_t channel_uid
, gid_t channel_gid
);
237 /* session_info API */
239 void session_info_destroy(void *_data
);
241 void session_info_get(struct session_info
*session_info
);
243 void session_info_put(struct session_info
*session_info
);
245 struct session_info
*session_info_create(const char *name
,
246 uid_t uid
, gid_t gid
,
247 struct lttng_session_trigger_list
*trigger_list
,
248 struct cds_lfht
*sessions_ht
);
250 void session_info_add_channel(struct session_info
*session_info
,
251 struct channel_info
*channel_info
);
253 void session_info_remove_channel(struct session_info
*session_info
,
254 struct channel_info
*channel_info
);
256 /* lttng_session_trigger_list API */
258 struct lttng_session_trigger_list
*lttng_session_trigger_list_create(
259 const char *session_name
,
260 struct cds_lfht
*session_triggers_ht
);
262 struct lttng_session_trigger_list
*lttng_session_trigger_list_build(
263 const struct notification_thread_state
*state
,
264 const char *session_name
);
266 void lttng_session_trigger_list_destroy(
267 struct lttng_session_trigger_list
*list
);
269 int lttng_session_trigger_list_add(struct lttng_session_trigger_list
*list
,
270 const struct lttng_trigger
*trigger
);
274 int match_client_socket(struct cds_lfht_node
*node
, const void *key
)
276 /* This double-cast is intended to supress pointer-to-cast warning. */
277 const int socket
= (int) (intptr_t) key
;
278 const struct notification_client
*client
= caa_container_of(node
,
279 struct notification_client
, client_socket_ht_node
);
281 return client
->socket
== socket
;
285 int match_client_id(struct cds_lfht_node
*node
, const void *key
)
287 /* This double-cast is intended to supress pointer-to-cast warning. */
288 const notification_client_id id
= *((notification_client_id
*) key
);
289 const struct notification_client
*client
= caa_container_of(
290 node
, struct notification_client
, client_id_ht_node
);
292 return client
->id
== id
;
296 int match_channel_trigger_list(struct cds_lfht_node
*node
, const void *key
)
298 struct channel_key
*channel_key
= (struct channel_key
*) key
;
299 struct lttng_channel_trigger_list
*trigger_list
;
301 trigger_list
= caa_container_of(node
, struct lttng_channel_trigger_list
,
302 channel_triggers_ht_node
);
304 return !!((channel_key
->key
== trigger_list
->channel_key
.key
) &&
305 (channel_key
->domain
== trigger_list
->channel_key
.domain
));
309 int match_session_trigger_list(struct cds_lfht_node
*node
, const void *key
)
311 const char *session_name
= (const char *) key
;
312 struct lttng_session_trigger_list
*trigger_list
;
314 trigger_list
= caa_container_of(node
, struct lttng_session_trigger_list
,
315 session_triggers_ht_node
);
317 return !!(strcmp(trigger_list
->session_name
, session_name
) == 0);
321 int match_channel_state_sample(struct cds_lfht_node
*node
, const void *key
)
323 struct channel_key
*channel_key
= (struct channel_key
*) key
;
324 struct channel_state_sample
*sample
;
326 sample
= caa_container_of(node
, struct channel_state_sample
,
327 channel_state_ht_node
);
329 return !!((channel_key
->key
== sample
->key
.key
) &&
330 (channel_key
->domain
== sample
->key
.domain
));
334 int match_channel_info(struct cds_lfht_node
*node
, const void *key
)
336 struct channel_key
*channel_key
= (struct channel_key
*) key
;
337 struct channel_info
*channel_info
;
339 channel_info
= caa_container_of(node
, struct channel_info
,
342 return !!((channel_key
->key
== channel_info
->key
.key
) &&
343 (channel_key
->domain
== channel_info
->key
.domain
));
347 int match_condition(struct cds_lfht_node
*node
, const void *key
)
349 struct lttng_condition
*condition_key
= (struct lttng_condition
*) key
;
350 struct lttng_trigger_ht_element
*trigger
;
351 struct lttng_condition
*condition
;
353 trigger
= caa_container_of(node
, struct lttng_trigger_ht_element
,
355 condition
= lttng_trigger_get_condition(trigger
->trigger
);
358 return !!lttng_condition_is_equal(condition_key
, condition
);
362 int match_client_list_condition(struct cds_lfht_node
*node
, const void *key
)
364 struct lttng_condition
*condition_key
= (struct lttng_condition
*) key
;
365 struct notification_client_list
*client_list
;
366 const struct lttng_condition
*condition
;
368 assert(condition_key
);
370 client_list
= caa_container_of(node
, struct notification_client_list
,
371 notification_trigger_ht_node
);
372 condition
= lttng_trigger_get_const_condition(client_list
->trigger
);
374 return !!lttng_condition_is_equal(condition_key
, condition
);
378 int match_session(struct cds_lfht_node
*node
, const void *key
)
380 const char *name
= key
;
381 struct session_info
*session_info
= caa_container_of(
382 node
, struct session_info
, sessions_ht_node
);
384 return !strcmp(session_info
->name
, name
);
388 unsigned long lttng_condition_buffer_usage_hash(
389 const struct lttng_condition
*_condition
)
392 unsigned long condition_type
;
393 struct lttng_condition_buffer_usage
*condition
;
395 condition
= container_of(_condition
,
396 struct lttng_condition_buffer_usage
, parent
);
398 condition_type
= (unsigned long) condition
->parent
.type
;
399 hash
= hash_key_ulong((void *) condition_type
, lttng_ht_seed
);
400 if (condition
->session_name
) {
401 hash
^= hash_key_str(condition
->session_name
, lttng_ht_seed
);
403 if (condition
->channel_name
) {
404 hash
^= hash_key_str(condition
->channel_name
, lttng_ht_seed
);
406 if (condition
->domain
.set
) {
407 hash
^= hash_key_ulong(
408 (void *) condition
->domain
.type
,
411 if (condition
->threshold_ratio
.set
) {
414 val
= condition
->threshold_ratio
.value
* (double) UINT32_MAX
;
415 hash
^= hash_key_u64(&val
, lttng_ht_seed
);
416 } else if (condition
->threshold_bytes
.set
) {
419 val
= condition
->threshold_bytes
.value
;
420 hash
^= hash_key_u64(&val
, lttng_ht_seed
);
426 unsigned long lttng_condition_session_consumed_size_hash(
427 const struct lttng_condition
*_condition
)
430 unsigned long condition_type
=
431 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
;
432 struct lttng_condition_session_consumed_size
*condition
;
435 condition
= container_of(_condition
,
436 struct lttng_condition_session_consumed_size
, parent
);
438 hash
= hash_key_ulong((void *) condition_type
, lttng_ht_seed
);
439 if (condition
->session_name
) {
440 hash
^= hash_key_str(condition
->session_name
, lttng_ht_seed
);
442 val
= condition
->consumed_threshold_bytes
.value
;
443 hash
^= hash_key_u64(&val
, lttng_ht_seed
);
448 unsigned long lttng_condition_session_rotation_hash(
449 const struct lttng_condition
*_condition
)
451 unsigned long hash
, condition_type
;
452 struct lttng_condition_session_rotation
*condition
;
454 condition
= container_of(_condition
,
455 struct lttng_condition_session_rotation
, parent
);
456 condition_type
= (unsigned long) condition
->parent
.type
;
457 hash
= hash_key_ulong((void *) condition_type
, lttng_ht_seed
);
458 assert(condition
->session_name
);
459 hash
^= hash_key_str(condition
->session_name
, lttng_ht_seed
);
464 * The lttng_condition hashing code is kept in this file (rather than
465 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
466 * don't want to link in liblttng-ctl.
469 unsigned long lttng_condition_hash(const struct lttng_condition
*condition
)
471 switch (condition
->type
) {
472 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
473 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
474 return lttng_condition_buffer_usage_hash(condition
);
475 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
476 return lttng_condition_session_consumed_size_hash(condition
);
477 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
478 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
479 return lttng_condition_session_rotation_hash(condition
);
481 ERR("[notification-thread] Unexpected condition type caught");
487 unsigned long hash_channel_key(struct channel_key
*key
)
489 unsigned long key_hash
= hash_key_u64(&key
->key
, lttng_ht_seed
);
490 unsigned long domain_hash
= hash_key_ulong(
491 (void *) (unsigned long) key
->domain
, lttng_ht_seed
);
493 return key_hash
^ domain_hash
;
497 unsigned long hash_client_socket(int socket
)
499 return hash_key_ulong((void *) (unsigned long) socket
, lttng_ht_seed
);
503 unsigned long hash_client_id(notification_client_id id
)
505 return hash_key_u64(&id
, lttng_ht_seed
);
509 * Get the type of object to which a given condition applies. Bindings let
510 * the notification system evaluate a trigger's condition when a given
511 * object's state is updated.
513 * For instance, a condition bound to a channel will be evaluated everytime
514 * the channel's state is changed by a channel monitoring sample.
517 enum lttng_object_type
get_condition_binding_object(
518 const struct lttng_condition
*condition
)
520 switch (lttng_condition_get_type(condition
)) {
521 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
522 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
523 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
524 return LTTNG_OBJECT_TYPE_CHANNEL
;
525 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
526 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
527 return LTTNG_OBJECT_TYPE_SESSION
;
529 return LTTNG_OBJECT_TYPE_UNKNOWN
;
534 void free_channel_info_rcu(struct rcu_head
*node
)
536 free(caa_container_of(node
, struct channel_info
, rcu_node
));
540 void channel_info_destroy(struct channel_info
*channel_info
)
546 if (channel_info
->session_info
) {
547 session_info_remove_channel(channel_info
->session_info
,
549 session_info_put(channel_info
->session_info
);
551 if (channel_info
->name
) {
552 free(channel_info
->name
);
554 call_rcu(&channel_info
->rcu_node
, free_channel_info_rcu
);
558 void free_session_info_rcu(struct rcu_head
*node
)
560 free(caa_container_of(node
, struct session_info
, rcu_node
));
563 /* Don't call directly, use the ref-counting mechanism. */
565 void session_info_destroy(void *_data
)
567 struct session_info
*session_info
= _data
;
570 assert(session_info
);
571 if (session_info
->channel_infos_ht
) {
572 ret
= cds_lfht_destroy(session_info
->channel_infos_ht
, NULL
);
574 ERR("[notification-thread] Failed to destroy channel information hash table");
577 lttng_session_trigger_list_destroy(session_info
->trigger_list
);
580 cds_lfht_del(session_info
->sessions_ht
,
581 &session_info
->sessions_ht_node
);
583 free(session_info
->name
);
584 call_rcu(&session_info
->rcu_node
, free_session_info_rcu
);
588 void session_info_get(struct session_info
*session_info
)
593 lttng_ref_get(&session_info
->ref
);
597 void session_info_put(struct session_info
*session_info
)
602 lttng_ref_put(&session_info
->ref
);
606 struct session_info
*session_info_create(const char *name
, uid_t uid
, gid_t gid
,
607 struct lttng_session_trigger_list
*trigger_list
,
608 struct cds_lfht
*sessions_ht
)
610 struct session_info
*session_info
;
614 session_info
= zmalloc(sizeof(*session_info
));
618 lttng_ref_init(&session_info
->ref
, session_info_destroy
);
620 session_info
->channel_infos_ht
= cds_lfht_new(DEFAULT_HT_SIZE
,
621 1, 0, CDS_LFHT_AUTO_RESIZE
| CDS_LFHT_ACCOUNTING
, NULL
);
622 if (!session_info
->channel_infos_ht
) {
626 cds_lfht_node_init(&session_info
->sessions_ht_node
);
627 session_info
->name
= strdup(name
);
628 if (!session_info
->name
) {
631 session_info
->uid
= uid
;
632 session_info
->gid
= gid
;
633 session_info
->trigger_list
= trigger_list
;
634 session_info
->sessions_ht
= sessions_ht
;
638 session_info_put(session_info
);
643 void session_info_add_channel(struct session_info
*session_info
,
644 struct channel_info
*channel_info
)
647 cds_lfht_add(session_info
->channel_infos_ht
,
648 hash_channel_key(&channel_info
->key
),
649 &channel_info
->session_info_channels_ht_node
);
654 void session_info_remove_channel(struct session_info
*session_info
,
655 struct channel_info
*channel_info
)
658 cds_lfht_del(session_info
->channel_infos_ht
,
659 &channel_info
->session_info_channels_ht_node
);
664 struct channel_info
*channel_info_create(const char *channel_name
,
665 struct channel_key
*channel_key
, uint64_t channel_capacity
,
666 struct session_info
*session_info
)
668 struct channel_info
*channel_info
= zmalloc(sizeof(*channel_info
));
674 cds_lfht_node_init(&channel_info
->channels_ht_node
);
675 cds_lfht_node_init(&channel_info
->session_info_channels_ht_node
);
676 memcpy(&channel_info
->key
, channel_key
, sizeof(*channel_key
));
677 channel_info
->capacity
= channel_capacity
;
679 channel_info
->name
= strdup(channel_name
);
680 if (!channel_info
->name
) {
685 * Set the references between session and channel infos:
686 * - channel_info holds a strong reference to session_info
687 * - session_info holds a weak reference to channel_info
689 session_info_get(session_info
);
690 session_info_add_channel(session_info
, channel_info
);
691 channel_info
->session_info
= session_info
;
695 channel_info_destroy(channel_info
);
699 /* RCU read lock must be held by the caller. */
701 struct notification_client_list
*get_client_list_from_condition(
702 struct notification_thread_state
*state
,
703 const struct lttng_condition
*condition
)
705 struct cds_lfht_node
*node
;
706 struct cds_lfht_iter iter
;
708 cds_lfht_lookup(state
->notification_trigger_clients_ht
,
709 lttng_condition_hash(condition
),
710 match_client_list_condition
,
713 node
= cds_lfht_iter_get_node(&iter
);
715 return node
? caa_container_of(node
,
716 struct notification_client_list
,
717 notification_trigger_ht_node
) : NULL
;
720 /* This function must be called with the RCU read lock held. */
722 int evaluate_channel_condition_for_client(
723 const struct lttng_condition
*condition
,
724 struct notification_thread_state
*state
,
725 struct lttng_evaluation
**evaluation
,
726 uid_t
*session_uid
, gid_t
*session_gid
)
729 struct cds_lfht_iter iter
;
730 struct cds_lfht_node
*node
;
731 struct channel_info
*channel_info
= NULL
;
732 struct channel_key
*channel_key
= NULL
;
733 struct channel_state_sample
*last_sample
= NULL
;
734 struct lttng_channel_trigger_list
*channel_trigger_list
= NULL
;
736 /* Find the channel associated with the condition. */
737 cds_lfht_for_each_entry(state
->channel_triggers_ht
, &iter
,
738 channel_trigger_list
, channel_triggers_ht_node
) {
739 struct lttng_trigger_list_element
*element
;
741 cds_list_for_each_entry(element
, &channel_trigger_list
->list
, node
) {
742 const struct lttng_condition
*current_condition
=
743 lttng_trigger_get_const_condition(
746 assert(current_condition
);
747 if (!lttng_condition_is_equal(condition
,
748 current_condition
)) {
752 /* Found the trigger, save the channel key. */
753 channel_key
= &channel_trigger_list
->channel_key
;
757 /* The channel key was found stop iteration. */
763 /* No channel found; normal exit. */
764 DBG("[notification-thread] No known channel associated with newly subscribed-to condition");
769 /* Fetch channel info for the matching channel. */
770 cds_lfht_lookup(state
->channels_ht
,
771 hash_channel_key(channel_key
),
775 node
= cds_lfht_iter_get_node(&iter
);
777 channel_info
= caa_container_of(node
, struct channel_info
,
780 /* Retrieve the channel's last sample, if it exists. */
781 cds_lfht_lookup(state
->channel_state_ht
,
782 hash_channel_key(channel_key
),
783 match_channel_state_sample
,
786 node
= cds_lfht_iter_get_node(&iter
);
788 last_sample
= caa_container_of(node
,
789 struct channel_state_sample
,
790 channel_state_ht_node
);
792 /* Nothing to evaluate, no sample was ever taken. Normal exit */
793 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
798 ret
= evaluate_buffer_condition(condition
, evaluation
, state
,
800 0, channel_info
->session_info
->consumed_data_size
,
803 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
807 *session_uid
= channel_info
->session_info
->uid
;
808 *session_gid
= channel_info
->session_info
->gid
;
814 const char *get_condition_session_name(const struct lttng_condition
*condition
)
816 const char *session_name
= NULL
;
817 enum lttng_condition_status status
;
819 switch (lttng_condition_get_type(condition
)) {
820 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
821 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
822 status
= lttng_condition_buffer_usage_get_session_name(
823 condition
, &session_name
);
825 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
826 status
= lttng_condition_session_consumed_size_get_session_name(
827 condition
, &session_name
);
829 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
830 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
831 status
= lttng_condition_session_rotation_get_session_name(
832 condition
, &session_name
);
837 if (status
!= LTTNG_CONDITION_STATUS_OK
) {
838 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
845 /* This function must be called with the RCU read lock held. */
847 int evaluate_session_condition_for_client(
848 const struct lttng_condition
*condition
,
849 struct notification_thread_state
*state
,
850 struct lttng_evaluation
**evaluation
,
851 uid_t
*session_uid
, gid_t
*session_gid
)
854 struct cds_lfht_iter iter
;
855 struct cds_lfht_node
*node
;
856 const char *session_name
;
857 struct session_info
*session_info
= NULL
;
859 session_name
= get_condition_session_name(condition
);
861 /* Find the session associated with the trigger. */
862 cds_lfht_lookup(state
->sessions_ht
,
863 hash_key_str(session_name
, lttng_ht_seed
),
867 node
= cds_lfht_iter_get_node(&iter
);
869 DBG("[notification-thread] No known session matching name \"%s\"",
875 session_info
= caa_container_of(node
, struct session_info
,
877 session_info_get(session_info
);
880 * Evaluation is performed in-line here since only one type of
881 * session-bound condition is handled for the moment.
883 switch (lttng_condition_get_type(condition
)) {
884 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
885 if (!session_info
->rotation
.ongoing
) {
887 goto end_session_put
;
890 *evaluation
= lttng_evaluation_session_rotation_ongoing_create(
891 session_info
->rotation
.id
);
894 ERR("[notification-thread] Failed to create session rotation ongoing evaluation for session \"%s\"",
897 goto end_session_put
;
903 goto end_session_put
;
906 *session_uid
= session_info
->uid
;
907 *session_gid
= session_info
->gid
;
910 session_info_put(session_info
);
915 /* This function must be called with the RCU read lock held. */
917 int evaluate_condition_for_client(const struct lttng_trigger
*trigger
,
918 const struct lttng_condition
*condition
,
919 struct notification_client
*client
,
920 struct notification_thread_state
*state
)
923 struct lttng_evaluation
*evaluation
= NULL
;
924 struct notification_client_list client_list
= { 0 };
925 struct notification_client_list_element client_list_element
= { 0 };
926 uid_t object_uid
= 0;
927 gid_t object_gid
= 0;
934 switch (get_condition_binding_object(condition
)) {
935 case LTTNG_OBJECT_TYPE_SESSION
:
936 ret
= evaluate_session_condition_for_client(condition
, state
,
937 &evaluation
, &object_uid
, &object_gid
);
939 case LTTNG_OBJECT_TYPE_CHANNEL
:
940 ret
= evaluate_channel_condition_for_client(condition
, state
,
941 &evaluation
, &object_uid
, &object_gid
);
943 case LTTNG_OBJECT_TYPE_NONE
:
946 case LTTNG_OBJECT_TYPE_UNKNOWN
:
956 /* Evaluation yielded nothing. Normal exit. */
957 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
963 * Create a temporary client list with the client currently
966 cds_lfht_node_init(&client_list
.notification_trigger_ht_node
);
967 CDS_INIT_LIST_HEAD(&client_list
.list
);
968 client_list
.trigger
= trigger
;
970 CDS_INIT_LIST_HEAD(&client_list_element
.node
);
971 client_list_element
.client
= client
;
972 cds_list_add(&client_list_element
.node
, &client_list
.list
);
974 /* Send evaluation result to the newly-subscribed client. */
975 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
976 ret
= send_evaluation_to_clients(trigger
, evaluation
, &client_list
,
977 state
, object_uid
, object_gid
);
984 int notification_thread_client_subscribe(struct notification_client
*client
,
985 struct lttng_condition
*condition
,
986 struct notification_thread_state
*state
,
987 enum lttng_notification_channel_status
*_status
)
990 struct notification_client_list
*client_list
;
991 struct lttng_condition_list_element
*condition_list_element
= NULL
;
992 struct notification_client_list_element
*client_list_element
= NULL
;
993 enum lttng_notification_channel_status status
=
994 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
997 * Ensure that the client has not already subscribed to this condition
1000 cds_list_for_each_entry(condition_list_element
, &client
->condition_list
, node
) {
1001 if (lttng_condition_is_equal(condition_list_element
->condition
,
1003 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED
;
1008 condition_list_element
= zmalloc(sizeof(*condition_list_element
));
1009 if (!condition_list_element
) {
1013 client_list_element
= zmalloc(sizeof(*client_list_element
));
1014 if (!client_list_element
) {
1022 * Add the newly-subscribed condition to the client's subscription list.
1024 CDS_INIT_LIST_HEAD(&condition_list_element
->node
);
1025 condition_list_element
->condition
= condition
;
1026 cds_list_add(&condition_list_element
->node
, &client
->condition_list
);
1028 client_list
= get_client_list_from_condition(state
, condition
);
1031 * No notification-emiting trigger registered with this
1032 * condition. We don't evaluate the condition right away
1033 * since this trigger is not registered yet.
1035 free(client_list_element
);
1040 * The condition to which the client just subscribed is evaluated
1041 * at this point so that conditions that are already TRUE result
1042 * in a notification being sent out.
1044 if (evaluate_condition_for_client(client_list
->trigger
, condition
,
1046 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
1048 free(client_list_element
);
1053 * Add the client to the list of clients interested in a given trigger
1054 * if a "notification" trigger with a corresponding condition was
1057 client_list_element
->client
= client
;
1058 CDS_INIT_LIST_HEAD(&client_list_element
->node
);
1059 cds_list_add(&client_list_element
->node
, &client_list
->list
);
1068 free(condition_list_element
);
1069 free(client_list_element
);
1074 int notification_thread_client_unsubscribe(
1075 struct notification_client
*client
,
1076 struct lttng_condition
*condition
,
1077 struct notification_thread_state
*state
,
1078 enum lttng_notification_channel_status
*_status
)
1080 struct notification_client_list
*client_list
;
1081 struct lttng_condition_list_element
*condition_list_element
,
1083 struct notification_client_list_element
*client_list_element
,
1085 bool condition_found
= false;
1086 enum lttng_notification_channel_status status
=
1087 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
1089 /* Remove the condition from the client's condition list. */
1090 cds_list_for_each_entry_safe(condition_list_element
, condition_tmp
,
1091 &client
->condition_list
, node
) {
1092 if (!lttng_condition_is_equal(condition_list_element
->condition
,
1097 cds_list_del(&condition_list_element
->node
);
1099 * The caller may be iterating on the client's conditions to
1100 * tear down a client's connection. In this case, the condition
1101 * will be destroyed at the end.
1103 if (condition
!= condition_list_element
->condition
) {
1104 lttng_condition_destroy(
1105 condition_list_element
->condition
);
1107 free(condition_list_element
);
1108 condition_found
= true;
1112 if (!condition_found
) {
1113 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION
;
1118 * Remove the client from the list of clients interested the trigger
1119 * matching the condition.
1122 client_list
= get_client_list_from_condition(state
, condition
);
1127 cds_list_for_each_entry_safe(client_list_element
, client_tmp
,
1128 &client_list
->list
, node
) {
1129 if (client_list_element
->client
->socket
!= client
->socket
) {
1132 cds_list_del(&client_list_element
->node
);
1133 free(client_list_element
);
1139 lttng_condition_destroy(condition
);
1147 void free_notification_client_rcu(struct rcu_head
*node
)
1149 free(caa_container_of(node
, struct notification_client
, rcu_node
));
1153 void notification_client_destroy(struct notification_client
*client
,
1154 struct notification_thread_state
*state
)
1156 struct lttng_condition_list_element
*condition_list_element
, *tmp
;
1162 /* Release all conditions to which the client was subscribed. */
1163 cds_list_for_each_entry_safe(condition_list_element
, tmp
,
1164 &client
->condition_list
, node
) {
1165 (void) notification_thread_client_unsubscribe(client
,
1166 condition_list_element
->condition
, state
, NULL
);
1169 if (client
->socket
>= 0) {
1170 (void) lttcomm_close_unix_sock(client
->socket
);
1171 client
->socket
= -1;
1173 lttng_dynamic_buffer_reset(&client
->communication
.inbound
.buffer
);
1174 lttng_dynamic_buffer_reset(&client
->communication
.outbound
.buffer
);
1175 call_rcu(&client
->rcu_node
, free_notification_client_rcu
);
1179 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1183 struct notification_client
*get_client_from_socket(int socket
,
1184 struct notification_thread_state
*state
)
1186 struct cds_lfht_iter iter
;
1187 struct cds_lfht_node
*node
;
1188 struct notification_client
*client
= NULL
;
1190 cds_lfht_lookup(state
->client_socket_ht
,
1191 hash_client_socket(socket
),
1192 match_client_socket
,
1193 (void *) (unsigned long) socket
,
1195 node
= cds_lfht_iter_get_node(&iter
);
1200 client
= caa_container_of(node
, struct notification_client
,
1201 client_socket_ht_node
);
1207 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1211 struct notification_client
*get_client_from_id(notification_client_id id
,
1212 struct notification_thread_state
*state
)
1214 struct cds_lfht_iter iter
;
1215 struct cds_lfht_node
*node
;
1216 struct notification_client
*client
= NULL
;
1218 cds_lfht_lookup(state
->client_id_ht
,
1223 node
= cds_lfht_iter_get_node(&iter
);
1228 client
= caa_container_of(node
, struct notification_client
,
1235 bool buffer_usage_condition_applies_to_channel(
1236 const struct lttng_condition
*condition
,
1237 const struct channel_info
*channel_info
)
1239 enum lttng_condition_status status
;
1240 enum lttng_domain_type condition_domain
;
1241 const char *condition_session_name
= NULL
;
1242 const char *condition_channel_name
= NULL
;
1244 status
= lttng_condition_buffer_usage_get_domain_type(condition
,
1246 assert(status
== LTTNG_CONDITION_STATUS_OK
);
1247 if (channel_info
->key
.domain
!= condition_domain
) {
1251 status
= lttng_condition_buffer_usage_get_session_name(
1252 condition
, &condition_session_name
);
1253 assert((status
== LTTNG_CONDITION_STATUS_OK
) && condition_session_name
);
1255 status
= lttng_condition_buffer_usage_get_channel_name(
1256 condition
, &condition_channel_name
);
1257 assert((status
== LTTNG_CONDITION_STATUS_OK
) && condition_channel_name
);
1259 if (strcmp(channel_info
->session_info
->name
, condition_session_name
)) {
1262 if (strcmp(channel_info
->name
, condition_channel_name
)) {
1272 bool session_consumed_size_condition_applies_to_channel(
1273 const struct lttng_condition
*condition
,
1274 const struct channel_info
*channel_info
)
1276 enum lttng_condition_status status
;
1277 const char *condition_session_name
= NULL
;
1279 status
= lttng_condition_session_consumed_size_get_session_name(
1280 condition
, &condition_session_name
);
1281 assert((status
== LTTNG_CONDITION_STATUS_OK
) && condition_session_name
);
1283 if (strcmp(channel_info
->session_info
->name
, condition_session_name
)) {
1293 bool trigger_applies_to_channel(const struct lttng_trigger
*trigger
,
1294 const struct channel_info
*channel_info
)
1296 const struct lttng_condition
*condition
;
1297 bool trigger_applies
;
1299 condition
= lttng_trigger_get_const_condition(trigger
);
1304 switch (lttng_condition_get_type(condition
)) {
1305 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
1306 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
1307 trigger_applies
= buffer_usage_condition_applies_to_channel(
1308 condition
, channel_info
);
1310 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
1311 trigger_applies
= session_consumed_size_condition_applies_to_channel(
1312 condition
, channel_info
);
1318 return trigger_applies
;
1324 bool trigger_applies_to_client(struct lttng_trigger
*trigger
,
1325 struct notification_client
*client
)
1327 bool applies
= false;
1328 struct lttng_condition_list_element
*condition_list_element
;
1330 cds_list_for_each_entry(condition_list_element
, &client
->condition_list
,
1332 applies
= lttng_condition_is_equal(
1333 condition_list_element
->condition
,
1334 lttng_trigger_get_condition(trigger
));
1342 /* Must be called with RCU read lock held. */
1344 struct lttng_session_trigger_list
*get_session_trigger_list(
1345 struct notification_thread_state
*state
,
1346 const char *session_name
)
1348 struct lttng_session_trigger_list
*list
= NULL
;
1349 struct cds_lfht_node
*node
;
1350 struct cds_lfht_iter iter
;
1352 cds_lfht_lookup(state
->session_triggers_ht
,
1353 hash_key_str(session_name
, lttng_ht_seed
),
1354 match_session_trigger_list
,
1357 node
= cds_lfht_iter_get_node(&iter
);
1360 * Not an error, the list of triggers applying to that session
1361 * will be initialized when the session is created.
1363 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1368 list
= caa_container_of(node
,
1369 struct lttng_session_trigger_list
,
1370 session_triggers_ht_node
);
1376 * Allocate an empty lttng_session_trigger_list for the session named
1379 * No ownership of 'session_name' is assumed by the session trigger list.
1380 * It is the caller's responsability to ensure the session name is alive
1381 * for as long as this list is.
1384 struct lttng_session_trigger_list
*lttng_session_trigger_list_create(
1385 const char *session_name
,
1386 struct cds_lfht
*session_triggers_ht
)
1388 struct lttng_session_trigger_list
*list
;
1390 list
= zmalloc(sizeof(*list
));
1394 list
->session_name
= session_name
;
1395 CDS_INIT_LIST_HEAD(&list
->list
);
1396 cds_lfht_node_init(&list
->session_triggers_ht_node
);
1397 list
->session_triggers_ht
= session_triggers_ht
;
1400 /* Publish the list through the session_triggers_ht. */
1401 cds_lfht_add(session_triggers_ht
,
1402 hash_key_str(session_name
, lttng_ht_seed
),
1403 &list
->session_triggers_ht_node
);
1410 void free_session_trigger_list_rcu(struct rcu_head
*node
)
1412 free(caa_container_of(node
, struct lttng_session_trigger_list
,
1417 void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list
*list
)
1419 struct lttng_trigger_list_element
*trigger_list_element
, *tmp
;
1421 /* Empty the list element by element, and then free the list itself. */
1422 cds_list_for_each_entry_safe(trigger_list_element
, tmp
,
1423 &list
->list
, node
) {
1424 cds_list_del(&trigger_list_element
->node
);
1425 free(trigger_list_element
);
1428 /* Unpublish the list from the session_triggers_ht. */
1429 cds_lfht_del(list
->session_triggers_ht
,
1430 &list
->session_triggers_ht_node
);
1432 call_rcu(&list
->rcu_node
, free_session_trigger_list_rcu
);
1436 int lttng_session_trigger_list_add(struct lttng_session_trigger_list
*list
,
1437 const struct lttng_trigger
*trigger
)
1440 struct lttng_trigger_list_element
*new_element
=
1441 zmalloc(sizeof(*new_element
));
1447 CDS_INIT_LIST_HEAD(&new_element
->node
);
1448 new_element
->trigger
= trigger
;
1449 cds_list_add(&new_element
->node
, &list
->list
);
1455 bool trigger_applies_to_session(const struct lttng_trigger
*trigger
,
1456 const char *session_name
)
1458 bool applies
= false;
1459 const struct lttng_condition
*condition
;
1461 condition
= lttng_trigger_get_const_condition(trigger
);
1462 switch (lttng_condition_get_type(condition
)) {
1463 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
1464 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
1466 enum lttng_condition_status condition_status
;
1467 const char *condition_session_name
;
1469 condition_status
= lttng_condition_session_rotation_get_session_name(
1470 condition
, &condition_session_name
);
1471 if (condition_status
!= LTTNG_CONDITION_STATUS_OK
) {
1472 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1476 assert(condition_session_name
);
1477 applies
= !strcmp(condition_session_name
, session_name
);
1488 * Allocate and initialize an lttng_session_trigger_list which contains
1489 * all triggers that apply to the session named 'session_name'.
1491 * No ownership of 'session_name' is assumed by the session trigger list.
1492 * It is the caller's responsability to ensure the session name is alive
1493 * for as long as this list is.
1496 struct lttng_session_trigger_list
*lttng_session_trigger_list_build(
1497 const struct notification_thread_state
*state
,
1498 const char *session_name
)
1500 int trigger_count
= 0;
1501 struct lttng_session_trigger_list
*session_trigger_list
= NULL
;
1502 struct lttng_trigger_ht_element
*trigger_ht_element
= NULL
;
1503 struct cds_lfht_iter iter
;
1505 session_trigger_list
= lttng_session_trigger_list_create(session_name
,
1506 state
->session_triggers_ht
);
1508 /* Add all triggers applying to the session named 'session_name'. */
1509 cds_lfht_for_each_entry(state
->triggers_ht
, &iter
, trigger_ht_element
,
1513 if (!trigger_applies_to_session(trigger_ht_element
->trigger
,
1518 ret
= lttng_session_trigger_list_add(session_trigger_list
,
1519 trigger_ht_element
->trigger
);
1527 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1529 return session_trigger_list
;
1531 lttng_session_trigger_list_destroy(session_trigger_list
);
1536 struct session_info
*find_or_create_session_info(
1537 struct notification_thread_state
*state
,
1538 const char *name
, uid_t uid
, gid_t gid
)
1540 struct session_info
*session
= NULL
;
1541 struct cds_lfht_node
*node
;
1542 struct cds_lfht_iter iter
;
1543 struct lttng_session_trigger_list
*trigger_list
;
1546 cds_lfht_lookup(state
->sessions_ht
,
1547 hash_key_str(name
, lttng_ht_seed
),
1551 node
= cds_lfht_iter_get_node(&iter
);
1553 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1555 session
= caa_container_of(node
, struct session_info
,
1557 assert(session
->uid
== uid
);
1558 assert(session
->gid
== gid
);
1559 session_info_get(session
);
1563 trigger_list
= lttng_session_trigger_list_build(state
, name
);
1564 if (!trigger_list
) {
1568 session
= session_info_create(name
, uid
, gid
, trigger_list
,
1569 state
->sessions_ht
);
1571 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1573 lttng_session_trigger_list_destroy(trigger_list
);
1576 trigger_list
= NULL
;
1578 cds_lfht_add(state
->sessions_ht
, hash_key_str(name
, lttng_ht_seed
),
1579 &session
->sessions_ht_node
);
1585 session_info_put(session
);
1590 int handle_notification_thread_command_add_channel(
1591 struct notification_thread_state
*state
,
1592 const char *session_name
, uid_t session_uid
, gid_t session_gid
,
1593 const char *channel_name
, enum lttng_domain_type channel_domain
,
1594 uint64_t channel_key_int
, uint64_t channel_capacity
,
1595 enum lttng_error_code
*cmd_result
)
1597 struct cds_list_head trigger_list
;
1598 struct channel_info
*new_channel_info
= NULL
;
1599 struct channel_key channel_key
= {
1600 .key
= channel_key_int
,
1601 .domain
= channel_domain
,
1603 struct lttng_channel_trigger_list
*channel_trigger_list
= NULL
;
1604 struct lttng_trigger_ht_element
*trigger_ht_element
= NULL
;
1605 int trigger_count
= 0;
1606 struct cds_lfht_iter iter
;
1607 struct session_info
*session_info
= NULL
;
1609 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64
" in %s domain",
1610 channel_name
, session_name
, channel_key_int
,
1611 channel_domain
== LTTNG_DOMAIN_KERNEL
? "kernel" : "user space");
1613 CDS_INIT_LIST_HEAD(&trigger_list
);
1615 session_info
= find_or_create_session_info(state
, session_name
,
1616 session_uid
, session_gid
);
1617 if (!session_info
) {
1618 /* Allocation error or an internal error occurred. */
1622 new_channel_info
= channel_info_create(channel_name
, &channel_key
,
1623 channel_capacity
, session_info
);
1624 if (!new_channel_info
) {
1629 /* Build a list of all triggers applying to the new channel. */
1630 cds_lfht_for_each_entry(state
->triggers_ht
, &iter
, trigger_ht_element
,
1632 struct lttng_trigger_list_element
*new_element
;
1634 if (!trigger_applies_to_channel(trigger_ht_element
->trigger
,
1635 new_channel_info
)) {
1639 new_element
= zmalloc(sizeof(*new_element
));
1644 CDS_INIT_LIST_HEAD(&new_element
->node
);
1645 new_element
->trigger
= trigger_ht_element
->trigger
;
1646 cds_list_add(&new_element
->node
, &trigger_list
);
1651 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1653 channel_trigger_list
= zmalloc(sizeof(*channel_trigger_list
));
1654 if (!channel_trigger_list
) {
1657 channel_trigger_list
->channel_key
= new_channel_info
->key
;
1658 CDS_INIT_LIST_HEAD(&channel_trigger_list
->list
);
1659 cds_lfht_node_init(&channel_trigger_list
->channel_triggers_ht_node
);
1660 cds_list_splice(&trigger_list
, &channel_trigger_list
->list
);
1663 /* Add channel to the channel_ht which owns the channel_infos. */
1664 cds_lfht_add(state
->channels_ht
,
1665 hash_channel_key(&new_channel_info
->key
),
1666 &new_channel_info
->channels_ht_node
);
1668 * Add the list of triggers associated with this channel to the
1669 * channel_triggers_ht.
1671 cds_lfht_add(state
->channel_triggers_ht
,
1672 hash_channel_key(&new_channel_info
->key
),
1673 &channel_trigger_list
->channel_triggers_ht_node
);
1675 session_info_put(session_info
);
1676 *cmd_result
= LTTNG_OK
;
1679 channel_info_destroy(new_channel_info
);
1680 session_info_put(session_info
);
1685 void free_channel_trigger_list_rcu(struct rcu_head
*node
)
1687 free(caa_container_of(node
, struct lttng_channel_trigger_list
,
1692 void free_channel_state_sample_rcu(struct rcu_head
*node
)
1694 free(caa_container_of(node
, struct channel_state_sample
,
1699 int handle_notification_thread_command_remove_channel(
1700 struct notification_thread_state
*state
,
1701 uint64_t channel_key
, enum lttng_domain_type domain
,
1702 enum lttng_error_code
*cmd_result
)
1704 struct cds_lfht_node
*node
;
1705 struct cds_lfht_iter iter
;
1706 struct lttng_channel_trigger_list
*trigger_list
;
1707 struct lttng_trigger_list_element
*trigger_list_element
, *tmp
;
1708 struct channel_key key
= { .key
= channel_key
, .domain
= domain
};
1709 struct channel_info
*channel_info
;
1711 DBG("[notification-thread] Removing channel key = %" PRIu64
" in %s domain",
1712 channel_key
, domain
== LTTNG_DOMAIN_KERNEL
? "kernel" : "user space");
1716 cds_lfht_lookup(state
->channel_triggers_ht
,
1717 hash_channel_key(&key
),
1718 match_channel_trigger_list
,
1721 node
= cds_lfht_iter_get_node(&iter
);
1723 * There is a severe internal error if we are being asked to remove a
1724 * channel that doesn't exist.
1727 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1731 /* Free the list of triggers associated with this channel. */
1732 trigger_list
= caa_container_of(node
, struct lttng_channel_trigger_list
,
1733 channel_triggers_ht_node
);
1734 cds_list_for_each_entry_safe(trigger_list_element
, tmp
,
1735 &trigger_list
->list
, node
) {
1736 cds_list_del(&trigger_list_element
->node
);
1737 free(trigger_list_element
);
1739 cds_lfht_del(state
->channel_triggers_ht
, node
);
1740 call_rcu(&trigger_list
->rcu_node
, free_channel_trigger_list_rcu
);
1742 /* Free sampled channel state. */
1743 cds_lfht_lookup(state
->channel_state_ht
,
1744 hash_channel_key(&key
),
1745 match_channel_state_sample
,
1748 node
= cds_lfht_iter_get_node(&iter
);
1750 * This is expected to be NULL if the channel is destroyed before we
1751 * received a sample.
1754 struct channel_state_sample
*sample
= caa_container_of(node
,
1755 struct channel_state_sample
,
1756 channel_state_ht_node
);
1758 cds_lfht_del(state
->channel_state_ht
, node
);
1759 call_rcu(&sample
->rcu_node
, free_channel_state_sample_rcu
);
1762 /* Remove the channel from the channels_ht and free it. */
1763 cds_lfht_lookup(state
->channels_ht
,
1764 hash_channel_key(&key
),
1768 node
= cds_lfht_iter_get_node(&iter
);
1770 channel_info
= caa_container_of(node
, struct channel_info
,
1772 cds_lfht_del(state
->channels_ht
, node
);
1773 channel_info_destroy(channel_info
);
1776 *cmd_result
= LTTNG_OK
;
1781 int handle_notification_thread_command_session_rotation(
1782 struct notification_thread_state
*state
,
1783 enum notification_thread_command_type cmd_type
,
1784 const char *session_name
, uid_t session_uid
, gid_t session_gid
,
1785 uint64_t trace_archive_chunk_id
,
1786 struct lttng_trace_archive_location
*location
,
1787 enum lttng_error_code
*_cmd_result
)
1790 enum lttng_error_code cmd_result
= LTTNG_OK
;
1791 struct lttng_session_trigger_list
*trigger_list
;
1792 struct lttng_trigger_list_element
*trigger_list_element
;
1793 struct session_info
*session_info
;
1797 session_info
= find_or_create_session_info(state
, session_name
,
1798 session_uid
, session_gid
);
1799 if (!session_info
) {
1800 /* Allocation error or an internal error occurred. */
1802 cmd_result
= LTTNG_ERR_NOMEM
;
1806 session_info
->rotation
.ongoing
=
1807 cmd_type
== NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
;
1808 session_info
->rotation
.id
= trace_archive_chunk_id
;
1809 trigger_list
= get_session_trigger_list(state
, session_name
);
1810 if (!trigger_list
) {
1811 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1816 cds_list_for_each_entry(trigger_list_element
, &trigger_list
->list
,
1818 const struct lttng_condition
*condition
;
1819 const struct lttng_action
*action
;
1820 const struct lttng_trigger
*trigger
;
1821 struct notification_client_list
*client_list
;
1822 struct lttng_evaluation
*evaluation
= NULL
;
1823 enum lttng_condition_type condition_type
;
1825 trigger
= trigger_list_element
->trigger
;
1826 condition
= lttng_trigger_get_const_condition(trigger
);
1828 condition_type
= lttng_condition_get_type(condition
);
1830 if (condition_type
== LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
&&
1831 cmd_type
!= NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
) {
1833 } else if (condition_type
== LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
&&
1834 cmd_type
!= NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED
) {
1838 action
= lttng_trigger_get_const_action(trigger
);
1840 /* Notify actions are the only type currently supported. */
1841 assert(lttng_action_get_type_const(action
) ==
1842 LTTNG_ACTION_TYPE_NOTIFY
);
1844 client_list
= get_client_list_from_condition(state
, condition
);
1845 assert(client_list
);
1847 if (cds_list_empty(&client_list
->list
)) {
1849 * No clients interested in the evaluation's result,
1855 if (cmd_type
== NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
) {
1856 evaluation
= lttng_evaluation_session_rotation_ongoing_create(
1857 trace_archive_chunk_id
);
1859 evaluation
= lttng_evaluation_session_rotation_completed_create(
1860 trace_archive_chunk_id
, location
);
1864 /* Internal error */
1866 cmd_result
= LTTNG_ERR_UNK
;
1870 /* Dispatch evaluation result to all clients. */
1871 ret
= send_evaluation_to_clients(trigger_list_element
->trigger
,
1872 evaluation
, client_list
, state
,
1875 lttng_evaluation_destroy(evaluation
);
1876 if (caa_unlikely(ret
)) {
1881 session_info_put(session_info
);
1882 *_cmd_result
= cmd_result
;
1888 int condition_is_supported(struct lttng_condition
*condition
)
1892 switch (lttng_condition_get_type(condition
)) {
1893 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
1894 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
1896 enum lttng_domain_type domain
;
1898 ret
= lttng_condition_buffer_usage_get_domain_type(condition
,
1905 if (domain
!= LTTNG_DOMAIN_KERNEL
) {
1911 * Older kernel tracers don't expose the API to monitor their
1912 * buffers. Therefore, we reject triggers that require that
1913 * mechanism to be available to be evaluated.
1915 ret
= kernel_supports_ring_buffer_snapshot_sample_positions();
1925 /* Must be called with RCU read lock held. */
1927 int bind_trigger_to_matching_session(const struct lttng_trigger
*trigger
,
1928 struct notification_thread_state
*state
)
1931 const struct lttng_condition
*condition
;
1932 const char *session_name
;
1933 struct lttng_session_trigger_list
*trigger_list
;
1935 condition
= lttng_trigger_get_const_condition(trigger
);
1936 switch (lttng_condition_get_type(condition
)) {
1937 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
1938 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
1940 enum lttng_condition_status status
;
1942 status
= lttng_condition_session_rotation_get_session_name(
1943 condition
, &session_name
);
1944 if (status
!= LTTNG_CONDITION_STATUS_OK
) {
1945 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
1956 trigger_list
= get_session_trigger_list(state
, session_name
);
1957 if (!trigger_list
) {
1958 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
1964 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
1966 ret
= lttng_session_trigger_list_add(trigger_list
, trigger
);
1971 /* Must be called with RCU read lock held. */
1973 int bind_trigger_to_matching_channels(const struct lttng_trigger
*trigger
,
1974 struct notification_thread_state
*state
)
1977 struct cds_lfht_node
*node
;
1978 struct cds_lfht_iter iter
;
1979 struct channel_info
*channel
;
1981 cds_lfht_for_each_entry(state
->channels_ht
, &iter
, channel
,
1983 struct lttng_trigger_list_element
*trigger_list_element
;
1984 struct lttng_channel_trigger_list
*trigger_list
;
1985 struct cds_lfht_iter lookup_iter
;
1987 if (!trigger_applies_to_channel(trigger
, channel
)) {
1991 cds_lfht_lookup(state
->channel_triggers_ht
,
1992 hash_channel_key(&channel
->key
),
1993 match_channel_trigger_list
,
1996 node
= cds_lfht_iter_get_node(&lookup_iter
);
1998 trigger_list
= caa_container_of(node
,
1999 struct lttng_channel_trigger_list
,
2000 channel_triggers_ht_node
);
2002 trigger_list_element
= zmalloc(sizeof(*trigger_list_element
));
2003 if (!trigger_list_element
) {
2007 CDS_INIT_LIST_HEAD(&trigger_list_element
->node
);
2008 trigger_list_element
->trigger
= trigger
;
2009 cds_list_add(&trigger_list_element
->node
, &trigger_list
->list
);
2010 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
2018 * FIXME A client's credentials are not checked when registering a trigger, nor
2019 * are they stored alongside with the trigger.
2021 * The effects of this are benign since:
2022 * - The client will succeed in registering the trigger, as it is valid,
2023 * - The trigger will, internally, be bound to the channel/session,
2024 * - The notifications will not be sent since the client's credentials
2025 * are checked against the channel at that moment.
2027 * If this function returns a non-zero value, it means something is
2028 * fundamentally broken and the whole subsystem/thread will be torn down.
2030 * If a non-fatal error occurs, just set the cmd_result to the appropriate
2034 int handle_notification_thread_command_register_trigger(
2035 struct notification_thread_state
*state
,
2036 struct lttng_trigger
*trigger
,
2037 enum lttng_error_code
*cmd_result
)
2040 struct lttng_condition
*condition
;
2041 struct notification_client
*client
;
2042 struct notification_client_list
*client_list
= NULL
;
2043 struct lttng_trigger_ht_element
*trigger_ht_element
= NULL
;
2044 struct notification_client_list_element
*client_list_element
, *tmp
;
2045 struct cds_lfht_node
*node
;
2046 struct cds_lfht_iter iter
;
2047 bool free_trigger
= true;
2051 condition
= lttng_trigger_get_condition(trigger
);
2054 ret
= condition_is_supported(condition
);
2057 } else if (ret
== 0) {
2058 *cmd_result
= LTTNG_ERR_NOT_SUPPORTED
;
2061 /* Feature is supported, continue. */
2065 trigger_ht_element
= zmalloc(sizeof(*trigger_ht_element
));
2066 if (!trigger_ht_element
) {
2071 /* Add trigger to the trigger_ht. */
2072 cds_lfht_node_init(&trigger_ht_element
->node
);
2073 trigger_ht_element
->trigger
= trigger
;
2075 node
= cds_lfht_add_unique(state
->triggers_ht
,
2076 lttng_condition_hash(condition
),
2079 &trigger_ht_element
->node
);
2080 if (node
!= &trigger_ht_element
->node
) {
2081 /* Not a fatal error, simply report it to the client. */
2082 *cmd_result
= LTTNG_ERR_TRIGGER_EXISTS
;
2083 goto error_free_ht_element
;
2087 * Ownership of the trigger and of its wrapper was transfered to
2090 trigger_ht_element
= NULL
;
2091 free_trigger
= false;
2094 * The rest only applies to triggers that have a "notify" action.
2095 * It is not skipped as this is the only action type currently
2098 client_list
= zmalloc(sizeof(*client_list
));
2101 goto error_free_ht_element
;
2103 cds_lfht_node_init(&client_list
->notification_trigger_ht_node
);
2104 CDS_INIT_LIST_HEAD(&client_list
->list
);
2105 client_list
->trigger
= trigger
;
2107 /* Build a list of clients to which this new trigger applies. */
2108 cds_lfht_for_each_entry(state
->client_socket_ht
, &iter
, client
,
2109 client_socket_ht_node
) {
2110 if (!trigger_applies_to_client(trigger
, client
)) {
2114 client_list_element
= zmalloc(sizeof(*client_list_element
));
2115 if (!client_list_element
) {
2117 goto error_free_client_list
;
2119 CDS_INIT_LIST_HEAD(&client_list_element
->node
);
2120 client_list_element
->client
= client
;
2121 cds_list_add(&client_list_element
->node
, &client_list
->list
);
2124 cds_lfht_add(state
->notification_trigger_clients_ht
,
2125 lttng_condition_hash(condition
),
2126 &client_list
->notification_trigger_ht_node
);
2128 switch (get_condition_binding_object(condition
)) {
2129 case LTTNG_OBJECT_TYPE_SESSION
:
2130 /* Add the trigger to the list if it matches a known session. */
2131 ret
= bind_trigger_to_matching_session(trigger
, state
);
2133 goto error_free_client_list
;
2136 case LTTNG_OBJECT_TYPE_CHANNEL
:
2138 * Add the trigger to list of triggers bound to the channels
2141 ret
= bind_trigger_to_matching_channels(trigger
, state
);
2143 goto error_free_client_list
;
2146 case LTTNG_OBJECT_TYPE_NONE
:
2149 ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered");
2151 goto error_free_client_list
;
2155 * Since there is nothing preventing clients from subscribing to a
2156 * condition before the corresponding trigger is registered, we have
2157 * to evaluate this new condition right away.
2159 * At some point, we were waiting for the next "evaluation" (e.g. on
2160 * reception of a channel sample) to evaluate this new condition, but
2163 * The reason it was broken is that waiting for the next sample
2164 * does not allow us to properly handle transitions for edge-triggered
2167 * Consider this example: when we handle a new channel sample, we
2168 * evaluate each conditions twice: once with the previous state, and
2169 * again with the newest state. We then use those two results to
2170 * determine whether a state change happened: a condition was false and
2171 * became true. If a state change happened, we have to notify clients.
2173 * Now, if a client subscribes to a given notification and registers
2174 * a trigger *after* that subscription, we have to make sure the
2175 * condition is evaluated at this point while considering only the
2176 * current state. Otherwise, the next evaluation cycle may only see
2177 * that the evaluations remain the same (true for samples n-1 and n) and
2178 * the client will never know that the condition has been met.
2180 cds_list_for_each_entry_safe(client_list_element
, tmp
,
2181 &client_list
->list
, node
) {
2182 ret
= evaluate_condition_for_client(trigger
, condition
,
2183 client_list_element
->client
, state
);
2185 goto error_free_client_list
;
2190 * Client list ownership transferred to the
2191 * notification_trigger_clients_ht.
2195 *cmd_result
= LTTNG_OK
;
2196 error_free_client_list
:
2198 cds_list_for_each_entry_safe(client_list_element
, tmp
,
2199 &client_list
->list
, node
) {
2200 free(client_list_element
);
2204 error_free_ht_element
:
2205 free(trigger_ht_element
);
2208 lttng_trigger_destroy(trigger
);
2215 void free_notification_client_list_rcu(struct rcu_head
*node
)
2217 free(caa_container_of(node
, struct notification_client_list
,
2222 void free_lttng_trigger_ht_element_rcu(struct rcu_head
*node
)
2224 free(caa_container_of(node
, struct lttng_trigger_ht_element
,
2229 int handle_notification_thread_command_unregister_trigger(
2230 struct notification_thread_state
*state
,
2231 struct lttng_trigger
*trigger
,
2232 enum lttng_error_code
*_cmd_reply
)
2234 struct cds_lfht_iter iter
;
2235 struct cds_lfht_node
*triggers_ht_node
;
2236 struct lttng_channel_trigger_list
*trigger_list
;
2237 struct notification_client_list
*client_list
;
2238 struct notification_client_list_element
*client_list_element
, *tmp
;
2239 struct lttng_trigger_ht_element
*trigger_ht_element
= NULL
;
2240 struct lttng_condition
*condition
= lttng_trigger_get_condition(
2242 enum lttng_error_code cmd_reply
;
2246 cds_lfht_lookup(state
->triggers_ht
,
2247 lttng_condition_hash(condition
),
2251 triggers_ht_node
= cds_lfht_iter_get_node(&iter
);
2252 if (!triggers_ht_node
) {
2253 cmd_reply
= LTTNG_ERR_TRIGGER_NOT_FOUND
;
2256 cmd_reply
= LTTNG_OK
;
2259 /* Remove trigger from channel_triggers_ht. */
2260 cds_lfht_for_each_entry(state
->channel_triggers_ht
, &iter
, trigger_list
,
2261 channel_triggers_ht_node
) {
2262 struct lttng_trigger_list_element
*trigger_element
, *tmp
;
2264 cds_list_for_each_entry_safe(trigger_element
, tmp
,
2265 &trigger_list
->list
, node
) {
2266 const struct lttng_condition
*current_condition
=
2267 lttng_trigger_get_const_condition(
2268 trigger_element
->trigger
);
2270 assert(current_condition
);
2271 if (!lttng_condition_is_equal(condition
,
2272 current_condition
)) {
2276 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2277 cds_list_del(&trigger_element
->node
);
2278 /* A trigger can only appear once per channel */
2284 * Remove and release the client list from
2285 * notification_trigger_clients_ht.
2287 client_list
= get_client_list_from_condition(state
, condition
);
2288 assert(client_list
);
2290 cds_list_for_each_entry_safe(client_list_element
, tmp
,
2291 &client_list
->list
, node
) {
2292 free(client_list_element
);
2294 cds_lfht_del(state
->notification_trigger_clients_ht
,
2295 &client_list
->notification_trigger_ht_node
);
2296 call_rcu(&client_list
->rcu_node
, free_notification_client_list_rcu
);
2298 /* Remove trigger from triggers_ht. */
2299 trigger_ht_element
= caa_container_of(triggers_ht_node
,
2300 struct lttng_trigger_ht_element
, node
);
2301 cds_lfht_del(state
->triggers_ht
, triggers_ht_node
);
2303 /* Release the ownership of the trigger. */
2304 lttng_trigger_destroy(trigger_ht_element
->trigger
);
2305 call_rcu(&trigger_ht_element
->rcu_node
, free_lttng_trigger_ht_element_rcu
);
2309 *_cmd_reply
= cmd_reply
;
2314 /* Returns 0 on success, 1 on exit requested, negative value on error. */
2315 int handle_notification_thread_command(
2316 struct notification_thread_handle
*handle
,
2317 struct notification_thread_state
*state
)
2321 struct notification_thread_command
*cmd
;
2323 /* Read the event pipe to put it back into a quiescent state. */
2324 ret
= lttng_read(lttng_pipe_get_readfd(handle
->cmd_queue
.event_pipe
), &counter
,
2326 if (ret
!= sizeof(counter
)) {
2330 pthread_mutex_lock(&handle
->cmd_queue
.lock
);
2331 cmd
= cds_list_first_entry(&handle
->cmd_queue
.list
,
2332 struct notification_thread_command
, cmd_list_node
);
2333 switch (cmd
->type
) {
2334 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER
:
2335 DBG("[notification-thread] Received register trigger command");
2336 ret
= handle_notification_thread_command_register_trigger(
2337 state
, cmd
->parameters
.trigger
,
2340 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER
:
2341 DBG("[notification-thread] Received unregister trigger command");
2342 ret
= handle_notification_thread_command_unregister_trigger(
2343 state
, cmd
->parameters
.trigger
,
2346 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL
:
2347 DBG("[notification-thread] Received add channel command");
2348 ret
= handle_notification_thread_command_add_channel(
2350 cmd
->parameters
.add_channel
.session
.name
,
2351 cmd
->parameters
.add_channel
.session
.uid
,
2352 cmd
->parameters
.add_channel
.session
.gid
,
2353 cmd
->parameters
.add_channel
.channel
.name
,
2354 cmd
->parameters
.add_channel
.channel
.domain
,
2355 cmd
->parameters
.add_channel
.channel
.key
,
2356 cmd
->parameters
.add_channel
.channel
.capacity
,
2359 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL
:
2360 DBG("[notification-thread] Received remove channel command");
2361 ret
= handle_notification_thread_command_remove_channel(
2362 state
, cmd
->parameters
.remove_channel
.key
,
2363 cmd
->parameters
.remove_channel
.domain
,
2366 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
:
2367 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED
:
2368 DBG("[notification-thread] Received session rotation %s command",
2369 cmd
->type
== NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
?
2370 "ongoing" : "completed");
2371 ret
= handle_notification_thread_command_session_rotation(
2374 cmd
->parameters
.session_rotation
.session_name
,
2375 cmd
->parameters
.session_rotation
.uid
,
2376 cmd
->parameters
.session_rotation
.gid
,
2377 cmd
->parameters
.session_rotation
.trace_archive_chunk_id
,
2378 cmd
->parameters
.session_rotation
.location
,
2381 case NOTIFICATION_COMMAND_TYPE_QUIT
:
2382 DBG("[notification-thread] Received quit command");
2383 cmd
->reply_code
= LTTNG_OK
;
2387 ERR("[notification-thread] Unknown internal command received");
2395 cds_list_del(&cmd
->cmd_list_node
);
2396 lttng_waiter_wake_up(&cmd
->reply_waiter
);
2397 pthread_mutex_unlock(&handle
->cmd_queue
.lock
);
2400 /* Wake-up and return a fatal error to the calling thread. */
2401 lttng_waiter_wake_up(&cmd
->reply_waiter
);
2402 pthread_mutex_unlock(&handle
->cmd_queue
.lock
);
2403 cmd
->reply_code
= LTTNG_ERR_FATAL
;
2405 /* Indicate a fatal error to the caller. */
2410 int socket_set_non_blocking(int socket
)
2414 /* Set the pipe as non-blocking. */
2415 ret
= fcntl(socket
, F_GETFL
, 0);
2417 PERROR("fcntl get socket flags");
2422 ret
= fcntl(socket
, F_SETFL
, flags
| O_NONBLOCK
);
2424 PERROR("fcntl set O_NONBLOCK socket flag");
2427 DBG("Client socket (fd = %i) set as non-blocking", socket
);
2433 int client_reset_inbound_state(struct notification_client
*client
)
2437 ret
= lttng_dynamic_buffer_set_size(
2438 &client
->communication
.inbound
.buffer
, 0);
2441 client
->communication
.inbound
.bytes_to_receive
=
2442 sizeof(struct lttng_notification_channel_message
);
2443 client
->communication
.inbound
.msg_type
=
2444 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN
;
2445 LTTNG_SOCK_SET_UID_CRED(&client
->communication
.inbound
.creds
, -1);
2446 LTTNG_SOCK_SET_GID_CRED(&client
->communication
.inbound
.creds
, -1);
2447 ret
= lttng_dynamic_buffer_set_size(
2448 &client
->communication
.inbound
.buffer
,
2449 client
->communication
.inbound
.bytes_to_receive
);
2453 int handle_notification_thread_client_connect(
2454 struct notification_thread_state
*state
)
2457 struct notification_client
*client
;
2459 DBG("[notification-thread] Handling new notification channel client connection");
2461 client
= zmalloc(sizeof(*client
));
2467 client
->id
= state
->next_notification_client_id
++;
2468 CDS_INIT_LIST_HEAD(&client
->condition_list
);
2469 lttng_dynamic_buffer_init(&client
->communication
.inbound
.buffer
);
2470 lttng_dynamic_buffer_init(&client
->communication
.outbound
.buffer
);
2471 client
->communication
.inbound
.expect_creds
= true;
2472 ret
= client_reset_inbound_state(client
);
2474 ERR("[notification-thread] Failed to reset client communication's inbound state");
2479 ret
= lttcomm_accept_unix_sock(state
->notification_channel_socket
);
2481 ERR("[notification-thread] Failed to accept new notification channel client connection");
2486 client
->socket
= ret
;
2488 ret
= socket_set_non_blocking(client
->socket
);
2490 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
2494 ret
= lttcomm_setsockopt_creds_unix_sock(client
->socket
);
2496 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
2501 ret
= lttng_poll_add(&state
->events
, client
->socket
,
2502 LPOLLIN
| LPOLLERR
|
2503 LPOLLHUP
| LPOLLRDHUP
);
2505 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
2509 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
2513 cds_lfht_add(state
->client_socket_ht
,
2514 hash_client_socket(client
->socket
),
2515 &client
->client_socket_ht_node
);
2516 cds_lfht_add(state
->client_id_ht
,
2517 hash_client_id(client
->id
),
2518 &client
->client_id_ht_node
);
2523 notification_client_destroy(client
, state
);
2527 int handle_notification_thread_client_disconnect(
2529 struct notification_thread_state
*state
)
2532 struct notification_client
*client
;
2535 DBG("[notification-thread] Closing client connection (socket fd = %i)",
2537 client
= get_client_from_socket(client_socket
, state
);
2539 /* Internal state corruption, fatal error. */
2540 ERR("[notification-thread] Unable to find client (socket fd = %i)",
2546 ret
= lttng_poll_del(&state
->events
, client_socket
);
2548 ERR("[notification-thread] Failed to remove client socket from poll set");
2550 cds_lfht_del(state
->client_socket_ht
,
2551 &client
->client_socket_ht_node
);
2552 cds_lfht_del(state
->client_id_ht
,
2553 &client
->client_id_ht_node
);
2554 notification_client_destroy(client
, state
);
2560 int handle_notification_thread_client_disconnect_all(
2561 struct notification_thread_state
*state
)
2563 struct cds_lfht_iter iter
;
2564 struct notification_client
*client
;
2565 bool error_encoutered
= false;
2568 DBG("[notification-thread] Closing all client connections");
2569 cds_lfht_for_each_entry(state
->client_socket_ht
, &iter
, client
,
2570 client_socket_ht_node
) {
2573 ret
= handle_notification_thread_client_disconnect(
2574 client
->socket
, state
);
2576 error_encoutered
= true;
2580 return error_encoutered
? 1 : 0;
2583 int handle_notification_thread_trigger_unregister_all(
2584 struct notification_thread_state
*state
)
2586 bool error_occurred
= false;
2587 struct cds_lfht_iter iter
;
2588 struct lttng_trigger_ht_element
*trigger_ht_element
;
2591 cds_lfht_for_each_entry(state
->triggers_ht
, &iter
, trigger_ht_element
,
2593 int ret
= handle_notification_thread_command_unregister_trigger(
2594 state
, trigger_ht_element
->trigger
, NULL
);
2596 error_occurred
= true;
2600 return error_occurred
? -1 : 0;
2604 int client_flush_outgoing_queue(struct notification_client
*client
,
2605 struct notification_thread_state
*state
)
2608 size_t to_send_count
;
2610 assert(client
->communication
.outbound
.buffer
.size
!= 0);
2611 to_send_count
= client
->communication
.outbound
.buffer
.size
;
2612 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
2615 ret
= lttcomm_send_unix_sock_non_block(client
->socket
,
2616 client
->communication
.outbound
.buffer
.data
,
2618 if ((ret
>= 0 && ret
< to_send_count
)) {
2619 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
2621 to_send_count
-= max(ret
, 0);
2623 memcpy(client
->communication
.outbound
.buffer
.data
,
2624 client
->communication
.outbound
.buffer
.data
+
2625 client
->communication
.outbound
.buffer
.size
- to_send_count
,
2627 ret
= lttng_dynamic_buffer_set_size(
2628 &client
->communication
.outbound
.buffer
,
2635 * We want to be notified whenever there is buffer space
2636 * available to send the rest of the payload.
2638 ret
= lttng_poll_mod(&state
->events
, client
->socket
,
2639 CLIENT_POLL_MASK_IN_OUT
);
2643 } else if (ret
< 0) {
2644 /* Generic error, disconnect the client. */
2645 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
2647 ret
= handle_notification_thread_client_disconnect(
2648 client
->socket
, state
);
2653 /* No error and flushed the queue completely. */
2654 ret
= lttng_dynamic_buffer_set_size(
2655 &client
->communication
.outbound
.buffer
, 0);
2659 ret
= lttng_poll_mod(&state
->events
, client
->socket
,
2660 CLIENT_POLL_MASK_IN
);
2665 client
->communication
.outbound
.queued_command_reply
= false;
2666 client
->communication
.outbound
.dropped_notification
= false;
2675 int client_send_command_reply(struct notification_client
*client
,
2676 struct notification_thread_state
*state
,
2677 enum lttng_notification_channel_status status
)
2680 struct lttng_notification_channel_command_reply reply
= {
2681 .status
= (int8_t) status
,
2683 struct lttng_notification_channel_message msg
= {
2684 .type
= (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY
,
2685 .size
= sizeof(reply
),
2687 char buffer
[sizeof(msg
) + sizeof(reply
)];
2689 if (client
->communication
.outbound
.queued_command_reply
) {
2690 /* Protocol error. */
2694 memcpy(buffer
, &msg
, sizeof(msg
));
2695 memcpy(buffer
+ sizeof(msg
), &reply
, sizeof(reply
));
2696 DBG("[notification-thread] Send command reply (%i)", (int) status
);
2698 /* Enqueue buffer to outgoing queue and flush it. */
2699 ret
= lttng_dynamic_buffer_append(
2700 &client
->communication
.outbound
.buffer
,
2701 buffer
, sizeof(buffer
));
2706 ret
= client_flush_outgoing_queue(client
, state
);
2711 if (client
->communication
.outbound
.buffer
.size
!= 0) {
2712 /* Queue could not be emptied. */
2713 client
->communication
.outbound
.queued_command_reply
= true;
2722 int client_dispatch_message(struct notification_client
*client
,
2723 struct notification_thread_state
*state
)
2727 if (client
->communication
.inbound
.msg_type
!=
2728 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
&&
2729 client
->communication
.inbound
.msg_type
!=
2730 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN
&&
2731 !client
->validated
) {
2732 WARN("[notification-thread] client attempted a command before handshake");
2737 switch (client
->communication
.inbound
.msg_type
) {
2738 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN
:
2741 * Receiving message header. The function will be called again
2742 * once the rest of the message as been received and can be
2745 const struct lttng_notification_channel_message
*msg
;
2747 assert(sizeof(*msg
) ==
2748 client
->communication
.inbound
.buffer
.size
);
2749 msg
= (const struct lttng_notification_channel_message
*)
2750 client
->communication
.inbound
.buffer
.data
;
2752 if (msg
->size
== 0 || msg
->size
> DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE
) {
2753 ERR("[notification-thread] Invalid notification channel message: length = %u", msg
->size
);
2758 switch (msg
->type
) {
2759 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
:
2760 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
:
2761 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
:
2765 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
2769 client
->communication
.inbound
.bytes_to_receive
= msg
->size
;
2770 client
->communication
.inbound
.msg_type
=
2771 (enum lttng_notification_channel_message_type
) msg
->type
;
2772 ret
= lttng_dynamic_buffer_set_size(
2773 &client
->communication
.inbound
.buffer
, msg
->size
);
2779 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
:
2781 struct lttng_notification_channel_command_handshake
*handshake_client
;
2782 struct lttng_notification_channel_command_handshake handshake_reply
= {
2783 .major
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
,
2784 .minor
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR
,
2786 struct lttng_notification_channel_message msg_header
= {
2787 .type
= LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
,
2788 .size
= sizeof(handshake_reply
),
2790 enum lttng_notification_channel_status status
=
2791 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
2792 char send_buffer
[sizeof(msg_header
) + sizeof(handshake_reply
)];
2794 memcpy(send_buffer
, &msg_header
, sizeof(msg_header
));
2795 memcpy(send_buffer
+ sizeof(msg_header
), &handshake_reply
,
2796 sizeof(handshake_reply
));
2799 (struct lttng_notification_channel_command_handshake
*)
2800 client
->communication
.inbound
.buffer
.data
;
2801 client
->major
= handshake_client
->major
;
2802 client
->minor
= handshake_client
->minor
;
2803 if (!client
->communication
.inbound
.creds_received
) {
2804 ERR("[notification-thread] No credentials received from client");
2809 client
->uid
= LTTNG_SOCK_GET_UID_CRED(
2810 &client
->communication
.inbound
.creds
);
2811 client
->gid
= LTTNG_SOCK_GET_GID_CRED(
2812 &client
->communication
.inbound
.creds
);
2813 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
2814 client
->uid
, client
->gid
, (int) client
->major
,
2815 (int) client
->minor
);
2817 if (handshake_client
->major
!= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
) {
2818 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION
;
2821 ret
= lttng_dynamic_buffer_append(&client
->communication
.outbound
.buffer
,
2822 send_buffer
, sizeof(send_buffer
));
2824 ERR("[notification-thread] Failed to send protocol version to notification channel client");
2828 ret
= client_flush_outgoing_queue(client
, state
);
2833 ret
= client_send_command_reply(client
, state
, status
);
2835 ERR("[notification-thread] Failed to send reply to notification channel client");
2839 /* Set reception state to receive the next message header. */
2840 ret
= client_reset_inbound_state(client
);
2842 ERR("[notification-thread] Failed to reset client communication's inbound state");
2845 client
->validated
= true;
2846 client
->communication
.active
= true;
2849 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
:
2850 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
:
2852 struct lttng_condition
*condition
;
2853 enum lttng_notification_channel_status status
=
2854 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
2855 struct lttng_payload_view condition_view
=
2856 lttng_payload_view_from_dynamic_buffer(
2857 &client
->communication
.inbound
.buffer
,
2859 size_t expected_condition_size
=
2860 client
->communication
.inbound
.buffer
.size
;
2862 ret
= lttng_condition_create_from_payload(&condition_view
,
2864 if (ret
!= expected_condition_size
) {
2865 ERR("[notification-thread] Malformed condition received from client");
2869 if (client
->communication
.inbound
.msg_type
==
2870 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
) {
2871 ret
= notification_thread_client_subscribe(client
,
2872 condition
, state
, &status
);
2874 ret
= notification_thread_client_unsubscribe(client
,
2875 condition
, state
, &status
);
2881 ret
= client_send_command_reply(client
, state
, status
);
2883 ERR("[notification-thread] Failed to send reply to notification channel client");
2887 /* Set reception state to receive the next message header. */
2888 ret
= client_reset_inbound_state(client
);
2890 ERR("[notification-thread] Failed to reset client communication's inbound state");
2902 /* Incoming data from client. */
2903 int handle_notification_thread_client_in(
2904 struct notification_thread_state
*state
, int socket
)
2907 struct notification_client
*client
;
2911 client
= get_client_from_socket(socket
, state
);
2913 /* Internal error, abort. */
2918 offset
= client
->communication
.inbound
.buffer
.size
-
2919 client
->communication
.inbound
.bytes_to_receive
;
2920 if (client
->communication
.inbound
.expect_creds
) {
2921 recv_ret
= lttcomm_recv_creds_unix_sock(socket
,
2922 client
->communication
.inbound
.buffer
.data
+ offset
,
2923 client
->communication
.inbound
.bytes_to_receive
,
2924 &client
->communication
.inbound
.creds
);
2926 client
->communication
.inbound
.expect_creds
= false;
2927 client
->communication
.inbound
.creds_received
= true;
2930 recv_ret
= lttcomm_recv_unix_sock_non_block(socket
,
2931 client
->communication
.inbound
.buffer
.data
+ offset
,
2932 client
->communication
.inbound
.bytes_to_receive
);
2935 goto error_disconnect_client
;
2938 client
->communication
.inbound
.bytes_to_receive
-= recv_ret
;
2939 if (client
->communication
.inbound
.bytes_to_receive
== 0) {
2940 ret
= client_dispatch_message(client
, state
);
2943 * Only returns an error if this client must be
2946 goto error_disconnect_client
;
2953 error_disconnect_client
:
2954 ret
= handle_notification_thread_client_disconnect(socket
, state
);
2958 /* Client ready to receive outgoing data. */
2959 int handle_notification_thread_client_out(
2960 struct notification_thread_state
*state
, int socket
)
2963 struct notification_client
*client
;
2965 client
= get_client_from_socket(socket
, state
);
2967 /* Internal error, abort. */
2972 ret
= client_flush_outgoing_queue(client
, state
);
2981 bool evaluate_buffer_usage_condition(const struct lttng_condition
*condition
,
2982 const struct channel_state_sample
*sample
,
2983 uint64_t buffer_capacity
)
2985 bool result
= false;
2987 enum lttng_condition_type condition_type
;
2988 const struct lttng_condition_buffer_usage
*use_condition
= container_of(
2989 condition
, struct lttng_condition_buffer_usage
,
2992 if (use_condition
->threshold_bytes
.set
) {
2993 threshold
= use_condition
->threshold_bytes
.value
;
2996 * Threshold was expressed as a ratio.
2998 * TODO the threshold (in bytes) of conditions expressed
2999 * as a ratio of total buffer size could be cached to
3000 * forego this double-multiplication or it could be performed
3001 * as fixed-point math.
3003 * Note that caching should accommodates the case where the
3004 * condition applies to multiple channels (i.e. don't assume
3005 * that all channels matching my_chann* have the same size...)
3007 threshold
= (uint64_t) (use_condition
->threshold_ratio
.value
*
3008 (double) buffer_capacity
);
3011 condition_type
= lttng_condition_get_type(condition
);
3012 if (condition_type
== LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
) {
3013 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64
", highest usage = %" PRIu64
,
3014 threshold
, sample
->highest_usage
);
3017 * The low condition should only be triggered once _all_ of the
3018 * streams in a channel have gone below the "low" threshold.
3020 if (sample
->highest_usage
<= threshold
) {
3024 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64
", highest usage = %" PRIu64
,