bin: compile lttng-consumerd as a C++
[lttng-tools.git] / src / lib / lttng-ctl / channel.c
1 /*
2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: LGPL-2.1-only
5 *
6 */
7
8 #include <lttng/notification/notification-internal.h>
9 #include <lttng/notification/channel-internal.h>
10 #include <lttng/condition/condition-internal.h>
11 #include <lttng/endpoint.h>
12 #include <common/defaults.h>
13 #include <common/error.h>
14 #include <common/dynamic-buffer.h>
15 #include <common/utils.h>
16 #include <common/defaults.h>
17 #include <common/payload.h>
18 #include <common/payload-view.h>
19 #include <common/unix.h>
20 #include "lttng-ctl-helper.h"
21 #include <common/compat/poll.h>
22
23 static
24 int handshake(struct lttng_notification_channel *channel);
25
26 /*
27 * Populates the reception buffer with the next complete message.
28 * The caller must acquire the channel's lock.
29 */
30 static
31 int receive_message(struct lttng_notification_channel *channel)
32 {
33 ssize_t ret;
34 struct lttng_notification_channel_message msg;
35
36 lttng_payload_clear(&channel->reception_payload);
37
38 ret = lttcomm_recv_unix_sock(channel->socket, &msg, sizeof(msg));
39 if (ret <= 0) {
40 ret = -1;
41 goto error;
42 }
43
44 if (msg.size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
45 ret = -1;
46 goto error;
47 }
48
49 /* Add message header at buffer's start. */
50 ret = lttng_dynamic_buffer_append(&channel->reception_payload.buffer, &msg,
51 sizeof(msg));
52 if (ret) {
53 goto error;
54 }
55
56 /* Reserve space for the payload. */
57 ret = lttng_dynamic_buffer_set_size(&channel->reception_payload.buffer,
58 channel->reception_payload.buffer.size + msg.size);
59 if (ret) {
60 goto error;
61 }
62
63 /* Receive message payload. */
64 ret = lttcomm_recv_unix_sock(channel->socket,
65 channel->reception_payload.buffer.data + sizeof(msg), msg.size);
66 if (ret < (ssize_t) msg.size) {
67 ret = -1;
68 goto error;
69 }
70
71 /* Receive message fds. */
72 if (msg.fds != 0) {
73 ret = lttcomm_recv_payload_fds_unix_sock(channel->socket,
74 msg.fds, &channel->reception_payload);
75 if (ret < sizeof(int) * msg.fds) {
76 ret = -1;
77 goto error;
78 }
79 }
80 ret = 0;
81 end:
82 return ret;
83 error:
84 lttng_payload_clear(&channel->reception_payload);
85 goto end;
86 }
87
88 static
89 enum lttng_notification_channel_message_type get_current_message_type(
90 struct lttng_notification_channel *channel)
91 {
92 struct lttng_notification_channel_message *msg;
93
94 LTTNG_ASSERT(channel->reception_payload.buffer.size >= sizeof(*msg));
95
96 msg = (struct lttng_notification_channel_message *)
97 channel->reception_payload.buffer.data;
98 return (enum lttng_notification_channel_message_type) msg->type;
99 }
100
101 static
102 struct lttng_notification *create_notification_from_current_message(
103 struct lttng_notification_channel *channel)
104 {
105 ssize_t ret;
106 struct lttng_notification *notification = NULL;
107
108 if (channel->reception_payload.buffer.size <=
109 sizeof(struct lttng_notification_channel_message)) {
110 goto end;
111 }
112
113 {
114 struct lttng_payload_view view = lttng_payload_view_from_payload(
115 &channel->reception_payload,
116 sizeof(struct lttng_notification_channel_message),
117 -1);
118
119 ret = lttng_notification_create_from_payload(
120 &view, &notification);
121 }
122
123 if (ret != channel->reception_payload.buffer.size -
124 sizeof(struct lttng_notification_channel_message)) {
125 lttng_notification_destroy(notification);
126 notification = NULL;
127 goto end;
128 }
129 end:
130 return notification;
131 }
132
133 struct lttng_notification_channel *lttng_notification_channel_create(
134 struct lttng_endpoint *endpoint)
135 {
136 int fd, ret;
137 bool is_in_tracing_group = false, is_root = false;
138 char *sock_path = NULL;
139 struct lttng_notification_channel *channel = NULL;
140
141 if (!endpoint ||
142 endpoint != lttng_session_daemon_notification_endpoint) {
143 goto end;
144 }
145
146 sock_path = zmalloc(LTTNG_PATH_MAX);
147 if (!sock_path) {
148 goto end;
149 }
150
151 channel = zmalloc(sizeof(struct lttng_notification_channel));
152 if (!channel) {
153 goto end;
154 }
155 channel->socket = -1;
156 pthread_mutex_init(&channel->lock, NULL);
157 lttng_payload_init(&channel->reception_payload);
158 CDS_INIT_LIST_HEAD(&channel->pending_notifications.list);
159
160 is_root = (getuid() == 0);
161 if (!is_root) {
162 is_in_tracing_group = lttng_check_tracing_group();
163 }
164
165 if (is_root || is_in_tracing_group) {
166 ret = lttng_strncpy(sock_path,
167 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK,
168 LTTNG_PATH_MAX);
169 if (ret) {
170 ret = -LTTNG_ERR_INVALID;
171 goto error;
172 }
173
174 ret = lttcomm_connect_unix_sock(sock_path);
175 if (ret >= 0) {
176 fd = ret;
177 goto set_fd;
178 }
179 }
180
181 /* Fallback to local session daemon. */
182 ret = snprintf(sock_path, LTTNG_PATH_MAX,
183 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK,
184 utils_get_home_dir());
185 if (ret < 0 || ret >= LTTNG_PATH_MAX) {
186 goto error;
187 }
188
189 ret = lttcomm_connect_unix_sock(sock_path);
190 if (ret < 0) {
191 goto error;
192 }
193 fd = ret;
194
195 set_fd:
196 channel->socket = fd;
197
198 ret = handshake(channel);
199 if (ret) {
200 goto error;
201 }
202 end:
203 free(sock_path);
204 return channel;
205 error:
206 lttng_notification_channel_destroy(channel);
207 channel = NULL;
208 goto end;
209 }
210
211 enum lttng_notification_channel_status
212 lttng_notification_channel_get_next_notification(
213 struct lttng_notification_channel *channel,
214 struct lttng_notification **_notification)
215 {
216 int ret;
217 struct lttng_notification *notification = NULL;
218 enum lttng_notification_channel_status status =
219 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
220 struct lttng_poll_event events;
221
222 if (!channel || !_notification) {
223 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
224 goto end;
225 }
226
227 pthread_mutex_lock(&channel->lock);
228
229 if (channel->pending_notifications.count) {
230 struct pending_notification *pending_notification;
231
232 LTTNG_ASSERT(!cds_list_empty(&channel->pending_notifications.list));
233
234 /* Deliver one of the pending notifications. */
235 pending_notification = cds_list_first_entry(
236 &channel->pending_notifications.list,
237 struct pending_notification,
238 node);
239 notification = pending_notification->notification;
240 if (!notification) {
241 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
242 }
243 cds_list_del(&pending_notification->node);
244 channel->pending_notifications.count--;
245 free(pending_notification);
246 goto end_unlock;
247 }
248
249 /*
250 * Block on interruptible epoll/poll() instead of the message reception
251 * itself as the recvmsg() wrappers always restart on EINTR. We choose
252 * to wait using interruptible epoll/poll() in order to:
253 * 1) Return if a signal occurs,
254 * 2) Not deal with partially received messages.
255 *
256 * The drawback to this approach is that we assume that messages
257 * are complete/well formed. If a message is shorter than its
258 * announced length, receive_message() will block on recvmsg()
259 * and never return (even if a signal is received).
260 */
261 ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
262 if (ret < 0) {
263 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
264 goto end_unlock;
265 }
266 ret = lttng_poll_add(&events, channel->socket, LPOLLIN | LPOLLERR);
267 if (ret < 0) {
268 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
269 goto end_clean_poll;
270 }
271 ret = lttng_poll_wait_interruptible(&events, -1);
272 if (ret <= 0) {
273 status = (ret == -1 && errno == EINTR) ?
274 LTTNG_NOTIFICATION_CHANNEL_STATUS_INTERRUPTED :
275 LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
276 goto end_clean_poll;
277 }
278
279 ret = receive_message(channel);
280 if (ret) {
281 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
282 goto end_clean_poll;
283 }
284
285 switch (get_current_message_type(channel)) {
286 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
287 notification = create_notification_from_current_message(
288 channel);
289 if (!notification) {
290 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
291 goto end_clean_poll;
292 }
293 break;
294 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
295 /* No payload to consume. */
296 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
297 break;
298 default:
299 /* Protocol error. */
300 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
301 goto end_clean_poll;
302 }
303
304 end_clean_poll:
305 lttng_poll_clean(&events);
306 end_unlock:
307 pthread_mutex_unlock(&channel->lock);
308 *_notification = notification;
309 end:
310 return status;
311 }
312
313 static
314 int enqueue_dropped_notification(
315 struct lttng_notification_channel *channel)
316 {
317 int ret = 0;
318 struct pending_notification *pending_notification;
319 struct cds_list_head *last_element =
320 channel->pending_notifications.list.prev;
321
322 pending_notification = caa_container_of(last_element,
323 struct pending_notification, node);
324 if (!pending_notification->notification) {
325 /*
326 * The last enqueued notification indicates dropped
327 * notifications; there is nothing to do as we group
328 * dropped notifications together.
329 */
330 goto end;
331 }
332
333 if (channel->pending_notifications.count >=
334 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT &&
335 pending_notification->notification) {
336 /*
337 * Discard the last enqueued notification to indicate
338 * that notifications were dropped at this point.
339 */
340 lttng_notification_destroy(
341 pending_notification->notification);
342 pending_notification->notification = NULL;
343 goto end;
344 }
345
346 pending_notification = zmalloc(sizeof(*pending_notification));
347 if (!pending_notification) {
348 ret = -1;
349 goto end;
350 }
351 CDS_INIT_LIST_HEAD(&pending_notification->node);
352 cds_list_add(&pending_notification->node,
353 &channel->pending_notifications.list);
354 channel->pending_notifications.count++;
355 end:
356 return ret;
357 }
358
359 static
360 int enqueue_notification_from_current_message(
361 struct lttng_notification_channel *channel)
362 {
363 int ret = 0;
364 struct lttng_notification *notification;
365 struct pending_notification *pending_notification;
366
367 if (channel->pending_notifications.count >=
368 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT) {
369 /* Drop the notification. */
370 ret = enqueue_dropped_notification(channel);
371 goto end;
372 }
373
374 pending_notification = zmalloc(sizeof(*pending_notification));
375 if (!pending_notification) {
376 ret = -1;
377 goto error;
378 }
379 CDS_INIT_LIST_HEAD(&pending_notification->node);
380
381 notification = create_notification_from_current_message(channel);
382 if (!notification) {
383 ret = -1;
384 goto error;
385 }
386
387 pending_notification->notification = notification;
388 cds_list_add(&pending_notification->node,
389 &channel->pending_notifications.list);
390 channel->pending_notifications.count++;
391 end:
392 return ret;
393 error:
394 free(pending_notification);
395 goto end;
396 }
397
398 enum lttng_notification_channel_status
399 lttng_notification_channel_has_pending_notification(
400 struct lttng_notification_channel *channel,
401 bool *_notification_pending)
402 {
403 int ret;
404 enum lttng_notification_channel_status status =
405 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
406 struct lttng_poll_event events;
407
408 if (!channel || !_notification_pending) {
409 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
410 goto end;
411 }
412
413 pthread_mutex_lock(&channel->lock);
414
415 if (channel->pending_notifications.count) {
416 *_notification_pending = true;
417 goto end_unlock;
418 }
419
420 if (channel->socket < 0) {
421 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED;
422 goto end_unlock;
423 }
424
425 /*
426 * Check, without blocking, if data is available on the channel's
427 * socket. If there is data available, it is safe to read (blocking)
428 * on the socket for a message from the session daemon.
429 *
430 * Since all commands wait for the session daemon's reply before
431 * releasing the channel's lock, the protocol only allows for
432 * notifications and "notification dropped" messages to come
433 * through. If we receive a different message type, it is
434 * considered a protocol error.
435 *
436 * Note that this function is not guaranteed not to block. This
437 * will block until our peer (the session daemon) has sent a complete
438 * message if we see data available on the socket. If the peer does
439 * not respect the protocol, this may block indefinitely.
440 */
441 ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
442 if (ret < 0) {
443 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
444 goto end_unlock;
445 }
446 ret = lttng_poll_add(&events, channel->socket, LPOLLIN | LPOLLERR);
447 if (ret < 0) {
448 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
449 goto end_clean_poll;
450 }
451 /* timeout = 0: return immediately. */
452 ret = lttng_poll_wait_interruptible(&events, 0);
453 if (ret == 0) {
454 /* No data available. */
455 *_notification_pending = false;
456 goto end_clean_poll;
457 } else if (ret < 0) {
458 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
459 goto end_clean_poll;
460 }
461
462 /* Data available on socket. */
463 ret = receive_message(channel);
464 if (ret) {
465 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
466 goto end_clean_poll;
467 }
468
469 switch (get_current_message_type(channel)) {
470 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
471 ret = enqueue_notification_from_current_message(channel);
472 if (ret) {
473 goto end_clean_poll;
474 }
475 *_notification_pending = true;
476 break;
477 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
478 ret = enqueue_dropped_notification(channel);
479 if (ret) {
480 goto end_clean_poll;
481 }
482 *_notification_pending = true;
483 break;
484 default:
485 /* Protocol error. */
486 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
487 goto end_clean_poll;
488 }
489
490 end_clean_poll:
491 lttng_poll_clean(&events);
492 end_unlock:
493 pthread_mutex_unlock(&channel->lock);
494 end:
495 return status;
496 }
497
498 static
499 int receive_command_reply(struct lttng_notification_channel *channel,
500 enum lttng_notification_channel_status *status)
501 {
502 int ret;
503 struct lttng_notification_channel_command_reply *reply;
504
505 while (true) {
506 enum lttng_notification_channel_message_type msg_type;
507
508 ret = receive_message(channel);
509 if (ret) {
510 goto end;
511 }
512
513 msg_type = get_current_message_type(channel);
514 switch (msg_type) {
515 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY:
516 goto exit_loop;
517 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
518 ret = enqueue_notification_from_current_message(
519 channel);
520 if (ret) {
521 goto end;
522 }
523 break;
524 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
525 ret = enqueue_dropped_notification(channel);
526 if (ret) {
527 goto end;
528 }
529 break;
530 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
531 {
532 struct lttng_notification_channel_command_handshake *handshake;
533
534 handshake = (struct lttng_notification_channel_command_handshake *)
535 (channel->reception_payload.buffer.data +
536 sizeof(struct lttng_notification_channel_message));
537 channel->version.major = handshake->major;
538 channel->version.minor = handshake->minor;
539 channel->version.set = true;
540 break;
541 }
542 default:
543 ret = -1;
544 goto end;
545 }
546 }
547
548 exit_loop:
549 if (channel->reception_payload.buffer.size <
550 (sizeof(struct lttng_notification_channel_message) +
551 sizeof(*reply))) {
552 /* Invalid message received. */
553 ret = -1;
554 goto end;
555 }
556
557 reply = (struct lttng_notification_channel_command_reply *)
558 (channel->reception_payload.buffer.data +
559 sizeof(struct lttng_notification_channel_message));
560 *status = (enum lttng_notification_channel_status) reply->status;
561 end:
562 return ret;
563 }
564
565 static
566 int handshake(struct lttng_notification_channel *channel)
567 {
568 ssize_t ret;
569 enum lttng_notification_channel_status status =
570 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
571 struct lttng_notification_channel_command_handshake handshake = {
572 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
573 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
574 };
575 struct lttng_notification_channel_message msg_header = {
576 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
577 .size = sizeof(handshake),
578 };
579 char send_buffer[sizeof(msg_header) + sizeof(handshake)];
580
581 memcpy(send_buffer, &msg_header, sizeof(msg_header));
582 memcpy(send_buffer + sizeof(msg_header), &handshake, sizeof(handshake));
583
584 pthread_mutex_lock(&channel->lock);
585
586 ret = lttcomm_send_creds_unix_sock(channel->socket, send_buffer,
587 sizeof(send_buffer));
588 if (ret < 0) {
589 goto end_unlock;
590 }
591
592 /* Receive handshake info from the sessiond. */
593 ret = receive_command_reply(channel, &status);
594 if (ret < 0) {
595 goto end_unlock;
596 }
597
598 if (!channel->version.set) {
599 ret = -1;
600 goto end_unlock;
601 }
602
603 if (channel->version.major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
604 ret = -1;
605 goto end_unlock;
606 }
607
608 end_unlock:
609 pthread_mutex_unlock(&channel->lock);
610 return ret;
611 }
612
613 static
614 enum lttng_notification_channel_status send_condition_command(
615 struct lttng_notification_channel *channel,
616 enum lttng_notification_channel_message_type type,
617 const struct lttng_condition *condition)
618 {
619 int socket;
620 ssize_t ret;
621 enum lttng_notification_channel_status status =
622 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
623 struct lttng_payload payload;
624 struct lttng_notification_channel_message cmd_header = {
625 .type = (int8_t) type,
626 };
627
628 lttng_payload_init(&payload);
629
630 if (!channel) {
631 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
632 goto end;
633 }
634
635 LTTNG_ASSERT(type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE ||
636 type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE);
637
638 pthread_mutex_lock(&channel->lock);
639 socket = channel->socket;
640
641 if (!lttng_condition_validate(condition)) {
642 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
643 goto end_unlock;
644 }
645
646 ret = lttng_dynamic_buffer_append(&payload.buffer, &cmd_header,
647 sizeof(cmd_header));
648 if (ret) {
649 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
650 goto end_unlock;
651 }
652
653 ret = lttng_condition_serialize(condition, &payload);
654 if (ret) {
655 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
656 goto end_unlock;
657 }
658
659 /* Update payload length. */
660 ((struct lttng_notification_channel_message *) payload.buffer.data)->size =
661 (uint32_t) (payload.buffer.size - sizeof(cmd_header));
662
663 {
664 struct lttng_payload_view pv =
665 lttng_payload_view_from_payload(
666 &payload, 0, -1);
667 const int fd_count =
668 lttng_payload_view_get_fd_handle_count(&pv);
669
670 /* Update fd count. */
671 ((struct lttng_notification_channel_message *) payload.buffer.data)->fds =
672 (uint32_t) fd_count;
673
674 ret = lttcomm_send_unix_sock(
675 socket, pv.buffer.data, pv.buffer.size);
676 if (ret < 0) {
677 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
678 goto end_unlock;
679 }
680
681 /* Pass fds if present. */
682 if (fd_count > 0) {
683 ret = lttcomm_send_payload_view_fds_unix_sock(socket,
684 &pv);
685 if (ret < 0) {
686 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
687 goto end_unlock;
688 }
689 }
690 }
691
692 ret = receive_command_reply(channel, &status);
693 if (ret < 0) {
694 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
695 goto end_unlock;
696 }
697 end_unlock:
698 pthread_mutex_unlock(&channel->lock);
699 end:
700 lttng_payload_reset(&payload);
701 return status;
702 }
703
704 enum lttng_notification_channel_status lttng_notification_channel_subscribe(
705 struct lttng_notification_channel *channel,
706 const struct lttng_condition *condition)
707 {
708 return send_condition_command(channel,
709 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE,
710 condition);
711 }
712
713 enum lttng_notification_channel_status lttng_notification_channel_unsubscribe(
714 struct lttng_notification_channel *channel,
715 const struct lttng_condition *condition)
716 {
717 return send_condition_command(channel,
718 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE,
719 condition);
720 }
721
722 void lttng_notification_channel_destroy(
723 struct lttng_notification_channel *channel)
724 {
725 if (!channel) {
726 return;
727 }
728
729 if (channel->socket >= 0) {
730 (void) lttcomm_close_unix_sock(channel->socket);
731 }
732 pthread_mutex_destroy(&channel->lock);
733 lttng_payload_reset(&channel->reception_payload);
734 free(channel);
735 }
This page took 0.043725 seconds and 4 git commands to generate.