/*
 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
 *
 * SPDX-License-Identifier: LGPL-2.1-only
 *
 */
8 #include <lttng/notification/notification-internal.h>
9 #include <lttng/notification/channel-internal.h>
10 #include <lttng/condition/condition-internal.h>
11 #include <lttng/endpoint.h>
12 #include <common/defaults.h>
13 #include <common/error.h>
14 #include <common/dynamic-buffer.h>
15 #include <common/utils.h>
16 #include <common/defaults.h>
18 #include "lttng-ctl-helper.h"
19 #include <common/compat/poll.h>
/*
 * Forward declaration: lttng_notification_channel_create() calls
 * handshake() before its definition appears in this file.
 */
int handshake(struct lttng_notification_channel *channel);
25 * Populates the reception buffer with the next complete message.
26 * The caller must acquire the channel's lock.
29 int receive_message(struct lttng_notification_channel
*channel
)
32 struct lttng_notification_channel_message msg
;
34 if (lttng_dynamic_buffer_set_size(&channel
->reception_buffer
, 0)) {
39 ret
= lttcomm_recv_unix_sock(channel
->socket
, &msg
, sizeof(msg
));
45 if (msg
.size
> DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE
) {
50 /* Add message header at buffer's start. */
51 ret
= lttng_dynamic_buffer_append(&channel
->reception_buffer
, &msg
,
61 /* Reserve space for the payload. */
62 ret
= lttng_dynamic_buffer_set_size(&channel
->reception_buffer
,
63 channel
->reception_buffer
.size
+ msg
.size
);
68 /* Receive message payload. */
69 ret
= lttcomm_recv_unix_sock(channel
->socket
,
70 channel
->reception_buffer
.data
+ sizeof(msg
), msg
.size
);
71 if (ret
< (ssize_t
) msg
.size
) {
81 if (lttng_dynamic_buffer_set_size(&channel
->reception_buffer
, 0)) {
88 enum lttng_notification_channel_message_type
get_current_message_type(
89 struct lttng_notification_channel
*channel
)
91 struct lttng_notification_channel_message
*msg
;
93 assert(channel
->reception_buffer
.size
>= sizeof(*msg
));
95 msg
= (struct lttng_notification_channel_message
*)
96 channel
->reception_buffer
.data
;
97 return (enum lttng_notification_channel_message_type
) msg
->type
;
101 struct lttng_notification
*create_notification_from_current_message(
102 struct lttng_notification_channel
*channel
)
105 struct lttng_notification
*notification
= NULL
;
106 struct lttng_buffer_view view
;
108 if (channel
->reception_buffer
.size
<=
109 sizeof(struct lttng_notification_channel_message
)) {
113 view
= lttng_buffer_view_from_dynamic_buffer(&channel
->reception_buffer
,
114 sizeof(struct lttng_notification_channel_message
), -1);
116 ret
= lttng_notification_create_from_buffer(&view
, ¬ification
);
117 if (ret
!= channel
->reception_buffer
.size
-
118 sizeof(struct lttng_notification_channel_message
)) {
119 lttng_notification_destroy(notification
);
127 struct lttng_notification_channel
*lttng_notification_channel_create(
128 struct lttng_endpoint
*endpoint
)
131 bool is_in_tracing_group
= false, is_root
= false;
132 char *sock_path
= NULL
;
133 struct lttng_notification_channel
*channel
= NULL
;
136 endpoint
!= lttng_session_daemon_notification_endpoint
) {
140 sock_path
= zmalloc(LTTNG_PATH_MAX
);
145 channel
= zmalloc(sizeof(struct lttng_notification_channel
));
149 channel
->socket
= -1;
150 pthread_mutex_init(&channel
->lock
, NULL
);
151 lttng_dynamic_buffer_init(&channel
->reception_buffer
);
152 CDS_INIT_LIST_HEAD(&channel
->pending_notifications
.list
);
154 is_root
= (getuid() == 0);
156 is_in_tracing_group
= lttng_check_tracing_group();
159 if (is_root
|| is_in_tracing_group
) {
160 ret
= lttng_strncpy(sock_path
,
161 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK
,
164 ret
= -LTTNG_ERR_INVALID
;
168 ret
= lttcomm_connect_unix_sock(sock_path
);
175 /* Fallback to local session daemon. */
176 ret
= snprintf(sock_path
, LTTNG_PATH_MAX
,
177 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK
,
178 utils_get_home_dir());
179 if (ret
< 0 || ret
>= LTTNG_PATH_MAX
) {
183 ret
= lttcomm_connect_unix_sock(sock_path
);
190 channel
->socket
= fd
;
192 ret
= handshake(channel
);
200 lttng_notification_channel_destroy(channel
);
205 enum lttng_notification_channel_status
206 lttng_notification_channel_get_next_notification(
207 struct lttng_notification_channel
*channel
,
208 struct lttng_notification
**_notification
)
211 struct lttng_notification
*notification
= NULL
;
212 enum lttng_notification_channel_status status
=
213 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
214 struct lttng_poll_event events
;
216 if (!channel
|| !_notification
) {
217 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
221 pthread_mutex_lock(&channel
->lock
);
223 if (channel
->pending_notifications
.count
) {
224 struct pending_notification
*pending_notification
;
226 assert(!cds_list_empty(&channel
->pending_notifications
.list
));
228 /* Deliver one of the pending notifications. */
229 pending_notification
= cds_list_first_entry(
230 &channel
->pending_notifications
.list
,
231 struct pending_notification
,
233 notification
= pending_notification
->notification
;
235 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED
;
237 cds_list_del(&pending_notification
->node
);
238 channel
->pending_notifications
.count
--;
239 free(pending_notification
);
244 * Block on interruptible epoll/poll() instead of the message reception
245 * itself as the recvmsg() wrappers always restart on EINTR. We choose
246 * to wait using interruptible epoll/poll() in order to:
247 * 1) Return if a signal occurs,
248 * 2) Not deal with partially received messages.
250 * The drawback to this approach is that we assume that messages
251 * are complete/well formed. If a message is shorter than its
252 * announced length, receive_message() will block on recvmsg()
253 * and never return (even if a signal is received).
255 ret
= lttng_poll_create(&events
, 1, LTTNG_CLOEXEC
);
257 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
260 ret
= lttng_poll_add(&events
, channel
->socket
, LPOLLIN
| LPOLLERR
);
262 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
265 ret
= lttng_poll_wait_interruptible(&events
, -1);
267 status
= (ret
== -1 && errno
== EINTR
) ?
268 LTTNG_NOTIFICATION_CHANNEL_STATUS_INTERRUPTED
:
269 LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
273 ret
= receive_message(channel
);
275 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
279 switch (get_current_message_type(channel
)) {
280 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
281 notification
= create_notification_from_current_message(
284 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
288 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
289 /* No payload to consume. */
290 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED
;
293 /* Protocol error. */
294 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
299 lttng_poll_clean(&events
);
301 pthread_mutex_unlock(&channel
->lock
);
302 *_notification
= notification
;
308 int enqueue_dropped_notification(
309 struct lttng_notification_channel
*channel
)
312 struct pending_notification
*pending_notification
;
313 struct cds_list_head
*last_element
=
314 channel
->pending_notifications
.list
.prev
;
316 pending_notification
= caa_container_of(last_element
,
317 struct pending_notification
, node
);
318 if (!pending_notification
->notification
) {
320 * The last enqueued notification indicates dropped
321 * notifications; there is nothing to do as we group
322 * dropped notifications together.
327 if (channel
->pending_notifications
.count
>=
328 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT
&&
329 pending_notification
->notification
) {
331 * Discard the last enqueued notification to indicate
332 * that notifications were dropped at this point.
334 lttng_notification_destroy(
335 pending_notification
->notification
);
336 pending_notification
->notification
= NULL
;
340 pending_notification
= zmalloc(sizeof(*pending_notification
));
341 if (!pending_notification
) {
345 CDS_INIT_LIST_HEAD(&pending_notification
->node
);
346 cds_list_add(&pending_notification
->node
,
347 &channel
->pending_notifications
.list
);
348 channel
->pending_notifications
.count
++;
354 int enqueue_notification_from_current_message(
355 struct lttng_notification_channel
*channel
)
358 struct lttng_notification
*notification
;
359 struct pending_notification
*pending_notification
;
361 if (channel
->pending_notifications
.count
>=
362 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT
) {
363 /* Drop the notification. */
364 ret
= enqueue_dropped_notification(channel
);
368 pending_notification
= zmalloc(sizeof(*pending_notification
));
369 if (!pending_notification
) {
373 CDS_INIT_LIST_HEAD(&pending_notification
->node
);
375 notification
= create_notification_from_current_message(channel
);
381 pending_notification
->notification
= notification
;
382 cds_list_add(&pending_notification
->node
,
383 &channel
->pending_notifications
.list
);
384 channel
->pending_notifications
.count
++;
388 free(pending_notification
);
392 enum lttng_notification_channel_status
393 lttng_notification_channel_has_pending_notification(
394 struct lttng_notification_channel
*channel
,
395 bool *_notification_pending
)
398 enum lttng_notification_channel_status status
=
399 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
400 struct lttng_poll_event events
;
402 if (!channel
|| !_notification_pending
) {
403 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
407 pthread_mutex_lock(&channel
->lock
);
409 if (channel
->pending_notifications
.count
) {
410 *_notification_pending
= true;
414 if (channel
->socket
< 0) {
415 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED
;
420 * Check, without blocking, if data is available on the channel's
421 * socket. If there is data available, it is safe to read (blocking)
422 * on the socket for a message from the session daemon.
424 * Since all commands wait for the session daemon's reply before
425 * releasing the channel's lock, the protocol only allows for
426 * notifications and "notification dropped" messages to come
427 * through. If we receive a different message type, it is
428 * considered a protocol error.
430 * Note that this function is not guaranteed not to block. This
431 * will block until our peer (the session daemon) has sent a complete
432 * message if we see data available on the socket. If the peer does
433 * not respect the protocol, this may block indefinitely.
435 ret
= lttng_poll_create(&events
, 1, LTTNG_CLOEXEC
);
437 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
440 ret
= lttng_poll_add(&events
, channel
->socket
, LPOLLIN
| LPOLLERR
);
442 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
445 /* timeout = 0: return immediately. */
446 ret
= lttng_poll_wait_interruptible(&events
, 0);
448 /* No data available. */
449 *_notification_pending
= false;
451 } else if (ret
< 0) {
452 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
456 /* Data available on socket. */
457 ret
= receive_message(channel
);
459 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
463 switch (get_current_message_type(channel
)) {
464 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
465 ret
= enqueue_notification_from_current_message(channel
);
469 *_notification_pending
= true;
471 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
472 ret
= enqueue_dropped_notification(channel
);
476 *_notification_pending
= true;
479 /* Protocol error. */
480 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
485 lttng_poll_clean(&events
);
487 pthread_mutex_unlock(&channel
->lock
);
493 int receive_command_reply(struct lttng_notification_channel
*channel
,
494 enum lttng_notification_channel_status
*status
)
497 struct lttng_notification_channel_command_reply
*reply
;
500 enum lttng_notification_channel_message_type msg_type
;
502 ret
= receive_message(channel
);
507 msg_type
= get_current_message_type(channel
);
509 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY
:
511 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
512 ret
= enqueue_notification_from_current_message(
518 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
519 ret
= enqueue_dropped_notification(channel
);
524 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
:
526 struct lttng_notification_channel_command_handshake
*handshake
;
528 handshake
= (struct lttng_notification_channel_command_handshake
*)
529 (channel
->reception_buffer
.data
+
530 sizeof(struct lttng_notification_channel_message
));
531 channel
->version
.major
= handshake
->major
;
532 channel
->version
.minor
= handshake
->minor
;
533 channel
->version
.set
= true;
543 if (channel
->reception_buffer
.size
<
544 (sizeof(struct lttng_notification_channel_message
) +
546 /* Invalid message received. */
551 reply
= (struct lttng_notification_channel_command_reply
*)
552 (channel
->reception_buffer
.data
+
553 sizeof(struct lttng_notification_channel_message
));
554 *status
= (enum lttng_notification_channel_status
) reply
->status
;
560 int handshake(struct lttng_notification_channel
*channel
)
563 enum lttng_notification_channel_status status
=
564 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
565 struct lttng_notification_channel_command_handshake handshake
= {
566 .major
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
,
567 .minor
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR
,
569 struct lttng_notification_channel_message msg_header
= {
570 .type
= LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
,
571 .size
= sizeof(handshake
),
573 char send_buffer
[sizeof(msg_header
) + sizeof(handshake
)];
575 memcpy(send_buffer
, &msg_header
, sizeof(msg_header
));
576 memcpy(send_buffer
+ sizeof(msg_header
), &handshake
, sizeof(handshake
));
578 pthread_mutex_lock(&channel
->lock
);
580 ret
= lttcomm_send_creds_unix_sock(channel
->socket
, send_buffer
,
581 sizeof(send_buffer
));
586 /* Receive handshake info from the sessiond. */
587 ret
= receive_command_reply(channel
, &status
);
592 if (!channel
->version
.set
) {
597 if (channel
->version
.major
!= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
) {
603 pthread_mutex_unlock(&channel
->lock
);
608 enum lttng_notification_channel_status
send_condition_command(
609 struct lttng_notification_channel
*channel
,
610 enum lttng_notification_channel_message_type type
,
611 const struct lttng_condition
*condition
)
615 enum lttng_notification_channel_status status
=
616 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
617 struct lttng_dynamic_buffer buffer
;
618 struct lttng_notification_channel_message cmd_header
= {
619 .type
= (int8_t) type
,
622 lttng_dynamic_buffer_init(&buffer
);
625 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
629 assert(type
== LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
||
630 type
== LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
);
632 pthread_mutex_lock(&channel
->lock
);
633 socket
= channel
->socket
;
634 if (!lttng_condition_validate(condition
)) {
635 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
639 ret
= lttng_dynamic_buffer_append(&buffer
, &cmd_header
,
642 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
646 ret
= lttng_condition_serialize(condition
, &buffer
);
648 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
652 /* Update payload length. */
653 ((struct lttng_notification_channel_message
*) buffer
.data
)->size
=
654 (uint32_t) (buffer
.size
- sizeof(cmd_header
));
656 ret
= lttcomm_send_unix_sock(socket
, buffer
.data
, buffer
.size
);
658 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
662 ret
= receive_command_reply(channel
, &status
);
664 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
668 pthread_mutex_unlock(&channel
->lock
);
670 lttng_dynamic_buffer_reset(&buffer
);
674 enum lttng_notification_channel_status
lttng_notification_channel_subscribe(
675 struct lttng_notification_channel
*channel
,
676 const struct lttng_condition
*condition
)
678 return send_condition_command(channel
,
679 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
,
683 enum lttng_notification_channel_status
lttng_notification_channel_unsubscribe(
684 struct lttng_notification_channel
*channel
,
685 const struct lttng_condition
*condition
)
687 return send_condition_command(channel
,
688 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
,
692 void lttng_notification_channel_destroy(
693 struct lttng_notification_channel
*channel
)
699 if (channel
->socket
>= 0) {
700 (void) lttcomm_close_unix_sock(channel
->socket
);
702 pthread_mutex_destroy(&channel
->lock
);
703 lttng_dynamic_buffer_reset(&channel
->reception_buffer
);