Prepare for '-Wmissing-field-initializers'
[lttng-tools.git] / src / lib / lttng-ctl / channel.cpp
... / ...
CommitLineData
1/*
2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: LGPL-2.1-only
5 *
6 */
7
8#include <lttng/notification/notification-internal.h>
9#include <lttng/notification/channel-internal.h>
10#include <lttng/condition/condition-internal.h>
11#include <lttng/endpoint.h>
12#include <common/defaults.h>
13#include <common/error.h>
14#include <common/dynamic-buffer.h>
15#include <common/utils.h>
16#include <common/defaults.h>
17#include <common/payload.h>
18#include <common/payload-view.h>
19#include <common/unix.h>
20#include "lttng-ctl-helper.h"
21#include <common/compat/poll.h>
22
23static
24int handshake(struct lttng_notification_channel *channel);
25
26/*
27 * Populates the reception buffer with the next complete message.
28 * The caller must acquire the channel's lock.
29 */
30static
31int receive_message(struct lttng_notification_channel *channel)
32{
33 ssize_t ret;
34 struct lttng_notification_channel_message msg;
35
36 lttng_payload_clear(&channel->reception_payload);
37
38 ret = lttcomm_recv_unix_sock(channel->socket, &msg, sizeof(msg));
39 if (ret <= 0) {
40 ret = -1;
41 goto error;
42 }
43
44 if (msg.size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
45 ret = -1;
46 goto error;
47 }
48
49 /* Add message header at buffer's start. */
50 ret = lttng_dynamic_buffer_append(&channel->reception_payload.buffer, &msg,
51 sizeof(msg));
52 if (ret) {
53 goto error;
54 }
55
56 /* Reserve space for the payload. */
57 ret = lttng_dynamic_buffer_set_size(&channel->reception_payload.buffer,
58 channel->reception_payload.buffer.size + msg.size);
59 if (ret) {
60 goto error;
61 }
62
63 /* Receive message payload. */
64 ret = lttcomm_recv_unix_sock(channel->socket,
65 channel->reception_payload.buffer.data + sizeof(msg), msg.size);
66 if (ret < (ssize_t) msg.size) {
67 ret = -1;
68 goto error;
69 }
70
71 /* Receive message fds. */
72 if (msg.fds != 0) {
73 ret = lttcomm_recv_payload_fds_unix_sock(channel->socket,
74 msg.fds, &channel->reception_payload);
75 if (ret < sizeof(int) * msg.fds) {
76 ret = -1;
77 goto error;
78 }
79 }
80 ret = 0;
81end:
82 return ret;
83error:
84 lttng_payload_clear(&channel->reception_payload);
85 goto end;
86}
87
88static
89enum lttng_notification_channel_message_type get_current_message_type(
90 struct lttng_notification_channel *channel)
91{
92 struct lttng_notification_channel_message *msg;
93
94 LTTNG_ASSERT(channel->reception_payload.buffer.size >= sizeof(*msg));
95
96 msg = (struct lttng_notification_channel_message *)
97 channel->reception_payload.buffer.data;
98 return (enum lttng_notification_channel_message_type) msg->type;
99}
100
101static
102struct lttng_notification *create_notification_from_current_message(
103 struct lttng_notification_channel *channel)
104{
105 ssize_t ret;
106 struct lttng_notification *notification = NULL;
107
108 if (channel->reception_payload.buffer.size <=
109 sizeof(struct lttng_notification_channel_message)) {
110 goto end;
111 }
112
113 {
114 struct lttng_payload_view view = lttng_payload_view_from_payload(
115 &channel->reception_payload,
116 sizeof(struct lttng_notification_channel_message),
117 -1);
118
119 ret = lttng_notification_create_from_payload(
120 &view, &notification);
121 }
122
123 if (ret != channel->reception_payload.buffer.size -
124 sizeof(struct lttng_notification_channel_message)) {
125 lttng_notification_destroy(notification);
126 notification = NULL;
127 goto end;
128 }
129end:
130 return notification;
131}
132
133struct lttng_notification_channel *lttng_notification_channel_create(
134 struct lttng_endpoint *endpoint)
135{
136 int fd, ret;
137 bool is_in_tracing_group = false, is_root = false;
138 char *sock_path = NULL;
139 struct lttng_notification_channel *channel = NULL;
140
141 if (!endpoint ||
142 endpoint != lttng_session_daemon_notification_endpoint) {
143 goto end;
144 }
145
146 sock_path = (char *) zmalloc(LTTNG_PATH_MAX);
147 if (!sock_path) {
148 goto end;
149 }
150
151 channel = (lttng_notification_channel *) zmalloc(sizeof(struct lttng_notification_channel));
152 if (!channel) {
153 goto end;
154 }
155 channel->socket = -1;
156 pthread_mutex_init(&channel->lock, NULL);
157 lttng_payload_init(&channel->reception_payload);
158 CDS_INIT_LIST_HEAD(&channel->pending_notifications.list);
159
160 is_root = (getuid() == 0);
161 if (!is_root) {
162 is_in_tracing_group = lttng_check_tracing_group();
163 }
164
165 if (is_root || is_in_tracing_group) {
166 ret = lttng_strncpy(sock_path,
167 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK,
168 LTTNG_PATH_MAX);
169 if (ret) {
170 ret = -LTTNG_ERR_INVALID;
171 goto error;
172 }
173
174 ret = lttcomm_connect_unix_sock(sock_path);
175 if (ret >= 0) {
176 fd = ret;
177 goto set_fd;
178 }
179 }
180
181 /* Fallback to local session daemon. */
182 ret = snprintf(sock_path, LTTNG_PATH_MAX,
183 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK,
184 utils_get_home_dir());
185 if (ret < 0 || ret >= LTTNG_PATH_MAX) {
186 goto error;
187 }
188
189 ret = lttcomm_connect_unix_sock(sock_path);
190 if (ret < 0) {
191 goto error;
192 }
193 fd = ret;
194
195set_fd:
196 channel->socket = fd;
197
198 ret = handshake(channel);
199 if (ret) {
200 goto error;
201 }
202end:
203 free(sock_path);
204 return channel;
205error:
206 lttng_notification_channel_destroy(channel);
207 channel = NULL;
208 goto end;
209}
210
211enum lttng_notification_channel_status
212lttng_notification_channel_get_next_notification(
213 struct lttng_notification_channel *channel,
214 struct lttng_notification **_notification)
215{
216 int ret;
217 struct lttng_notification *notification = NULL;
218 enum lttng_notification_channel_status status =
219 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
220 struct lttng_poll_event events;
221
222 if (!channel || !_notification) {
223 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
224 goto end;
225 }
226
227 pthread_mutex_lock(&channel->lock);
228
229 if (channel->pending_notifications.count) {
230 struct pending_notification *pending_notification;
231
232 LTTNG_ASSERT(!cds_list_empty(&channel->pending_notifications.list));
233
234 /* Deliver one of the pending notifications. */
235 pending_notification = cds_list_first_entry(
236 &channel->pending_notifications.list,
237 struct pending_notification,
238 node);
239 notification = pending_notification->notification;
240 if (!notification) {
241 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
242 }
243 cds_list_del(&pending_notification->node);
244 channel->pending_notifications.count--;
245 free(pending_notification);
246 goto end_unlock;
247 }
248
249 /*
250 * Block on interruptible epoll/poll() instead of the message reception
251 * itself as the recvmsg() wrappers always restart on EINTR. We choose
252 * to wait using interruptible epoll/poll() in order to:
253 * 1) Return if a signal occurs,
254 * 2) Not deal with partially received messages.
255 *
256 * The drawback to this approach is that we assume that messages
257 * are complete/well formed. If a message is shorter than its
258 * announced length, receive_message() will block on recvmsg()
259 * and never return (even if a signal is received).
260 */
261 ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
262 if (ret < 0) {
263 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
264 goto end_unlock;
265 }
266 ret = lttng_poll_add(&events, channel->socket, LPOLLIN | LPOLLERR);
267 if (ret < 0) {
268 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
269 goto end_clean_poll;
270 }
271 ret = lttng_poll_wait_interruptible(&events, -1);
272 if (ret <= 0) {
273 status = (ret == -1 && errno == EINTR) ?
274 LTTNG_NOTIFICATION_CHANNEL_STATUS_INTERRUPTED :
275 LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
276 goto end_clean_poll;
277 }
278
279 ret = receive_message(channel);
280 if (ret) {
281 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
282 goto end_clean_poll;
283 }
284
285 switch (get_current_message_type(channel)) {
286 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
287 notification = create_notification_from_current_message(
288 channel);
289 if (!notification) {
290 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
291 goto end_clean_poll;
292 }
293 break;
294 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
295 /* No payload to consume. */
296 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
297 break;
298 default:
299 /* Protocol error. */
300 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
301 goto end_clean_poll;
302 }
303
304end_clean_poll:
305 lttng_poll_clean(&events);
306end_unlock:
307 pthread_mutex_unlock(&channel->lock);
308 *_notification = notification;
309end:
310 return status;
311}
312
313static
314int enqueue_dropped_notification(
315 struct lttng_notification_channel *channel)
316{
317 int ret = 0;
318 struct pending_notification *pending_notification;
319 struct cds_list_head *last_element =
320 channel->pending_notifications.list.prev;
321
322 pending_notification = caa_container_of(last_element,
323 struct pending_notification, node);
324 if (!pending_notification->notification) {
325 /*
326 * The last enqueued notification indicates dropped
327 * notifications; there is nothing to do as we group
328 * dropped notifications together.
329 */
330 goto end;
331 }
332
333 if (channel->pending_notifications.count >=
334 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT &&
335 pending_notification->notification) {
336 /*
337 * Discard the last enqueued notification to indicate
338 * that notifications were dropped at this point.
339 */
340 lttng_notification_destroy(
341 pending_notification->notification);
342 pending_notification->notification = NULL;
343 goto end;
344 }
345
346 pending_notification = (struct pending_notification *) zmalloc(sizeof(*pending_notification));
347 if (!pending_notification) {
348 ret = -1;
349 goto end;
350 }
351 CDS_INIT_LIST_HEAD(&pending_notification->node);
352 cds_list_add(&pending_notification->node,
353 &channel->pending_notifications.list);
354 channel->pending_notifications.count++;
355end:
356 return ret;
357}
358
359static
360int enqueue_notification_from_current_message(
361 struct lttng_notification_channel *channel)
362{
363 int ret = 0;
364 struct lttng_notification *notification;
365 struct pending_notification *pending_notification;
366
367 if (channel->pending_notifications.count >=
368 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT) {
369 /* Drop the notification. */
370 ret = enqueue_dropped_notification(channel);
371 goto end;
372 }
373
374 pending_notification = (struct pending_notification *) zmalloc(sizeof(*pending_notification));
375 if (!pending_notification) {
376 ret = -1;
377 goto error;
378 }
379 CDS_INIT_LIST_HEAD(&pending_notification->node);
380
381 notification = create_notification_from_current_message(channel);
382 if (!notification) {
383 ret = -1;
384 goto error;
385 }
386
387 pending_notification->notification = notification;
388 cds_list_add(&pending_notification->node,
389 &channel->pending_notifications.list);
390 channel->pending_notifications.count++;
391end:
392 return ret;
393error:
394 free(pending_notification);
395 goto end;
396}
397
398enum lttng_notification_channel_status
399lttng_notification_channel_has_pending_notification(
400 struct lttng_notification_channel *channel,
401 bool *_notification_pending)
402{
403 int ret;
404 enum lttng_notification_channel_status status =
405 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
406 struct lttng_poll_event events;
407
408 if (!channel || !_notification_pending) {
409 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
410 goto end;
411 }
412
413 pthread_mutex_lock(&channel->lock);
414
415 if (channel->pending_notifications.count) {
416 *_notification_pending = true;
417 goto end_unlock;
418 }
419
420 if (channel->socket < 0) {
421 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED;
422 goto end_unlock;
423 }
424
425 /*
426 * Check, without blocking, if data is available on the channel's
427 * socket. If there is data available, it is safe to read (blocking)
428 * on the socket for a message from the session daemon.
429 *
430 * Since all commands wait for the session daemon's reply before
431 * releasing the channel's lock, the protocol only allows for
432 * notifications and "notification dropped" messages to come
433 * through. If we receive a different message type, it is
434 * considered a protocol error.
435 *
436 * Note that this function is not guaranteed not to block. This
437 * will block until our peer (the session daemon) has sent a complete
438 * message if we see data available on the socket. If the peer does
439 * not respect the protocol, this may block indefinitely.
440 */
441 ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
442 if (ret < 0) {
443 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
444 goto end_unlock;
445 }
446 ret = lttng_poll_add(&events, channel->socket, LPOLLIN | LPOLLERR);
447 if (ret < 0) {
448 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
449 goto end_clean_poll;
450 }
451 /* timeout = 0: return immediately. */
452 ret = lttng_poll_wait_interruptible(&events, 0);
453 if (ret == 0) {
454 /* No data available. */
455 *_notification_pending = false;
456 goto end_clean_poll;
457 } else if (ret < 0) {
458 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
459 goto end_clean_poll;
460 }
461
462 /* Data available on socket. */
463 ret = receive_message(channel);
464 if (ret) {
465 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
466 goto end_clean_poll;
467 }
468
469 switch (get_current_message_type(channel)) {
470 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
471 ret = enqueue_notification_from_current_message(channel);
472 if (ret) {
473 goto end_clean_poll;
474 }
475 *_notification_pending = true;
476 break;
477 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
478 ret = enqueue_dropped_notification(channel);
479 if (ret) {
480 goto end_clean_poll;
481 }
482 *_notification_pending = true;
483 break;
484 default:
485 /* Protocol error. */
486 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
487 goto end_clean_poll;
488 }
489
490end_clean_poll:
491 lttng_poll_clean(&events);
492end_unlock:
493 pthread_mutex_unlock(&channel->lock);
494end:
495 return status;
496}
497
498static
499int receive_command_reply(struct lttng_notification_channel *channel,
500 enum lttng_notification_channel_status *status)
501{
502 int ret;
503 struct lttng_notification_channel_command_reply *reply;
504
505 while (true) {
506 enum lttng_notification_channel_message_type msg_type;
507
508 ret = receive_message(channel);
509 if (ret) {
510 goto end;
511 }
512
513 msg_type = get_current_message_type(channel);
514 switch (msg_type) {
515 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY:
516 goto exit_loop;
517 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
518 ret = enqueue_notification_from_current_message(
519 channel);
520 if (ret) {
521 goto end;
522 }
523 break;
524 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
525 ret = enqueue_dropped_notification(channel);
526 if (ret) {
527 goto end;
528 }
529 break;
530 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
531 {
532 struct lttng_notification_channel_command_handshake *handshake;
533
534 handshake = (struct lttng_notification_channel_command_handshake *)
535 (channel->reception_payload.buffer.data +
536 sizeof(struct lttng_notification_channel_message));
537 channel->version.major = handshake->major;
538 channel->version.minor = handshake->minor;
539 channel->version.set = true;
540 break;
541 }
542 default:
543 ret = -1;
544 goto end;
545 }
546 }
547
548exit_loop:
549 if (channel->reception_payload.buffer.size <
550 (sizeof(struct lttng_notification_channel_message) +
551 sizeof(*reply))) {
552 /* Invalid message received. */
553 ret = -1;
554 goto end;
555 }
556
557 reply = (struct lttng_notification_channel_command_reply *)
558 (channel->reception_payload.buffer.data +
559 sizeof(struct lttng_notification_channel_message));
560 *status = (enum lttng_notification_channel_status) reply->status;
561end:
562 return ret;
563}
564
565static
566int handshake(struct lttng_notification_channel *channel)
567{
568 ssize_t ret;
569 enum lttng_notification_channel_status status =
570 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
571 struct lttng_notification_channel_command_handshake handshake = {
572 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
573 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
574 };
575 struct lttng_notification_channel_message msg_header = {
576 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
577 .size = sizeof(handshake),
578 .fds = 0,
579 };
580 char send_buffer[sizeof(msg_header) + sizeof(handshake)];
581
582 memcpy(send_buffer, &msg_header, sizeof(msg_header));
583 memcpy(send_buffer + sizeof(msg_header), &handshake, sizeof(handshake));
584
585 pthread_mutex_lock(&channel->lock);
586
587 ret = lttcomm_send_creds_unix_sock(channel->socket, send_buffer,
588 sizeof(send_buffer));
589 if (ret < 0) {
590 goto end_unlock;
591 }
592
593 /* Receive handshake info from the sessiond. */
594 ret = receive_command_reply(channel, &status);
595 if (ret < 0) {
596 goto end_unlock;
597 }
598
599 if (!channel->version.set) {
600 ret = -1;
601 goto end_unlock;
602 }
603
604 if (channel->version.major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
605 ret = -1;
606 goto end_unlock;
607 }
608
609end_unlock:
610 pthread_mutex_unlock(&channel->lock);
611 return ret;
612}
613
614static
615enum lttng_notification_channel_status send_condition_command(
616 struct lttng_notification_channel *channel,
617 enum lttng_notification_channel_message_type type,
618 const struct lttng_condition *condition)
619{
620 int socket;
621 ssize_t ret;
622 enum lttng_notification_channel_status status =
623 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
624 struct lttng_payload payload;
625 struct lttng_notification_channel_message cmd_header = {
626 .type = (int8_t) type,
627 .size =0,
628 .fds = 0,
629 };
630
631 lttng_payload_init(&payload);
632
633 if (!channel) {
634 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
635 goto end;
636 }
637
638 LTTNG_ASSERT(type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE ||
639 type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE);
640
641 pthread_mutex_lock(&channel->lock);
642 socket = channel->socket;
643
644 if (!lttng_condition_validate(condition)) {
645 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
646 goto end_unlock;
647 }
648
649 ret = lttng_dynamic_buffer_append(&payload.buffer, &cmd_header,
650 sizeof(cmd_header));
651 if (ret) {
652 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
653 goto end_unlock;
654 }
655
656 ret = lttng_condition_serialize(condition, &payload);
657 if (ret) {
658 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
659 goto end_unlock;
660 }
661
662 /* Update payload length. */
663 ((struct lttng_notification_channel_message *) payload.buffer.data)->size =
664 (uint32_t) (payload.buffer.size - sizeof(cmd_header));
665
666 {
667 struct lttng_payload_view pv =
668 lttng_payload_view_from_payload(
669 &payload, 0, -1);
670 const int fd_count =
671 lttng_payload_view_get_fd_handle_count(&pv);
672
673 /* Update fd count. */
674 ((struct lttng_notification_channel_message *) payload.buffer.data)->fds =
675 (uint32_t) fd_count;
676
677 ret = lttcomm_send_unix_sock(
678 socket, pv.buffer.data, pv.buffer.size);
679 if (ret < 0) {
680 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
681 goto end_unlock;
682 }
683
684 /* Pass fds if present. */
685 if (fd_count > 0) {
686 ret = lttcomm_send_payload_view_fds_unix_sock(socket,
687 &pv);
688 if (ret < 0) {
689 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
690 goto end_unlock;
691 }
692 }
693 }
694
695 ret = receive_command_reply(channel, &status);
696 if (ret < 0) {
697 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
698 goto end_unlock;
699 }
700end_unlock:
701 pthread_mutex_unlock(&channel->lock);
702end:
703 lttng_payload_reset(&payload);
704 return status;
705}
706
707enum lttng_notification_channel_status lttng_notification_channel_subscribe(
708 struct lttng_notification_channel *channel,
709 const struct lttng_condition *condition)
710{
711 return send_condition_command(channel,
712 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE,
713 condition);
714}
715
716enum lttng_notification_channel_status lttng_notification_channel_unsubscribe(
717 struct lttng_notification_channel *channel,
718 const struct lttng_condition *condition)
719{
720 return send_condition_command(channel,
721 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE,
722 condition);
723}
724
725void lttng_notification_channel_destroy(
726 struct lttng_notification_channel *channel)
727{
728 if (!channel) {
729 return;
730 }
731
732 if (channel->socket >= 0) {
733 (void) lttcomm_close_unix_sock(channel->socket);
734 }
735 pthread_mutex_destroy(&channel->lock);
736 lttng_payload_reset(&channel->reception_payload);
737 free(channel);
738}
This page took 0.024552 seconds and 4 git commands to generate.