2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 * SPDX-License-Identifier: LGPL-2.1-only
8 #include <lttng/notification/notification-internal.hpp>
9 #include <lttng/notification/channel-internal.hpp>
10 #include <lttng/condition/condition-internal.hpp>
11 #include <lttng/endpoint.h>
12 #include <common/defaults.hpp>
13 #include <common/error.hpp>
14 #include <common/dynamic-buffer.hpp>
15 #include <common/utils.hpp>
16 #include <common/defaults.hpp>
17 #include <common/payload.hpp>
18 #include <common/payload-view.hpp>
19 #include <common/unix.hpp>
20 #include "lttng-ctl-helper.hpp"
21 #include <common/compat/poll.hpp>
/*
 * Forward declaration: handshake() is called from
 * lttng_notification_channel_create() before its definition, which
 * appears further down in this file.
 */
24 int handshake(struct lttng_notification_channel
*channel
);
27 * Populates the reception buffer with the next complete message.
28 * The caller must acquire the channel's lock.
/*
 * Reads one message from channel->socket into channel->reception_payload.
 *
 * Visible sequence:
 *   1. The reception payload is cleared.
 *   2. A fixed-size message header (msg) is received from the socket.
 *   3. msg.size is checked against
 *      DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE.
 *   4. The header is appended to the payload buffer, the buffer is grown
 *      by msg.size, and the message body is received right after the
 *      header; fewer than msg.size bytes is treated as a failure.
 *   5. msg.fds is passed to lttcomm_recv_payload_fds_unix_sock() to
 *      collect file descriptors into the same payload.
 *
 * The payload is cleared once more by the last visible statement
 * (presumably the error path -- confirm).
 */
