2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 * SPDX-License-Identifier: LGPL-2.1-only
8 #include "lttng-ctl-helper.hpp"
10 #include <common/compat/poll.hpp>
11 #include <common/defaults.hpp>
12 #include <common/dynamic-buffer.hpp>
13 #include <common/error.hpp>
14 #include <common/payload-view.hpp>
15 #include <common/payload.hpp>
16 #include <common/unix.hpp>
17 #include <common/utils.hpp>
19 #include <lttng/condition/condition-internal.hpp>
20 #include <lttng/endpoint.h>
21 #include <lttng/notification/channel-internal.hpp>
22 #include <lttng/notification/notification-internal.hpp>
/* Forward declaration; handshake() is defined further down in this file. */
24 static int handshake(struct lttng_notification_channel
*channel
);
27 * Populates the reception buffer with the next complete message.
28 * The caller must acquire the channel's lock.
30 static int receive_message(struct lttng_notification_channel
*channel
)
33 struct lttng_notification_channel_message msg
;
35 lttng_payload_clear(&channel
->reception_payload
);
37 ret
= lttcomm_recv_unix_sock(channel
->socket
, &msg
, sizeof(msg
));
43 if (msg
.size
> DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE
) {
48 /* Add message header at buffer's start. */
49 ret
= lttng_dynamic_buffer_append(&channel
->reception_payload
.buffer
, &msg
, sizeof(msg
));
58 /* Reserve space for the payload. */
59 ret
= lttng_dynamic_buffer_set_size(&channel
->reception_payload
.buffer
,
60 channel
->reception_payload
.buffer
.size
+ msg
.size
);
65 /* Receive message payload. */
66 ret
= lttcomm_recv_unix_sock(
67 channel
->socket
, channel
->reception_payload
.buffer
.data
+ sizeof(msg
), msg
.size
);
68 if (ret
< (ssize_t
) msg
.size
) {
74 /* Receive message fds. */
76 ret
= lttcomm_recv_payload_fds_unix_sock(
77 channel
->socket
, msg
.fds
, &channel
->reception_payload
);
78 if (ret
< sizeof(int) * msg
.fds
) {
87 lttng_payload_clear(&channel
->reception_payload
);
91 static enum lttng_notification_channel_message_type
92 get_current_message_type(struct lttng_notification_channel
*channel
)
94 struct lttng_notification_channel_message
*msg
;
96 LTTNG_ASSERT(channel
->reception_payload
.buffer
.size
>= sizeof(*msg
));
98 msg
= (struct lttng_notification_channel_message
*) channel
->reception_payload
.buffer
.data
;
99 return (enum lttng_notification_channel_message_type
) msg
->type
;
/*
 * Builds a notification from the data that follows the message header in the
 * channel's reception payload, using lttng_notification_create_from_payload().
 *
 * `notification` starts out as nullptr. On the branch guarded by the size
 * expression below (reception buffer size minus the header size), the
 * notification is destroyed and reset to nullptr.
 *
 * NOTE(review): several statements of this function are not visible in this
 * view (declaration of `ret`, the end of the view creation call, the start of
 * the second condition and the return); confirm against the full file.
 */
102 static struct lttng_notification
*
103 create_notification_from_current_message(struct lttng_notification_channel
*channel
)
106 struct lttng_notification
*notification
= nullptr;
/* A reception buffer no larger than the message header carries no payload. */
108 if (channel
->reception_payload
.buffer
.size
<=
109 sizeof(struct lttng_notification_channel_message
)) {
/* View of the payload that starts right after the message header. */
114 struct lttng_payload_view view
= lttng_payload_view_from_payload(
115 &channel
->reception_payload
,
116 sizeof(struct lttng_notification_channel_message
),
119 ret
= lttng_notification_create_from_payload(&view
, ¬ification
);
123 channel
->reception_payload
.buffer
.size
-
124 sizeof(struct lttng_notification_channel_message
)) {
125 lttng_notification_destroy(notification
);
126 notification
= nullptr;
/*
 * Creates a notification channel and connects it to a session daemon.
 *
 * Only `lttng_session_daemon_notification_endpoint` is accepted as `endpoint`.
 *
 * A socket path buffer of LTTNG_PATH_MAX bytes and the channel object are
 * allocated; the channel's socket is initialized to -1 and its lock,
 * reception payload and pending notification list are initialized.
 *
 * When the caller is root or a member of the tracing group, the global
 * notification socket path is tried first. The per-user ("home") socket path,
 * built from utils_get_home_dir(), is used as a fallback. Once connected, the
 * protocol handshake is performed on the new socket.
 *
 * NOTE(review): the error checks, labels and return statements are not
 * visible in this view; the lttng_notification_channel_destroy() call near
 * the end presumably belongs to the error path - confirm against the full
 * file.
 */
133 struct lttng_notification_channel
*
134 lttng_notification_channel_create(struct lttng_endpoint
*endpoint
)
137 bool is_in_tracing_group
= false, is_root
= false;
138 char *sock_path
= nullptr;
139 struct lttng_notification_channel
*channel
= nullptr;
141 if (!endpoint
|| endpoint
!= lttng_session_daemon_notification_endpoint
) {
145 sock_path
= calloc
<char>(LTTNG_PATH_MAX
);
150 channel
= zmalloc
<lttng_notification_channel
>();
/* -1 marks the socket as not connected yet. */
154 channel
->socket
= -1;
155 pthread_mutex_init(&channel
->lock
, nullptr);
156 lttng_payload_init(&channel
->reception_payload
);
157 CDS_INIT_LIST_HEAD(&channel
->pending_notifications
.list
);
159 is_root
= (getuid() == 0);
161 is_in_tracing_group
= lttng_check_tracing_group();
/* Privileged users try the global session daemon's socket first. */
164 if (is_root
|| is_in_tracing_group
) {
166 sock_path
, DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK
, LTTNG_PATH_MAX
);
168 ret
= -LTTNG_ERR_INVALID
;
172 ret
= lttcomm_connect_unix_sock(sock_path
);
179 /* Fallback to local session daemon. */
180 ret
= snprintf(sock_path
,
182 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK
,
183 utils_get_home_dir());
/* A negative result or one that reaches LTTNG_PATH_MAX means the path did not fit. */
184 if (ret
< 0 || ret
>= LTTNG_PATH_MAX
) {
188 ret
= lttcomm_connect_unix_sock(sock_path
);
195 channel
->socket
= fd
;
197 ret
= handshake(channel
);
205 lttng_notification_channel_destroy(channel
);
/*
 * Retrieves the next notification from the channel.
 *
 * A NULL `channel` or `_notification` yields
 * LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID.
 *
 * With the channel's lock held:
 *   - if notifications are already queued, the first entry of the pending
 *     list is removed from the list, its notification is taken and the list
 *     node is freed;
 *   - otherwise the function waits for the socket to become readable using an
 *     interruptible poll (no timeout), receives one message and dispatches on
 *     its type: a NOTIFICATION message is turned into a notification object,
 *     a NOTIFICATION_DROPPED message results in the NOTIFICATIONS_DROPPED
 *     status, and the remaining branch is treated as a protocol error.
 *
 * A wait that returns -1 with errno set to EINTR is reported as
 * LTTNG_NOTIFICATION_CHANNEL_STATUS_INTERRUPTED; other failures are reported
 * as LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR.
 *
 * The result is stored in `*_notification` after the lock is released.
 *
 * NOTE(review): the conditions guarding most status assignments, the labels
 * and the return statement are not visible in this view; confirm against the
 * full file.
 */
210 enum lttng_notification_channel_status
211 lttng_notification_channel_get_next_notification(struct lttng_notification_channel
*channel
,
212 struct lttng_notification
**_notification
)
215 struct lttng_notification
*notification
= nullptr;
216 enum lttng_notification_channel_status status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
217 struct lttng_poll_event events
;
219 if (!channel
|| !_notification
) {
220 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
224 pthread_mutex_lock(&channel
->lock
);
226 if (channel
->pending_notifications
.count
) {
227 struct pending_notification
*pending_notification
;
229 LTTNG_ASSERT(!cds_list_empty(&channel
->pending_notifications
.list
));
231 /* Deliver one of the pending notifications. */
232 pending_notification
= cds_list_first_entry(
233 &channel
->pending_notifications
.list
, struct pending_notification
, node
);
234 notification
= pending_notification
->notification
;
236 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED
;
/* Unlink the entry and release the list node; the notification itself is kept. */
238 cds_list_del(&pending_notification
->node
);
239 channel
->pending_notifications
.count
--;
240 free(pending_notification
);
245 * Block on interruptible epoll/poll() instead of the message reception
246 * itself as the recvmsg() wrappers always restart on EINTR. We choose
247 * to wait using interruptible epoll/poll() in order to:
248 * 1) Return if a signal occurs,
249 * 2) Not deal with partially received messages.
251 * The drawback to this approach is that we assume that messages
252 * are complete/well formed. If a message is shorter than its
253 * announced length, receive_message() will block on recvmsg()
254 * and never return (even if a signal is received).
256 ret
= lttng_poll_create(&events
, 1, LTTNG_CLOEXEC
);
258 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
261 ret
= lttng_poll_add(&events
, channel
->socket
, LPOLLIN
);
263 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
/* A timeout of -1 is passed: wait until the socket is readable or the wait fails. */
266 ret
= lttng_poll_wait_interruptible(&events
, -1);
268 status
= (ret
== -1 && errno
== EINTR
) ?
269 LTTNG_NOTIFICATION_CHANNEL_STATUS_INTERRUPTED
:
270 LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
274 ret
= receive_message(channel
);
276 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
280 switch (get_current_message_type(channel
)) {
281 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
282 notification
= create_notification_from_current_message(channel
);
284 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
288 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
289 /* No payload to consume. */
290 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED
;
293 /* Protocol error. */
294 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
299 lttng_poll_clean(&events
);
301 pthread_mutex_unlock(&channel
->lock
);
302 *_notification
= notification
;
307 static int enqueue_dropped_notification(struct lttng_notification_channel
*channel
)
310 struct pending_notification
*pending_notification
;
311 struct cds_list_head
*last_element
= channel
->pending_notifications
.list
.prev
;
313 pending_notification
= caa_container_of(last_element
, struct pending_notification
, node
);
314 if (!pending_notification
->notification
) {
316 * The last enqueued notification indicates dropped
317 * notifications; there is nothing to do as we group
318 * dropped notifications together.
323 if (channel
->pending_notifications
.count
>= DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT
&&
324 pending_notification
->notification
) {
326 * Discard the last enqueued notification to indicate
327 * that notifications were dropped at this point.
329 lttng_notification_destroy(pending_notification
->notification
);
330 pending_notification
->notification
= nullptr;
334 pending_notification
= zmalloc
<struct pending_notification
>();
335 if (!pending_notification
) {
339 CDS_INIT_LIST_HEAD(&pending_notification
->node
);
340 cds_list_add(&pending_notification
->node
, &channel
->pending_notifications
.list
);
341 channel
->pending_notifications
.count
++;
/*
 * Queues the notification carried by the message currently held in the
 * reception payload onto the channel's pending notification list.
 *
 * When the pending count has reached
 * DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT, the notification is dropped
 * and enqueue_dropped_notification() is called instead.
 *
 * Otherwise a pending entry is allocated, the notification is created from
 * the current message, stored in the entry, and the entry is added to the
 * list while the pending count is incremented.
 *
 * NOTE(review): the error checks, labels and return statement are not visible
 * in this view; the trailing free() presumably releases the entry on the
 * error path - confirm against the full file.
 */
346 static int enqueue_notification_from_current_message(struct lttng_notification_channel
*channel
)
349 struct lttng_notification
*notification
;
350 struct pending_notification
*pending_notification
;
352 if (channel
->pending_notifications
.count
>= DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT
) {
353 /* Drop the notification. */
354 ret
= enqueue_dropped_notification(channel
);
358 pending_notification
= zmalloc
<struct pending_notification
>();
359 if (!pending_notification
) {
363 CDS_INIT_LIST_HEAD(&pending_notification
->node
);
365 notification
= create_notification_from_current_message(channel
);
/* The pending entry now references the notification. */
371 pending_notification
->notification
= notification
;
372 cds_list_add(&pending_notification
->node
, &channel
->pending_notifications
.list
);
373 channel
->pending_notifications
.count
++;
377 free(pending_notification
);
/*
 * Reports, through `*_notification_pending`, whether a notification can be
 * retrieved from the channel.
 *
 * A NULL `channel` or `_notification_pending` yields
 * LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID.
 *
 * With the channel's lock held:
 *   - a non-zero pending count sets `*_notification_pending` to true;
 *   - a negative socket yields LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED;
 *   - otherwise the socket is polled with a timeout of 0. When no data is
 *     available, `*_notification_pending` is set to false. When data is
 *     available, one message is received and enqueued (as a notification or
 *     as a "dropped" marker depending on its type) and
 *     `*_notification_pending` is set to true. The remaining branch of the
 *     switch is treated as a protocol error.
 *
 * NOTE(review): several conditions, the labels and the return statement are
 * not visible in this view; confirm against the full file.
 */
381 enum lttng_notification_channel_status
382 lttng_notification_channel_has_pending_notification(struct lttng_notification_channel
*channel
,
383 bool *_notification_pending
)
386 enum lttng_notification_channel_status status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
387 struct lttng_poll_event events
;
389 if (!channel
|| !_notification_pending
) {
390 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
394 pthread_mutex_lock(&channel
->lock
);
/* Notifications that are already queued can be reported without touching the socket. */
396 if (channel
->pending_notifications
.count
) {
397 *_notification_pending
= true;
401 if (channel
->socket
< 0) {
402 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED
;
407 * Check, without blocking, if data is available on the channel's
408 * socket. If there is data available, it is safe to read (blocking)
409 * on the socket for a message from the session daemon.
411 * Since all commands wait for the session daemon's reply before
412 * releasing the channel's lock, the protocol only allows for
413 * notifications and "notification dropped" messages to come
414 * through. If we receive a different message type, it is
415 * considered a protocol error.
417 * Note that this function is not guaranteed not to block. This
418 * will block until our peer (the session daemon) has sent a complete
419 * message if we see data available on the socket. If the peer does
420 * not respect the protocol, this may block indefinitely.
422 ret
= lttng_poll_create(&events
, 1, LTTNG_CLOEXEC
);
424 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
427 ret
= lttng_poll_add(&events
, channel
->socket
, LPOLLIN
);
429 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
432 /* timeout = 0: return immediately. */
433 ret
= lttng_poll_wait_interruptible(&events
, 0);
435 /* No data available. */
436 *_notification_pending
= false;
438 } else if (ret
< 0) {
439 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
443 /* Data available on socket. */
444 ret
= receive_message(channel
);
446 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
450 switch (get_current_message_type(channel
)) {
451 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
452 ret
= enqueue_notification_from_current_message(channel
);
456 *_notification_pending
= true;
458 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
459 ret
= enqueue_dropped_notification(channel
);
463 *_notification_pending
= true;
466 /* Protocol error. */
467 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
472 lttng_poll_clean(&events
);
474 pthread_mutex_unlock(&channel
->lock
);
479 static int receive_command_reply(struct lttng_notification_channel
*channel
,
480 enum lttng_notification_channel_status
*status
)
483 struct lttng_notification_channel_command_reply
*reply
;
486 enum lttng_notification_channel_message_type msg_type
;
488 ret
= receive_message(channel
);
493 msg_type
= get_current_message_type(channel
);
495 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY
:
497 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
498 ret
= enqueue_notification_from_current_message(channel
);
503 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
504 ret
= enqueue_dropped_notification(channel
);
509 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
:
511 struct lttng_notification_channel_command_handshake
*handshake
;
513 handshake
= (struct lttng_notification_channel_command_handshake
514 *) (channel
->reception_payload
.buffer
.data
+
515 sizeof(struct lttng_notification_channel_message
));
516 channel
->version
.major
= handshake
->major
;
517 channel
->version
.minor
= handshake
->minor
;
518 channel
->version
.set
= true;
528 if (channel
->reception_payload
.buffer
.size
<
529 (sizeof(struct lttng_notification_channel_message
) + sizeof(*reply
))) {
530 /* Invalid message received. */
535 reply
= (struct lttng_notification_channel_command_reply
536 *) (channel
->reception_payload
.buffer
.data
+
537 sizeof(struct lttng_notification_channel_message
));
538 *status
= (enum lttng_notification_channel_status
) reply
->status
;
/*
 * Performs the protocol handshake with the session daemon on the channel's
 * socket.
 *
 * A message made of a header (type HANDSHAKE, size of the handshake
 * structure) followed by this client's major/minor protocol version is
 * assembled in `send_buffer` and sent, with credentials, while the channel's
 * lock is held. The reply is then awaited with receive_command_reply().
 *
 * Afterwards, the channel's version must have been set and its major number
 * must equal LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR.
 *
 * NOTE(review): the error handling inside the visible conditions, the labels
 * and the return statement are not visible in this view; confirm against the
 * full file.
 */
543 static int handshake(struct lttng_notification_channel
*channel
)
546 enum lttng_notification_channel_status status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
547 struct lttng_notification_channel_command_handshake handshake
= {
548 .major
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
,
549 .minor
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR
,
551 struct lttng_notification_channel_message msg_header
;
/* Large enough to hold the message header followed by the handshake structure. */
552 char send_buffer
[sizeof(msg_header
) + sizeof(handshake
)];
554 msg_header
.type
= LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
;
555 msg_header
.size
= sizeof(handshake
);
/* Serialize the header, then the handshake right after it. */
558 memcpy(send_buffer
, &msg_header
, sizeof(msg_header
));
559 memcpy(send_buffer
+ sizeof(msg_header
), &handshake
, sizeof(handshake
));
561 pthread_mutex_lock(&channel
->lock
);
563 ret
= lttcomm_send_creds_unix_sock(channel
->socket
, send_buffer
, sizeof(send_buffer
));
568 /* Receive handshake info from the sessiond. */
569 ret
= receive_command_reply(channel
, &status
);
574 if (!channel
->version
.set
) {
579 if (channel
->version
.major
!= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
) {
585 pthread_mutex_unlock(&channel
->lock
);
/*
 * Sends a condition-based command (`type` must be SUBSCRIBE or UNSUBSCRIBE,
 * which is asserted) to the session daemon and waits for its reply.
 *
 * With the channel's lock held, the condition is validated, a command header
 * is appended to a local payload, and the condition is serialized after it.
 * The header's `size` field is then patched with the number of bytes that
 * follow the header, and its `fds` field is updated as well. The payload
 * bytes are sent on the channel's socket, followed by the payload's file
 * descriptors, and the command's status is obtained from
 * receive_command_reply().
 *
 * An invalid condition or a serialization failure is reported as
 * LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; buffer, transmission and
 * reception failures are reported as LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR.
 * The local payload is reset before returning.
 *
 * NOTE(review): the conditions guarding the status assignments, the labels
 * and the return statement are not visible in this view; confirm against the
 * full file.
 */
589 static enum lttng_notification_channel_status
590 send_condition_command(struct lttng_notification_channel
*channel
,
591 enum lttng_notification_channel_message_type type
,
592 const struct lttng_condition
*condition
)
596 enum lttng_notification_channel_status status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
597 struct lttng_payload payload
;
598 struct lttng_notification_channel_message cmd_header
;
600 cmd_header
.type
= (int8_t) type
;
604 lttng_payload_init(&payload
);
607 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
611 LTTNG_ASSERT(type
== LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
||
612 type
== LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
);
614 pthread_mutex_lock(&channel
->lock
);
615 socket
= channel
->socket
;
617 if (!lttng_condition_validate(condition
)) {
618 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
/* The command header goes first; its size and fds fields are patched below. */
622 ret
= lttng_dynamic_buffer_append(&payload
.buffer
, &cmd_header
, sizeof(cmd_header
));
624 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
628 ret
= lttng_condition_serialize(condition
, &payload
);
630 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
634 /* Update payload length. */
635 ((struct lttng_notification_channel_message
*) payload
.buffer
.data
)->size
=
636 (uint32_t) (payload
.buffer
.size
- sizeof(cmd_header
));
639 struct lttng_payload_view pv
= lttng_payload_view_from_payload(&payload
, 0, -1);
640 const int fd_count
= lttng_payload_view_get_fd_handle_count(&pv
);
642 /* Update fd count. */
643 ((struct lttng_notification_channel_message
*) payload
.buffer
.data
)->fds
=
646 ret
= lttcomm_send_unix_sock(socket
, pv
.buffer
.data
, pv
.buffer
.size
);
648 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
652 /* Pass fds if present. */
654 ret
= lttcomm_send_payload_view_fds_unix_sock(socket
, &pv
);
656 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
662 ret
= receive_command_reply(channel
, &status
);
664 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
668 pthread_mutex_unlock(&channel
->lock
);
670 lttng_payload_reset(&payload
);
674 enum lttng_notification_channel_status
675 lttng_notification_channel_subscribe(struct lttng_notification_channel
*channel
,
676 const struct lttng_condition
*condition
)
678 return send_condition_command(
679 channel
, LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
, condition
);
682 enum lttng_notification_channel_status
683 lttng_notification_channel_unsubscribe(struct lttng_notification_channel
*channel
,
684 const struct lttng_condition
*condition
)
686 return send_condition_command(
687 channel
, LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
, condition
);
690 void lttng_notification_channel_destroy(struct lttng_notification_channel
*channel
)
696 if (channel
->socket
>= 0) {
697 (void) lttcomm_close_unix_sock(channel
->socket
);
699 pthread_mutex_destroy(&channel
->lock
);
700 lttng_payload_reset(&channel
->reception_payload
);