Clean-up: apply suggested clang-tidy fixes
[lttng-tools.git] / src / lib / lttng-ctl / channel.cpp
1 /*
2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: LGPL-2.1-only
5 *
6 */
7
8 #include "lttng-ctl-helper.hpp"
9
10 #include <common/compat/poll.hpp>
11 #include <common/defaults.hpp>
12 #include <common/dynamic-buffer.hpp>
13 #include <common/error.hpp>
14 #include <common/payload-view.hpp>
15 #include <common/payload.hpp>
16 #include <common/unix.hpp>
17 #include <common/utils.hpp>
18
19 #include <lttng/condition/condition-internal.hpp>
20 #include <lttng/endpoint.h>
21 #include <lttng/notification/channel-internal.hpp>
22 #include <lttng/notification/notification-internal.hpp>
23
24 static int handshake(struct lttng_notification_channel *channel);
25
26 /*
27 * Populates the reception buffer with the next complete message.
28 * The caller must acquire the channel's lock.
29 */
30 static int receive_message(struct lttng_notification_channel *channel)
31 {
32 ssize_t ret;
33 struct lttng_notification_channel_message msg;
34
35 lttng_payload_clear(&channel->reception_payload);
36
37 ret = lttcomm_recv_unix_sock(channel->socket, &msg, sizeof(msg));
38 if (ret <= 0) {
39 ret = -1;
40 goto error;
41 }
42
43 if (msg.size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
44 ret = -1;
45 goto error;
46 }
47
48 /* Add message header at buffer's start. */
49 ret = lttng_dynamic_buffer_append(&channel->reception_payload.buffer, &msg, sizeof(msg));
50 if (ret) {
51 goto error;
52 }
53
54 if (msg.size == 0) {
55 goto skip_payload;
56 }
57
58 /* Reserve space for the payload. */
59 ret = lttng_dynamic_buffer_set_size(&channel->reception_payload.buffer,
60 channel->reception_payload.buffer.size + msg.size);
61 if (ret) {
62 goto error;
63 }
64
65 /* Receive message payload. */
66 ret = lttcomm_recv_unix_sock(
67 channel->socket, channel->reception_payload.buffer.data + sizeof(msg), msg.size);
68 if (ret < (ssize_t) msg.size) {
69 ret = -1;
70 goto error;
71 }
72
73 skip_payload:
74 /* Receive message fds. */
75 if (msg.fds != 0) {
76 ret = lttcomm_recv_payload_fds_unix_sock(
77 channel->socket, msg.fds, &channel->reception_payload);
78 if (ret < sizeof(int) * msg.fds) {
79 ret = -1;
80 goto error;
81 }
82 }
83 ret = 0;
84 end:
85 return ret;
86 error:
87 lttng_payload_clear(&channel->reception_payload);
88 goto end;
89 }
90
91 static enum lttng_notification_channel_message_type
92 get_current_message_type(struct lttng_notification_channel *channel)
93 {
94 struct lttng_notification_channel_message *msg;
95
96 LTTNG_ASSERT(channel->reception_payload.buffer.size >= sizeof(*msg));
97
98 msg = (struct lttng_notification_channel_message *) channel->reception_payload.buffer.data;
99 return (enum lttng_notification_channel_message_type) msg->type;
100 }
101
102 static struct lttng_notification *
103 create_notification_from_current_message(struct lttng_notification_channel *channel)
104 {
105 ssize_t ret;
106 struct lttng_notification *notification = nullptr;
107
108 if (channel->reception_payload.buffer.size <=
109 sizeof(struct lttng_notification_channel_message)) {
110 goto end;
111 }
112
113 {
114 struct lttng_payload_view view = lttng_payload_view_from_payload(
115 &channel->reception_payload,
116 sizeof(struct lttng_notification_channel_message),
117 -1);
118
119 ret = lttng_notification_create_from_payload(&view, &notification);
120 }
121
122 if (ret !=
123 channel->reception_payload.buffer.size -
124 sizeof(struct lttng_notification_channel_message)) {
125 lttng_notification_destroy(notification);
126 notification = nullptr;
127 goto end;
128 }
129 end:
130 return notification;
131 }
132
133 struct lttng_notification_channel *
134 lttng_notification_channel_create(struct lttng_endpoint *endpoint)
135 {
136 int fd, ret;
137 bool is_in_tracing_group = false, is_root = false;
138 char *sock_path = nullptr;
139 struct lttng_notification_channel *channel = nullptr;
140
141 if (!endpoint || endpoint != lttng_session_daemon_notification_endpoint) {
142 goto end;
143 }
144
145 sock_path = calloc<char>(LTTNG_PATH_MAX);
146 if (!sock_path) {
147 goto end;
148 }
149
150 channel = zmalloc<lttng_notification_channel>();
151 if (!channel) {
152 goto end;
153 }
154 channel->socket = -1;
155 pthread_mutex_init(&channel->lock, nullptr);
156 lttng_payload_init(&channel->reception_payload);
157 CDS_INIT_LIST_HEAD(&channel->pending_notifications.list);
158
159 is_root = (getuid() == 0);
160 if (!is_root) {
161 is_in_tracing_group = lttng_check_tracing_group();
162 }
163
164 if (is_root || is_in_tracing_group) {
165 ret = lttng_strncpy(
166 sock_path, DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK, LTTNG_PATH_MAX);
167 if (ret) {
168 ret = -LTTNG_ERR_INVALID;
169 goto error;
170 }
171
172 ret = lttcomm_connect_unix_sock(sock_path);
173 if (ret >= 0) {
174 fd = ret;
175 goto set_fd;
176 }
177 }
178
179 /* Fallback to local session daemon. */
180 ret = snprintf(sock_path,
181 LTTNG_PATH_MAX,
182 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK,
183 utils_get_home_dir());
184 if (ret < 0 || ret >= LTTNG_PATH_MAX) {
185 goto error;
186 }
187
188 ret = lttcomm_connect_unix_sock(sock_path);
189 if (ret < 0) {
190 goto error;
191 }
192 fd = ret;
193
194 set_fd:
195 channel->socket = fd;
196
197 ret = handshake(channel);
198 if (ret) {
199 goto error;
200 }
201 end:
202 free(sock_path);
203 return channel;
204 error:
205 lttng_notification_channel_destroy(channel);
206 channel = nullptr;
207 goto end;
208 }
209
210 enum lttng_notification_channel_status
211 lttng_notification_channel_get_next_notification(struct lttng_notification_channel *channel,
212 struct lttng_notification **_notification)
213 {
214 int ret;
215 struct lttng_notification *notification = nullptr;
216 enum lttng_notification_channel_status status = LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
217 struct lttng_poll_event events;
218
219 if (!channel || !_notification) {
220 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
221 goto end;
222 }
223
224 pthread_mutex_lock(&channel->lock);
225
226 if (channel->pending_notifications.count) {
227 struct pending_notification *pending_notification;
228
229 LTTNG_ASSERT(!cds_list_empty(&channel->pending_notifications.list));
230
231 /* Deliver one of the pending notifications. */
232 pending_notification = cds_list_first_entry(
233 &channel->pending_notifications.list, struct pending_notification, node);
234 notification = pending_notification->notification;
235 if (!notification) {
236 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
237 }
238 cds_list_del(&pending_notification->node);
239 channel->pending_notifications.count--;
240 free(pending_notification);
241 goto end_unlock;
242 }
243
244 /*
245 * Block on interruptible epoll/poll() instead of the message reception
246 * itself as the recvmsg() wrappers always restart on EINTR. We choose
247 * to wait using interruptible epoll/poll() in order to:
248 * 1) Return if a signal occurs,
249 * 2) Not deal with partially received messages.
250 *
251 * The drawback to this approach is that we assume that messages
252 * are complete/well formed. If a message is shorter than its
253 * announced length, receive_message() will block on recvmsg()
254 * and never return (even if a signal is received).
255 */
256 ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
257 if (ret < 0) {
258 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
259 goto end_unlock;
260 }
261 ret = lttng_poll_add(&events, channel->socket, LPOLLIN);
262 if (ret < 0) {
263 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
264 goto end_clean_poll;
265 }
266 ret = lttng_poll_wait_interruptible(&events, -1);
267 if (ret <= 0) {
268 status = (ret == -1 && errno == EINTR) ?
269 LTTNG_NOTIFICATION_CHANNEL_STATUS_INTERRUPTED :
270 LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
271 goto end_clean_poll;
272 }
273
274 ret = receive_message(channel);
275 if (ret) {
276 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
277 goto end_clean_poll;
278 }
279
280 switch (get_current_message_type(channel)) {
281 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
282 notification = create_notification_from_current_message(channel);
283 if (!notification) {
284 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
285 goto end_clean_poll;
286 }
287 break;
288 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
289 /* No payload to consume. */
290 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
291 break;
292 default:
293 /* Protocol error. */
294 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
295 goto end_clean_poll;
296 }
297
298 end_clean_poll:
299 lttng_poll_clean(&events);
300 end_unlock:
301 pthread_mutex_unlock(&channel->lock);
302 *_notification = notification;
303 end:
304 return status;
305 }
306
307 static int enqueue_dropped_notification(struct lttng_notification_channel *channel)
308 {
309 int ret = 0;
310 struct pending_notification *pending_notification;
311 struct cds_list_head *last_element = channel->pending_notifications.list.prev;
312
313 pending_notification = caa_container_of(last_element, struct pending_notification, node);
314 if (!pending_notification->notification) {
315 /*
316 * The last enqueued notification indicates dropped
317 * notifications; there is nothing to do as we group
318 * dropped notifications together.
319 */
320 goto end;
321 }
322
323 if (channel->pending_notifications.count >= DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT &&
324 pending_notification->notification) {
325 /*
326 * Discard the last enqueued notification to indicate
327 * that notifications were dropped at this point.
328 */
329 lttng_notification_destroy(pending_notification->notification);
330 pending_notification->notification = nullptr;
331 goto end;
332 }
333
334 pending_notification = zmalloc<struct pending_notification>();
335 if (!pending_notification) {
336 ret = -1;
337 goto end;
338 }
339 CDS_INIT_LIST_HEAD(&pending_notification->node);
340 cds_list_add(&pending_notification->node, &channel->pending_notifications.list);
341 channel->pending_notifications.count++;
342 end:
343 return ret;
344 }
345
346 static int enqueue_notification_from_current_message(struct lttng_notification_channel *channel)
347 {
348 int ret = 0;
349 struct lttng_notification *notification;
350 struct pending_notification *pending_notification;
351
352 if (channel->pending_notifications.count >= DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT) {
353 /* Drop the notification. */
354 ret = enqueue_dropped_notification(channel);
355 goto end;
356 }
357
358 pending_notification = zmalloc<struct pending_notification>();
359 if (!pending_notification) {
360 ret = -1;
361 goto error;
362 }
363 CDS_INIT_LIST_HEAD(&pending_notification->node);
364
365 notification = create_notification_from_current_message(channel);
366 if (!notification) {
367 ret = -1;
368 goto error;
369 }
370
371 pending_notification->notification = notification;
372 cds_list_add(&pending_notification->node, &channel->pending_notifications.list);
373 channel->pending_notifications.count++;
374 end:
375 return ret;
376 error:
377 free(pending_notification);
378 goto end;
379 }
380
381 enum lttng_notification_channel_status
382 lttng_notification_channel_has_pending_notification(struct lttng_notification_channel *channel,
383 bool *_notification_pending)
384 {
385 int ret;
386 enum lttng_notification_channel_status status = LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
387 struct lttng_poll_event events;
388
389 if (!channel || !_notification_pending) {
390 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
391 goto end;
392 }
393
394 pthread_mutex_lock(&channel->lock);
395
396 if (channel->pending_notifications.count) {
397 *_notification_pending = true;
398 goto end_unlock;
399 }
400
401 if (channel->socket < 0) {
402 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED;
403 goto end_unlock;
404 }
405
406 /*
407 * Check, without blocking, if data is available on the channel's
408 * socket. If there is data available, it is safe to read (blocking)
409 * on the socket for a message from the session daemon.
410 *
411 * Since all commands wait for the session daemon's reply before
412 * releasing the channel's lock, the protocol only allows for
413 * notifications and "notification dropped" messages to come
414 * through. If we receive a different message type, it is
415 * considered a protocol error.
416 *
417 * Note that this function is not guaranteed not to block. This
418 * will block until our peer (the session daemon) has sent a complete
419 * message if we see data available on the socket. If the peer does
420 * not respect the protocol, this may block indefinitely.
421 */
422 ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
423 if (ret < 0) {
424 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
425 goto end_unlock;
426 }
427 ret = lttng_poll_add(&events, channel->socket, LPOLLIN);
428 if (ret < 0) {
429 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
430 goto end_clean_poll;
431 }
432 /* timeout = 0: return immediately. */
433 ret = lttng_poll_wait_interruptible(&events, 0);
434 if (ret == 0) {
435 /* No data available. */
436 *_notification_pending = false;
437 goto end_clean_poll;
438 } else if (ret < 0) {
439 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
440 goto end_clean_poll;
441 }
442
443 /* Data available on socket. */
444 ret = receive_message(channel);
445 if (ret) {
446 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
447 goto end_clean_poll;
448 }
449
450 switch (get_current_message_type(channel)) {
451 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
452 ret = enqueue_notification_from_current_message(channel);
453 if (ret) {
454 goto end_clean_poll;
455 }
456 *_notification_pending = true;
457 break;
458 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
459 ret = enqueue_dropped_notification(channel);
460 if (ret) {
461 goto end_clean_poll;
462 }
463 *_notification_pending = true;
464 break;
465 default:
466 /* Protocol error. */
467 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
468 goto end_clean_poll;
469 }
470
471 end_clean_poll:
472 lttng_poll_clean(&events);
473 end_unlock:
474 pthread_mutex_unlock(&channel->lock);
475 end:
476 return status;
477 }
478
479 static int receive_command_reply(struct lttng_notification_channel *channel,
480 enum lttng_notification_channel_status *status)
481 {
482 int ret;
483 struct lttng_notification_channel_command_reply *reply;
484
485 while (true) {
486 enum lttng_notification_channel_message_type msg_type;
487
488 ret = receive_message(channel);
489 if (ret) {
490 goto end;
491 }
492
493 msg_type = get_current_message_type(channel);
494 switch (msg_type) {
495 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY:
496 goto exit_loop;
497 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
498 ret = enqueue_notification_from_current_message(channel);
499 if (ret) {
500 goto end;
501 }
502 break;
503 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
504 ret = enqueue_dropped_notification(channel);
505 if (ret) {
506 goto end;
507 }
508 break;
509 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
510 {
511 struct lttng_notification_channel_command_handshake *handshake;
512
513 handshake = (struct lttng_notification_channel_command_handshake
514 *) (channel->reception_payload.buffer.data +
515 sizeof(struct lttng_notification_channel_message));
516 channel->version.major = handshake->major;
517 channel->version.minor = handshake->minor;
518 channel->version.set = true;
519 break;
520 }
521 default:
522 ret = -1;
523 goto end;
524 }
525 }
526
527 exit_loop:
528 if (channel->reception_payload.buffer.size <
529 (sizeof(struct lttng_notification_channel_message) + sizeof(*reply))) {
530 /* Invalid message received. */
531 ret = -1;
532 goto end;
533 }
534
535 reply = (struct lttng_notification_channel_command_reply
536 *) (channel->reception_payload.buffer.data +
537 sizeof(struct lttng_notification_channel_message));
538 *status = (enum lttng_notification_channel_status) reply->status;
539 end:
540 return ret;
541 }
542
543 static int handshake(struct lttng_notification_channel *channel)
544 {
545 ssize_t ret;
546 enum lttng_notification_channel_status status = LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
547 struct lttng_notification_channel_command_handshake handshake = {
548 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
549 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
550 };
551 struct lttng_notification_channel_message msg_header;
552 char send_buffer[sizeof(msg_header) + sizeof(handshake)];
553
554 msg_header.type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE;
555 msg_header.size = sizeof(handshake);
556 msg_header.fds = 0;
557
558 memcpy(send_buffer, &msg_header, sizeof(msg_header));
559 memcpy(send_buffer + sizeof(msg_header), &handshake, sizeof(handshake));
560
561 pthread_mutex_lock(&channel->lock);
562
563 ret = lttcomm_send_creds_unix_sock(channel->socket, send_buffer, sizeof(send_buffer));
564 if (ret < 0) {
565 goto end_unlock;
566 }
567
568 /* Receive handshake info from the sessiond. */
569 ret = receive_command_reply(channel, &status);
570 if (ret < 0) {
571 goto end_unlock;
572 }
573
574 if (!channel->version.set) {
575 ret = -1;
576 goto end_unlock;
577 }
578
579 if (channel->version.major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
580 ret = -1;
581 goto end_unlock;
582 }
583
584 end_unlock:
585 pthread_mutex_unlock(&channel->lock);
586 return ret;
587 }
588
589 static enum lttng_notification_channel_status
590 send_condition_command(struct lttng_notification_channel *channel,
591 enum lttng_notification_channel_message_type type,
592 const struct lttng_condition *condition)
593 {
594 int socket;
595 ssize_t ret;
596 enum lttng_notification_channel_status status = LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
597 struct lttng_payload payload;
598 struct lttng_notification_channel_message cmd_header;
599
600 cmd_header.type = (int8_t) type;
601 cmd_header.size = 0;
602 cmd_header.fds = 0;
603
604 lttng_payload_init(&payload);
605
606 if (!channel) {
607 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
608 goto end;
609 }
610
611 LTTNG_ASSERT(type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE ||
612 type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE);
613
614 pthread_mutex_lock(&channel->lock);
615 socket = channel->socket;
616
617 if (!lttng_condition_validate(condition)) {
618 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
619 goto end_unlock;
620 }
621
622 ret = lttng_dynamic_buffer_append(&payload.buffer, &cmd_header, sizeof(cmd_header));
623 if (ret) {
624 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
625 goto end_unlock;
626 }
627
628 ret = lttng_condition_serialize(condition, &payload);
629 if (ret) {
630 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
631 goto end_unlock;
632 }
633
634 /* Update payload length. */
635 ((struct lttng_notification_channel_message *) payload.buffer.data)->size =
636 (uint32_t) (payload.buffer.size - sizeof(cmd_header));
637
638 {
639 struct lttng_payload_view pv = lttng_payload_view_from_payload(&payload, 0, -1);
640 const int fd_count = lttng_payload_view_get_fd_handle_count(&pv);
641
642 /* Update fd count. */
643 ((struct lttng_notification_channel_message *) payload.buffer.data)->fds =
644 (uint32_t) fd_count;
645
646 ret = lttcomm_send_unix_sock(socket, pv.buffer.data, pv.buffer.size);
647 if (ret < 0) {
648 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
649 goto end_unlock;
650 }
651
652 /* Pass fds if present. */
653 if (fd_count > 0) {
654 ret = lttcomm_send_payload_view_fds_unix_sock(socket, &pv);
655 if (ret < 0) {
656 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
657 goto end_unlock;
658 }
659 }
660 }
661
662 ret = receive_command_reply(channel, &status);
663 if (ret < 0) {
664 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
665 goto end_unlock;
666 }
667 end_unlock:
668 pthread_mutex_unlock(&channel->lock);
669 end:
670 lttng_payload_reset(&payload);
671 return status;
672 }
673
674 enum lttng_notification_channel_status
675 lttng_notification_channel_subscribe(struct lttng_notification_channel *channel,
676 const struct lttng_condition *condition)
677 {
678 return send_condition_command(
679 channel, LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE, condition);
680 }
681
682 enum lttng_notification_channel_status
683 lttng_notification_channel_unsubscribe(struct lttng_notification_channel *channel,
684 const struct lttng_condition *condition)
685 {
686 return send_condition_command(
687 channel, LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE, condition);
688 }
689
690 void lttng_notification_channel_destroy(struct lttng_notification_channel *channel)
691 {
692 if (!channel) {
693 return;
694 }
695
696 if (channel->socket >= 0) {
697 (void) lttcomm_close_unix_sock(channel->socket);
698 }
699 pthread_mutex_destroy(&channel->lock);
700 lttng_payload_reset(&channel->reception_payload);
701 free(channel);
702 }
This page took 0.097981 seconds and 4 git commands to generate.