31 int receive_message(struct lttng_notification_channel
*channel
)
34 struct lttng_notification_channel_message msg
;
36 lttng_payload_clear(&channel
->reception_payload
);
38 ret
= lttcomm_recv_unix_sock(channel
->socket
, &msg
, sizeof(msg
));
44 if (msg
.size
> DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE
) {
49 /* Add message header at buffer's start. */
50 ret
= lttng_dynamic_buffer_append(&channel
->reception_payload
.buffer
, &msg
,
56 /* Reserve space for the payload. */
57 ret
= lttng_dynamic_buffer_set_size(&channel
->reception_payload
.buffer
,
58 channel
->reception_payload
.buffer
.size
+ msg
.size
);
63 /* Receive message payload. */
64 ret
= lttcomm_recv_unix_sock(channel
->socket
,
65 channel
->reception_payload
.buffer
.data
+ sizeof(msg
), msg
.size
);
66 if (ret
< (ssize_t
) msg
.size
) {
71 /* Receive message fds. */
73 ret
= lttcomm_recv_payload_fds_unix_sock(channel
->socket
,
74 msg
.fds
, &channel
->reception_payload
);
/*
 * NOTE(review): sizeof(int) * msg.fds is an unsigned expression. If ret
 * is a signed type (the payload check above casts msg.size to ssize_t,
 * which suggests it is), a negative ret is converted to a large unsigned
 * value and this comparison does not fire -- confirm, and consider
 * testing ret < 0 separately or casting the right-hand side.
 */
75 if (ret
< sizeof(int) * msg
.fds
) {
84 lttng_payload_clear(&channel
->reception_payload
);
/*
 * Returns the type field of the message header stored at the start of
 * channel->reception_payload.
 *
 * Asserts that the reception buffer holds at least one full
 * struct lttng_notification_channel_message before the header is read
 * through a cast of the buffer's data pointer.
 */
89 enum lttng_notification_channel_message_type
get_current_message_type(
90 struct lttng_notification_channel
*channel
)
92 struct lttng_notification_channel_message
*msg
;
94 LTTNG_ASSERT(channel
->reception_payload
.buffer
.size
>= sizeof(*msg
));
96 msg
= (struct lttng_notification_channel_message
*)
97 channel
->reception_payload
.buffer
.data
;
98 return (enum lttng_notification_channel_message_type
) msg
->type
;
/*
 * Builds an lttng_notification from the bytes that follow the message
 * header in channel->reception_payload.
 *
 * The reception buffer size is first compared against the size of the
 * header alone (the branch body is not visible here; presumably a
 * message with no body is rejected -- confirm). A payload view starting
 * at sizeof(struct lttng_notification_channel_message) is then handed to
 * lttng_notification_create_from_payload(). When the value it returns
 * differs from the number of bytes following the header, the
 * notification is destroyed.
 */
102 struct lttng_notification
*create_notification_from_current_message(
103 struct lttng_notification_channel
*channel
)
106 struct lttng_notification
*notification
= NULL
;
108 if (channel
->reception_payload
.buffer
.size
<=
109 sizeof(struct lttng_notification_channel_message
)) {
114 struct lttng_payload_view view
= lttng_payload_view_from_payload(
115 &channel
->reception_payload
,
116 sizeof(struct lttng_notification_channel_message
),
/*
 * NOTE(review): the second argument below reads "¬ification". This
 * looks like a mis-encoded "&notification" (the address of the local
 * declared above, which is later passed to
 * lttng_notification_destroy()) -- restore it.
 */
119 ret
= lttng_notification_create_from_payload(
120 &view
, ¬ification
);
/*
 * NOTE(review): the right-hand side below is an unsigned size
 * difference. If ret is signed and can be negative on failure, check
 * how the comparison behaves after conversion -- confirm.
 */
123 if (ret
!= channel
->reception_payload
.buffer
.size
-
124 sizeof(struct lttng_notification_channel_message
)) {
125 lttng_notification_destroy(notification
);
/*
 * Creates a notification channel and connects it to a session daemon.
 *
 * endpoint is compared against
 * lttng_session_daemon_notification_endpoint (the visible condition is
 * an inequality; presumably any other endpoint is refused -- confirm).
 *
 * A LTTNG_PATH_MAX-byte socket path buffer and the channel object are
 * both obtained from zmalloc(). The channel starts with socket = -1, an
 * initialised mutex, an initialised reception payload and an empty
 * pending-notification list.
 *
 * Connection attempts, in visible order:
 *   - when is_root (getuid() == 0) or is_in_tracing_group
 *     (lttng_check_tracing_group()) is set, the path
 *     DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK is copied with
 *     lttng_strncpy() and connected to;
 *   - as a fallback, the path is formatted from
 *     DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK and
 *     utils_get_home_dir(); a negative or >= LTTNG_PATH_MAX snprintf()
 *     result is treated as a failure.
 *
 * Once the descriptor is stored in channel->socket, handshake() is run.
 * lttng_notification_channel_destroy() is called on the channel by the
 * last visible statement (presumably the error path -- confirm).
 */
133 struct lttng_notification_channel
*lttng_notification_channel_create(
134 struct lttng_endpoint
*endpoint
)
137 bool is_in_tracing_group
= false, is_root
= false;
138 char *sock_path
= NULL
;
139 struct lttng_notification_channel
*channel
= NULL
;
142 endpoint
!= lttng_session_daemon_notification_endpoint
) {
146 sock_path
= (char *) zmalloc(LTTNG_PATH_MAX
);
151 channel
= (lttng_notification_channel
*) zmalloc(sizeof(struct lttng_notification_channel
));
155 channel
->socket
= -1;
156 pthread_mutex_init(&channel
->lock
, NULL
);
157 lttng_payload_init(&channel
->reception_payload
);
158 CDS_INIT_LIST_HEAD(&channel
->pending_notifications
.list
);
160 is_root
= (getuid() == 0);
162 is_in_tracing_group
= lttng_check_tracing_group();
165 if (is_root
|| is_in_tracing_group
) {
166 ret
= lttng_strncpy(sock_path
,
167 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK
,
170 ret
= -LTTNG_ERR_INVALID
;
174 ret
= lttcomm_connect_unix_sock(sock_path
);
181 /* Fallback to local session daemon. */
182 ret
= snprintf(sock_path
, LTTNG_PATH_MAX
,
183 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK
,
184 utils_get_home_dir());
185 if (ret
< 0 || ret
>= LTTNG_PATH_MAX
) {
189 ret
= lttcomm_connect_unix_sock(sock_path
);
196 channel
->socket
= fd
;
198 ret
= handshake(channel
);
206 lttng_notification_channel_destroy(channel
);
/*
 * Hands the caller the next notification of the channel through
 * *_notification and returns a channel status.
 *
 * A NULL channel or _notification yields
 * LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID.
 *
 * With the channel lock held, a queued entry is served first when
 * pending_notifications.count is non-zero: the first list entry is
 * taken, its notification becomes the result, the entry is unlinked,
 * the count is decremented and the entry is freed. The
 * ..._NOTIFICATIONS_DROPPED status set in that branch presumably applies
 * to an entry whose notification is NULL -- confirm.
 *
 * Otherwise a one-descriptor poll set is created on channel->socket
 * (LPOLLIN | LPOLLERR) and waited on with no timeout. A failed wait maps
 * to ..._STATUS_INTERRUPTED when ret == -1 and errno == EINTR, and to
 * ..._STATUS_ERROR otherwise. A message is then received and dispatched
 * on its type:
 *   - NOTIFICATION: a notification is created from the message;
 *   - NOTIFICATION_DROPPED: status becomes ..._NOTIFICATIONS_DROPPED;
 *   - anything else: protocol error, ..._STATUS_ERROR.
 *
 * The poll set is cleaned and the lock released before *_notification
 * is written.
 */
211 enum lttng_notification_channel_status
212 lttng_notification_channel_get_next_notification(
213 struct lttng_notification_channel
*channel
,
214 struct lttng_notification
**_notification
)
217 struct lttng_notification
*notification
= NULL
;
218 enum lttng_notification_channel_status status
=
219 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
220 struct lttng_poll_event events
;
222 if (!channel
|| !_notification
) {
223 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
227 pthread_mutex_lock(&channel
->lock
);
229 if (channel
->pending_notifications
.count
) {
230 struct pending_notification
*pending_notification
;
232 LTTNG_ASSERT(!cds_list_empty(&channel
->pending_notifications
.list
));
234 /* Deliver one of the pending notifications. */
235 pending_notification
= cds_list_first_entry(
236 &channel
->pending_notifications
.list
,
237 struct pending_notification
,
239 notification
= pending_notification
->notification
;
241 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED
;
243 cds_list_del(&pending_notification
->node
);
244 channel
->pending_notifications
.count
--;
245 free(pending_notification
);
250 * Block on interruptible epoll/poll() instead of the message reception
251 * itself as the recvmsg() wrappers always restart on EINTR. We choose
252 * to wait using interruptible epoll/poll() in order to:
253 * 1) Return if a signal occurs,
254 * 2) Not deal with partially received messages.
256 * The drawback to this approach is that we assume that messages
257 * are complete/well formed. If a message is shorter than its
258 * announced length, receive_message() will block on recvmsg()
259 * and never return (even if a signal is received).
261 ret
= lttng_poll_create(&events
, 1, LTTNG_CLOEXEC
);
263 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
266 ret
= lttng_poll_add(&events
, channel
->socket
, LPOLLIN
| LPOLLERR
);
268 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
271 ret
= lttng_poll_wait_interruptible(&events
, -1);
273 status
= (ret
== -1 && errno
== EINTR
) ?
274 LTTNG_NOTIFICATION_CHANNEL_STATUS_INTERRUPTED
:
275 LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
279 ret
= receive_message(channel
);
281 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
285 switch (get_current_message_type(channel
)) {
286 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
287 notification
= create_notification_from_current_message(
290 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
294 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
295 /* No payload to consume. */
296 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED
;
299 /* Protocol error. */
300 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
305 lttng_poll_clean(&events
);
307 pthread_mutex_unlock(&channel
->lock
);
308 *_notification
= notification
;
/*
 * Records in the channel's pending queue that notifications were
 * dropped. A queue entry whose notification pointer is NULL acts as the
 * "dropped" marker.
 *
 * The entry reached through list.prev is inspected:
 *   - if its notification is already NULL, it already marks a drop and
 *     nothing more is needed (drops are grouped);
 *   - if the queue holds at least
 *     DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT entries and that
 *     entry carries a notification, the notification is destroyed and
 *     the pointer set to NULL so the entry becomes the marker;
 *   - otherwise a zeroed pending_notification is allocated, added with
 *     cds_list_add() and counted.
 *
 * NOTE(review): list.prev is read without any visible emptiness check.
 * On an empty list caa_container_of() would be applied to the list head
 * itself and the following ->notification read would touch unrelated
 * memory -- confirm that callers guarantee a non-empty list, or add a
 * guard.
 *
 * NOTE(review): entries are inserted with cds_list_add(), which adds
 * right after the list head, whereas this function treats list.prev as
 * the "last enqueued" entry -- confirm the intended queue ordering.
 */
314 int enqueue_dropped_notification(
315 struct lttng_notification_channel
*channel
)
318 struct pending_notification
*pending_notification
;
319 struct cds_list_head
*last_element
=
320 channel
->pending_notifications
.list
.prev
;
322 pending_notification
= caa_container_of(last_element
,
323 struct pending_notification
, node
);
324 if (!pending_notification
->notification
) {
326 * The last enqueued notification indicates dropped
327 * notifications; there is nothing to do as we group
328 * dropped notifications together.
333 if (channel
->pending_notifications
.count
>=
334 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT
&&
335 pending_notification
->notification
) {
337 * Discard the last enqueued notification to indicate
338 * that notifications were dropped at this point.
340 lttng_notification_destroy(
341 pending_notification
->notification
);
342 pending_notification
->notification
= NULL
;
346 pending_notification
= (struct pending_notification
*) zmalloc(sizeof(*pending_notification
));
347 if (!pending_notification
) {
351 CDS_INIT_LIST_HEAD(&pending_notification
->node
);
352 cds_list_add(&pending_notification
->node
,
353 &channel
->pending_notifications
.list
);
354 channel
->pending_notifications
.count
++;
/*
 * Queues the notification carried by the message currently held in the
 * channel's reception payload.
 *
 * When the queue already holds
 * DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT entries, the
 * notification is dropped and enqueue_dropped_notification() is called
 * instead. Otherwise a zeroed pending_notification is allocated, the
 * notification is built with create_notification_from_current_message(),
 * stored in the entry, and the entry is added with cds_list_add() and
 * counted.
 *
 * The entry is freed by the last visible statement (presumably the
 * error path taken when the notification cannot be created -- confirm).
 */
360 int enqueue_notification_from_current_message(
361 struct lttng_notification_channel
*channel
)
364 struct lttng_notification
*notification
;
365 struct pending_notification
*pending_notification
;
367 if (channel
->pending_notifications
.count
>=
368 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT
) {
369 /* Drop the notification. */
370 ret
= enqueue_dropped_notification(channel
);
374 pending_notification
= (struct pending_notification
*) zmalloc(sizeof(*pending_notification
));
375 if (!pending_notification
) {
379 CDS_INIT_LIST_HEAD(&pending_notification
->node
);
381 notification
= create_notification_from_current_message(channel
);
387 pending_notification
->notification
= notification
;
388 cds_list_add(&pending_notification
->node
,
389 &channel
->pending_notifications
.list
);
390 channel
->pending_notifications
.count
++;
394 free(pending_notification
);
/*
 * Reports through *_notification_pending whether a notification is
 * waiting on the channel, and returns a channel status.
 *
 * A NULL channel or _notification_pending yields
 * LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID.
 *
 * With the channel lock held:
 *   - a non-zero pending_notifications.count sets the flag to true;
 *   - a negative channel->socket yields ..._STATUS_CLOSED;
 *   - otherwise the socket is polled (LPOLLIN | LPOLLERR) with a zero
 *     timeout. The "no data" branch sets the flag to false and a
 *     negative poll result yields ..._STATUS_ERROR.
 *
 * When data is available, one message is received and dispatched on its
 * type: NOTIFICATION goes to enqueue_notification_from_current_message(),
 * NOTIFICATION_DROPPED goes to enqueue_dropped_notification(), and in
 * both cases the flag is then set to true. Any other type is a protocol
 * error (..._STATUS_ERROR).
 *
 * The poll set is cleaned and the lock released at the end of the
 * visible body.
 */
398 enum lttng_notification_channel_status
399 lttng_notification_channel_has_pending_notification(
400 struct lttng_notification_channel
*channel
,
401 bool *_notification_pending
)
404 enum lttng_notification_channel_status status
=
405 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
406 struct lttng_poll_event events
;
408 if (!channel
|| !_notification_pending
) {
409 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
413 pthread_mutex_lock(&channel
->lock
);
415 if (channel
->pending_notifications
.count
) {
416 *_notification_pending
= true;
420 if (channel
->socket
< 0) {
421 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED
;
426 * Check, without blocking, if data is available on the channel's
427 * socket. If there is data available, it is safe to read (blocking)
428 * on the socket for a message from the session daemon.
430 * Since all commands wait for the session daemon's reply before
431 * releasing the channel's lock, the protocol only allows for
432 * notifications and "notification dropped" messages to come
433 * through. If we receive a different message type, it is
434 * considered a protocol error.
436 * Note that this function is not guaranteed not to block. This
437 * will block until our peer (the session daemon) has sent a complete
438 * message if we see data available on the socket. If the peer does
439 * not respect the protocol, this may block indefinitely.
441 ret
= lttng_poll_create(&events
, 1, LTTNG_CLOEXEC
);
443 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
446 ret
= lttng_poll_add(&events
, channel
->socket
, LPOLLIN
| LPOLLERR
);
448 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
451 /* timeout = 0: return immediately. */
452 ret
= lttng_poll_wait_interruptible(&events
, 0);
454 /* No data available. */
455 *_notification_pending
= false;
457 } else if (ret
< 0) {
458 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
462 /* Data available on socket. */
463 ret
= receive_message(channel
);
465 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
469 switch (get_current_message_type(channel
)) {
470 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
471 ret
= enqueue_notification_from_current_message(channel
);
475 *_notification_pending
= true;
477 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
478 ret
= enqueue_dropped_notification(channel
);
482 *_notification_pending
= true;
485 /* Protocol error. */
486 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
491 lttng_poll_clean(&events
);
493 pthread_mutex_unlock(&channel
->lock
);
/*
 * Receives a message from the channel and extracts a command reply
 * status into *status.
 *
 * Visible handling per message type after receive_message():
 *   - COMMAND_REPLY: handled by the code after the switch (below);
 *   - NOTIFICATION: queued with
 *     enqueue_notification_from_current_message();
 *   - NOTIFICATION_DROPPED: recorded with enqueue_dropped_notification();
 *   - HANDSHAKE: the major/minor values that follow the message header
 *     are copied into channel->version and channel->version.set becomes
 *     true.
 *
 * The reception buffer size is then checked against the header size
 * plus a second term that is not visible here (presumably the size of
 * the reply structure -- confirm); a shorter buffer is an invalid
 * message. Finally the reply located right after the header is read and
 * its status field is stored in *status.
 *
 * NOTE(review): no size check is visible before the handshake body is
 * read from the reception buffer -- confirm that one exists.
 */
499 int receive_command_reply(struct lttng_notification_channel
*channel
,
500 enum lttng_notification_channel_status
*status
)
503 struct lttng_notification_channel_command_reply
*reply
;
506 enum lttng_notification_channel_message_type msg_type
;
508 ret
= receive_message(channel
);
513 msg_type
= get_current_message_type(channel
);
515 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY
:
517 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
518 ret
= enqueue_notification_from_current_message(
524 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
525 ret
= enqueue_dropped_notification(channel
);
530 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
:
532 struct lttng_notification_channel_command_handshake
*handshake
;
534 handshake
= (struct lttng_notification_channel_command_handshake
*)
535 (channel
->reception_payload
.buffer
.data
+
536 sizeof(struct lttng_notification_channel_message
));
537 channel
->version
.major
= handshake
->major
;
538 channel
->version
.minor
= handshake
->minor
;
539 channel
->version
.set
= true;
549 if (channel
->reception_payload
.buffer
.size
<
550 (sizeof(struct lttng_notification_channel_message
) +
552 /* Invalid message received. */
557 reply
= (struct lttng_notification_channel_command_reply
*)
558 (channel
->reception_payload
.buffer
.data
+
559 sizeof(struct lttng_notification_channel_message
));
560 *status
= (enum lttng_notification_channel_status
) reply
->status
;
/*
 * Performs the protocol version handshake on a freshly connected
 * channel.
 *
 * A handshake command carrying LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
 * and ..._VERSION_MINOR is placed after a message header of type
 * ..._MESSAGE_TYPE_HANDSHAKE (size = sizeof(handshake)) in a stack
 * buffer. With the channel lock held, the buffer is sent with
 * lttcomm_send_creds_unix_sock() and receive_command_reply() is called
 * to collect the peer's answer.
 *
 * Two checks on the outcome are visible: channel->version.set must have
 * been set, and channel->version.major must equal
 * LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR (the branch bodies are not
 * visible here; presumably both are failures -- confirm). The lock is
 * released at the end of the visible body.
 */
566 int handshake(struct lttng_notification_channel
*channel
)
569 enum lttng_notification_channel_status status
=
570 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
571 struct lttng_notification_channel_command_handshake handshake
= {
572 .major
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
,
573 .minor
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR
,
575 struct lttng_notification_channel_message msg_header
= {
576 .type
= LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
,
577 .size
= sizeof(handshake
),
580 char send_buffer
[sizeof(msg_header
) + sizeof(handshake
)];
582 memcpy(send_buffer
, &msg_header
, sizeof(msg_header
));
583 memcpy(send_buffer
+ sizeof(msg_header
), &handshake
, sizeof(handshake
));
585 pthread_mutex_lock(&channel
->lock
);
587 ret
= lttcomm_send_creds_unix_sock(channel
->socket
, send_buffer
,
588 sizeof(send_buffer
));
593 /* Receive handshake info from the sessiond. */
594 ret
= receive_command_reply(channel
, &status
);
599 if (!channel
->version
.set
) {
604 if (channel
->version
.major
!= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
) {
610 pthread_mutex_unlock(&channel
->lock
);
/*
 * Sends a condition-based command (subscribe or unsubscribe) to the
 * peer and returns the resulting channel status.
 *
 * type is asserted to be ..._MESSAGE_TYPE_SUBSCRIBE or
 * ..._MESSAGE_TYPE_UNSUBSCRIBE. An early ..._STATUS_INVALID assignment
 * precedes the assertion (its condition is not visible here; presumably
 * a NULL argument check -- confirm).
 *
 * With the channel lock held:
 *   1. the condition is validated (..._STATUS_INVALID on failure);
 *   2. the command header is appended to a local payload, followed by
 *      the serialized condition;
 *   3. the header's size field is patched in place to the payload size
 *      minus the header size, and its fds field is patched from the
 *      payload view's fd handle count;
 *   4. the bytes are sent with lttcomm_send_unix_sock() and the file
 *      descriptors with lttcomm_send_payload_view_fds_unix_sock();
 *   5. receive_command_reply() collects the peer's status.
 *
 * Failures of the send/receive steps map to ..._STATUS_ERROR. The lock
 * is released and the local payload reset at the end of the visible
 * body.
 */
615 enum lttng_notification_channel_status
send_condition_command(
616 struct lttng_notification_channel
*channel
,
617 enum lttng_notification_channel_message_type type
,
618 const struct lttng_condition
*condition
)
622 enum lttng_notification_channel_status status
=
623 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
624 struct lttng_payload payload
;
625 struct lttng_notification_channel_message cmd_header
= {
626 .type
= (int8_t) type
,
631 lttng_payload_init(&payload
);
634 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
638 LTTNG_ASSERT(type
== LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
||
639 type
== LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
);
641 pthread_mutex_lock(&channel
->lock
);
642 socket
= channel
->socket
;
644 if (!lttng_condition_validate(condition
)) {
645 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
649 ret
= lttng_dynamic_buffer_append(&payload
.buffer
, &cmd_header
,
652 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
656 ret
= lttng_condition_serialize(condition
, &payload
);
658 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
662 /* Update payload length. */
663 ((struct lttng_notification_channel_message
*) payload
.buffer
.data
)->size
=
664 (uint32_t) (payload
.buffer
.size
- sizeof(cmd_header
));
667 struct lttng_payload_view pv
=
668 lttng_payload_view_from_payload(
671 lttng_payload_view_get_fd_handle_count(&pv
);
673 /* Update fd count. */
674 ((struct lttng_notification_channel_message
*) payload
.buffer
.data
)->fds
=
677 ret
= lttcomm_send_unix_sock(
678 socket
, pv
.buffer
.data
, pv
.buffer
.size
);
680 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
684 /* Pass fds if present. */
686 ret
= lttcomm_send_payload_view_fds_unix_sock(socket
,
689 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
695 ret
= receive_command_reply(channel
, &status
);
697 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
701 pthread_mutex_unlock(&channel
->lock
);
703 lttng_payload_reset(&payload
);
/*
 * Subscribes the channel to notifications for a condition.
 *
 * Thin wrapper: forwards channel to send_condition_command() with
 * message type LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE and
 * returns its status. The remaining argument of the call is not visible
 * here (presumably condition -- confirm).
 */
707 enum lttng_notification_channel_status
lttng_notification_channel_subscribe(
708 struct lttng_notification_channel
*channel
,
709 const struct lttng_condition
*condition
)
711 return send_condition_command(channel
,
712 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
,
/*
 * Unsubscribes the channel from notifications for a condition.
 *
 * Thin wrapper: forwards channel to send_condition_command() with
 * message type LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE and
 * returns its status. The remaining argument of the call is not visible
 * here (presumably condition -- confirm).
 */
716 enum lttng_notification_channel_status
lttng_notification_channel_unsubscribe(
717 struct lttng_notification_channel
*channel
,
718 const struct lttng_condition
*condition
)
720 return send_condition_command(channel
,
721 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
,
725 void lttng_notification_channel_destroy(
726 struct lttng_notification_channel
*channel
)
732 if (channel
->socket
>= 0) {
733 (void) lttcomm_close_unix_sock(channel
->socket
);
735 pthread_mutex_destroy(&channel
->lock
);
736 lttng_payload_reset(&channel
->reception_payload
);