Fix: notification: assert on len > 0 for dropped notification message
[lttng-tools.git] / src / lib / lttng-ctl / channel.c
1 /*
2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: LGPL-2.1-only
5 *
6 */
7
8 #include <lttng/notification/notification-internal.h>
9 #include <lttng/notification/channel-internal.h>
10 #include <lttng/condition/condition-internal.h>
11 #include <lttng/endpoint.h>
12 #include <common/defaults.h>
13 #include <common/error.h>
14 #include <common/dynamic-buffer.h>
15 #include <common/utils.h>
16 #include <common/defaults.h>
17 #include <common/payload.h>
18 #include <common/payload-view.h>
19 #include <common/unix.h>
20 #include <assert.h>
21 #include "lttng-ctl-helper.h"
22 #include <common/compat/poll.h>
23
/*
 * Forward declaration: handshake() is defined further down in this file but
 * is needed by lttng_notification_channel_create().
 */
static int handshake(struct lttng_notification_channel *channel);
26
27 /*
28 * Populates the reception buffer with the next complete message.
29 * The caller must acquire the channel's lock.
30 */
31 static
32 int receive_message(struct lttng_notification_channel *channel)
33 {
34 ssize_t ret;
35 struct lttng_notification_channel_message msg;
36
37 lttng_payload_clear(&channel->reception_payload);
38
39 ret = lttcomm_recv_unix_sock(channel->socket, &msg, sizeof(msg));
40 if (ret <= 0) {
41 ret = -1;
42 goto error;
43 }
44
45 if (msg.size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
46 ret = -1;
47 goto error;
48 }
49
50 /* Add message header at buffer's start. */
51 ret = lttng_dynamic_buffer_append(&channel->reception_payload.buffer, &msg,
52 sizeof(msg));
53 if (ret) {
54 goto error;
55 }
56
57 if (msg.size == 0) {
58 goto skip_payload;
59 }
60
61 /* Reserve space for the payload. */
62 ret = lttng_dynamic_buffer_set_size(&channel->reception_payload.buffer,
63 channel->reception_payload.buffer.size + msg.size);
64 if (ret) {
65 goto error;
66 }
67
68 /* Receive message payload. */
69 ret = lttcomm_recv_unix_sock(channel->socket,
70 channel->reception_payload.buffer.data + sizeof(msg), msg.size);
71 if (ret < (ssize_t) msg.size) {
72 ret = -1;
73 goto error;
74 }
75
76 skip_payload:
77 /* Receive message fds. */
78 if (msg.fds != 0) {
79 ret = lttcomm_recv_payload_fds_unix_sock(channel->socket,
80 msg.fds, &channel->reception_payload);
81 if (ret < sizeof(int) * msg.fds) {
82 ret = -1;
83 goto error;
84 }
85 }
86 ret = 0;
87 end:
88 return ret;
89 error:
90 lttng_payload_clear(&channel->reception_payload);
91 goto end;
92 }
93
94 static
95 enum lttng_notification_channel_message_type get_current_message_type(
96 struct lttng_notification_channel *channel)
97 {
98 struct lttng_notification_channel_message *msg;
99
100 assert(channel->reception_payload.buffer.size >= sizeof(*msg));
101
102 msg = (struct lttng_notification_channel_message *)
103 channel->reception_payload.buffer.data;
104 return (enum lttng_notification_channel_message_type) msg->type;
105 }
106
107 static
108 struct lttng_notification *create_notification_from_current_message(
109 struct lttng_notification_channel *channel)
110 {
111 ssize_t ret;
112 struct lttng_notification *notification = NULL;
113
114 if (channel->reception_payload.buffer.size <=
115 sizeof(struct lttng_notification_channel_message)) {
116 goto end;
117 }
118
119 {
120 struct lttng_payload_view view = lttng_payload_view_from_payload(
121 &channel->reception_payload,
122 sizeof(struct lttng_notification_channel_message),
123 -1);
124
125 ret = lttng_notification_create_from_payload(
126 &view, &notification);
127 }
128
129 if (ret != channel->reception_payload.buffer.size -
130 sizeof(struct lttng_notification_channel_message)) {
131 lttng_notification_destroy(notification);
132 notification = NULL;
133 goto end;
134 }
135 end:
136 return notification;
137 }
138
139 struct lttng_notification_channel *lttng_notification_channel_create(
140 struct lttng_endpoint *endpoint)
141 {
142 int fd, ret;
143 bool is_in_tracing_group = false, is_root = false;
144 char *sock_path = NULL;
145 struct lttng_notification_channel *channel = NULL;
146
147 if (!endpoint ||
148 endpoint != lttng_session_daemon_notification_endpoint) {
149 goto end;
150 }
151
152 sock_path = zmalloc(LTTNG_PATH_MAX);
153 if (!sock_path) {
154 goto end;
155 }
156
157 channel = zmalloc(sizeof(struct lttng_notification_channel));
158 if (!channel) {
159 goto end;
160 }
161 channel->socket = -1;
162 pthread_mutex_init(&channel->lock, NULL);
163 lttng_payload_init(&channel->reception_payload);
164 CDS_INIT_LIST_HEAD(&channel->pending_notifications.list);
165
166 is_root = (getuid() == 0);
167 if (!is_root) {
168 is_in_tracing_group = lttng_check_tracing_group();
169 }
170
171 if (is_root || is_in_tracing_group) {
172 ret = lttng_strncpy(sock_path,
173 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK,
174 LTTNG_PATH_MAX);
175 if (ret) {
176 ret = -LTTNG_ERR_INVALID;
177 goto error;
178 }
179
180 ret = lttcomm_connect_unix_sock(sock_path);
181 if (ret >= 0) {
182 fd = ret;
183 goto set_fd;
184 }
185 }
186
187 /* Fallback to local session daemon. */
188 ret = snprintf(sock_path, LTTNG_PATH_MAX,
189 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK,
190 utils_get_home_dir());
191 if (ret < 0 || ret >= LTTNG_PATH_MAX) {
192 goto error;
193 }
194
195 ret = lttcomm_connect_unix_sock(sock_path);
196 if (ret < 0) {
197 goto error;
198 }
199 fd = ret;
200
201 set_fd:
202 channel->socket = fd;
203
204 ret = handshake(channel);
205 if (ret) {
206 goto error;
207 }
208 end:
209 free(sock_path);
210 return channel;
211 error:
212 lttng_notification_channel_destroy(channel);
213 channel = NULL;
214 goto end;
215 }
216
217 enum lttng_notification_channel_status
218 lttng_notification_channel_get_next_notification(
219 struct lttng_notification_channel *channel,
220 struct lttng_notification **_notification)
221 {
222 int ret;
223 struct lttng_notification *notification = NULL;
224 enum lttng_notification_channel_status status =
225 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
226 struct lttng_poll_event events;
227
228 if (!channel || !_notification) {
229 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
230 goto end;
231 }
232
233 pthread_mutex_lock(&channel->lock);
234
235 if (channel->pending_notifications.count) {
236 struct pending_notification *pending_notification;
237
238 assert(!cds_list_empty(&channel->pending_notifications.list));
239
240 /* Deliver one of the pending notifications. */
241 pending_notification = cds_list_first_entry(
242 &channel->pending_notifications.list,
243 struct pending_notification,
244 node);
245 notification = pending_notification->notification;
246 if (!notification) {
247 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
248 }
249 cds_list_del(&pending_notification->node);
250 channel->pending_notifications.count--;
251 free(pending_notification);
252 goto end_unlock;
253 }
254
255 /*
256 * Block on interruptible epoll/poll() instead of the message reception
257 * itself as the recvmsg() wrappers always restart on EINTR. We choose
258 * to wait using interruptible epoll/poll() in order to:
259 * 1) Return if a signal occurs,
260 * 2) Not deal with partially received messages.
261 *
262 * The drawback to this approach is that we assume that messages
263 * are complete/well formed. If a message is shorter than its
264 * announced length, receive_message() will block on recvmsg()
265 * and never return (even if a signal is received).
266 */
267 ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
268 if (ret < 0) {
269 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
270 goto end_unlock;
271 }
272 ret = lttng_poll_add(&events, channel->socket, LPOLLIN | LPOLLERR);
273 if (ret < 0) {
274 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
275 goto end_clean_poll;
276 }
277 ret = lttng_poll_wait_interruptible(&events, -1);
278 if (ret <= 0) {
279 status = (ret == -1 && errno == EINTR) ?
280 LTTNG_NOTIFICATION_CHANNEL_STATUS_INTERRUPTED :
281 LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
282 goto end_clean_poll;
283 }
284
285 ret = receive_message(channel);
286 if (ret) {
287 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
288 goto end_clean_poll;
289 }
290
291 switch (get_current_message_type(channel)) {
292 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
293 notification = create_notification_from_current_message(
294 channel);
295 if (!notification) {
296 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
297 goto end_clean_poll;
298 }
299 break;
300 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
301 /* No payload to consume. */
302 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
303 break;
304 default:
305 /* Protocol error. */
306 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
307 goto end_clean_poll;
308 }
309
310 end_clean_poll:
311 lttng_poll_clean(&events);
312 end_unlock:
313 pthread_mutex_unlock(&channel->lock);
314 *_notification = notification;
315 end:
316 return status;
317 }
318
319 static
320 int enqueue_dropped_notification(
321 struct lttng_notification_channel *channel)
322 {
323 int ret = 0;
324 struct pending_notification *pending_notification;
325 struct cds_list_head *last_element =
326 channel->pending_notifications.list.prev;
327
328 pending_notification = caa_container_of(last_element,
329 struct pending_notification, node);
330 if (!pending_notification->notification) {
331 /*
332 * The last enqueued notification indicates dropped
333 * notifications; there is nothing to do as we group
334 * dropped notifications together.
335 */
336 goto end;
337 }
338
339 if (channel->pending_notifications.count >=
340 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT &&
341 pending_notification->notification) {
342 /*
343 * Discard the last enqueued notification to indicate
344 * that notifications were dropped at this point.
345 */
346 lttng_notification_destroy(
347 pending_notification->notification);
348 pending_notification->notification = NULL;
349 goto end;
350 }
351
352 pending_notification = zmalloc(sizeof(*pending_notification));
353 if (!pending_notification) {
354 ret = -1;
355 goto end;
356 }
357 CDS_INIT_LIST_HEAD(&pending_notification->node);
358 cds_list_add(&pending_notification->node,
359 &channel->pending_notifications.list);
360 channel->pending_notifications.count++;
361 end:
362 return ret;
363 }
364
365 static
366 int enqueue_notification_from_current_message(
367 struct lttng_notification_channel *channel)
368 {
369 int ret = 0;
370 struct lttng_notification *notification;
371 struct pending_notification *pending_notification;
372
373 if (channel->pending_notifications.count >=
374 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT) {
375 /* Drop the notification. */
376 ret = enqueue_dropped_notification(channel);
377 goto end;
378 }
379
380 pending_notification = zmalloc(sizeof(*pending_notification));
381 if (!pending_notification) {
382 ret = -1;
383 goto error;
384 }
385 CDS_INIT_LIST_HEAD(&pending_notification->node);
386
387 notification = create_notification_from_current_message(channel);
388 if (!notification) {
389 ret = -1;
390 goto error;
391 }
392
393 pending_notification->notification = notification;
394 cds_list_add(&pending_notification->node,
395 &channel->pending_notifications.list);
396 channel->pending_notifications.count++;
397 end:
398 return ret;
399 error:
400 free(pending_notification);
401 goto end;
402 }
403
404 enum lttng_notification_channel_status
405 lttng_notification_channel_has_pending_notification(
406 struct lttng_notification_channel *channel,
407 bool *_notification_pending)
408 {
409 int ret;
410 enum lttng_notification_channel_status status =
411 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
412 struct lttng_poll_event events;
413
414 if (!channel || !_notification_pending) {
415 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
416 goto end;
417 }
418
419 pthread_mutex_lock(&channel->lock);
420
421 if (channel->pending_notifications.count) {
422 *_notification_pending = true;
423 goto end_unlock;
424 }
425
426 if (channel->socket < 0) {
427 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED;
428 goto end_unlock;
429 }
430
431 /*
432 * Check, without blocking, if data is available on the channel's
433 * socket. If there is data available, it is safe to read (blocking)
434 * on the socket for a message from the session daemon.
435 *
436 * Since all commands wait for the session daemon's reply before
437 * releasing the channel's lock, the protocol only allows for
438 * notifications and "notification dropped" messages to come
439 * through. If we receive a different message type, it is
440 * considered a protocol error.
441 *
442 * Note that this function is not guaranteed not to block. This
443 * will block until our peer (the session daemon) has sent a complete
444 * message if we see data available on the socket. If the peer does
445 * not respect the protocol, this may block indefinitely.
446 */
447 ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
448 if (ret < 0) {
449 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
450 goto end_unlock;
451 }
452 ret = lttng_poll_add(&events, channel->socket, LPOLLIN | LPOLLERR);
453 if (ret < 0) {
454 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
455 goto end_clean_poll;
456 }
457 /* timeout = 0: return immediately. */
458 ret = lttng_poll_wait_interruptible(&events, 0);
459 if (ret == 0) {
460 /* No data available. */
461 *_notification_pending = false;
462 goto end_clean_poll;
463 } else if (ret < 0) {
464 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
465 goto end_clean_poll;
466 }
467
468 /* Data available on socket. */
469 ret = receive_message(channel);
470 if (ret) {
471 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
472 goto end_clean_poll;
473 }
474
475 switch (get_current_message_type(channel)) {
476 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
477 ret = enqueue_notification_from_current_message(channel);
478 if (ret) {
479 goto end_clean_poll;
480 }
481 *_notification_pending = true;
482 break;
483 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
484 ret = enqueue_dropped_notification(channel);
485 if (ret) {
486 goto end_clean_poll;
487 }
488 *_notification_pending = true;
489 break;
490 default:
491 /* Protocol error. */
492 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
493 goto end_clean_poll;
494 }
495
496 end_clean_poll:
497 lttng_poll_clean(&events);
498 end_unlock:
499 pthread_mutex_unlock(&channel->lock);
500 end:
501 return status;
502 }
503
504 static
505 int receive_command_reply(struct lttng_notification_channel *channel,
506 enum lttng_notification_channel_status *status)
507 {
508 int ret;
509 struct lttng_notification_channel_command_reply *reply;
510
511 while (true) {
512 enum lttng_notification_channel_message_type msg_type;
513
514 ret = receive_message(channel);
515 if (ret) {
516 goto end;
517 }
518
519 msg_type = get_current_message_type(channel);
520 switch (msg_type) {
521 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY:
522 goto exit_loop;
523 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
524 ret = enqueue_notification_from_current_message(
525 channel);
526 if (ret) {
527 goto end;
528 }
529 break;
530 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
531 ret = enqueue_dropped_notification(channel);
532 if (ret) {
533 goto end;
534 }
535 break;
536 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
537 {
538 struct lttng_notification_channel_command_handshake *handshake;
539
540 handshake = (struct lttng_notification_channel_command_handshake *)
541 (channel->reception_payload.buffer.data +
542 sizeof(struct lttng_notification_channel_message));
543 channel->version.major = handshake->major;
544 channel->version.minor = handshake->minor;
545 channel->version.set = true;
546 break;
547 }
548 default:
549 ret = -1;
550 goto end;
551 }
552 }
553
554 exit_loop:
555 if (channel->reception_payload.buffer.size <
556 (sizeof(struct lttng_notification_channel_message) +
557 sizeof(*reply))) {
558 /* Invalid message received. */
559 ret = -1;
560 goto end;
561 }
562
563 reply = (struct lttng_notification_channel_command_reply *)
564 (channel->reception_payload.buffer.data +
565 sizeof(struct lttng_notification_channel_message));
566 *status = (enum lttng_notification_channel_status) reply->status;
567 end:
568 return ret;
569 }
570
571 static
572 int handshake(struct lttng_notification_channel *channel)
573 {
574 ssize_t ret;
575 enum lttng_notification_channel_status status =
576 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
577 struct lttng_notification_channel_command_handshake handshake = {
578 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
579 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
580 };
581 struct lttng_notification_channel_message msg_header = {
582 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
583 .size = sizeof(handshake),
584 };
585 char send_buffer[sizeof(msg_header) + sizeof(handshake)];
586
587 memcpy(send_buffer, &msg_header, sizeof(msg_header));
588 memcpy(send_buffer + sizeof(msg_header), &handshake, sizeof(handshake));
589
590 pthread_mutex_lock(&channel->lock);
591
592 ret = lttcomm_send_creds_unix_sock(channel->socket, send_buffer,
593 sizeof(send_buffer));
594 if (ret < 0) {
595 goto end_unlock;
596 }
597
598 /* Receive handshake info from the sessiond. */
599 ret = receive_command_reply(channel, &status);
600 if (ret < 0) {
601 goto end_unlock;
602 }
603
604 if (!channel->version.set) {
605 ret = -1;
606 goto end_unlock;
607 }
608
609 if (channel->version.major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
610 ret = -1;
611 goto end_unlock;
612 }
613
614 end_unlock:
615 pthread_mutex_unlock(&channel->lock);
616 return ret;
617 }
618
619 static
620 enum lttng_notification_channel_status send_condition_command(
621 struct lttng_notification_channel *channel,
622 enum lttng_notification_channel_message_type type,
623 const struct lttng_condition *condition)
624 {
625 int socket;
626 ssize_t ret;
627 enum lttng_notification_channel_status status =
628 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
629 struct lttng_payload payload;
630 struct lttng_notification_channel_message cmd_header = {
631 .type = (int8_t) type,
632 };
633
634 lttng_payload_init(&payload);
635
636 if (!channel) {
637 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
638 goto end;
639 }
640
641 assert(type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE ||
642 type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE);
643
644 pthread_mutex_lock(&channel->lock);
645 socket = channel->socket;
646
647 if (!lttng_condition_validate(condition)) {
648 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
649 goto end_unlock;
650 }
651
652 ret = lttng_dynamic_buffer_append(&payload.buffer, &cmd_header,
653 sizeof(cmd_header));
654 if (ret) {
655 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
656 goto end_unlock;
657 }
658
659 ret = lttng_condition_serialize(condition, &payload);
660 if (ret) {
661 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
662 goto end_unlock;
663 }
664
665 /* Update payload length. */
666 ((struct lttng_notification_channel_message *) payload.buffer.data)->size =
667 (uint32_t) (payload.buffer.size - sizeof(cmd_header));
668
669 {
670 struct lttng_payload_view pv =
671 lttng_payload_view_from_payload(
672 &payload, 0, -1);
673 const int fd_count =
674 lttng_payload_view_get_fd_handle_count(&pv);
675
676 /* Update fd count. */
677 ((struct lttng_notification_channel_message *) payload.buffer.data)->fds =
678 (uint32_t) fd_count;
679
680 ret = lttcomm_send_unix_sock(
681 socket, pv.buffer.data, pv.buffer.size);
682 if (ret < 0) {
683 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
684 goto end_unlock;
685 }
686
687 /* Pass fds if present. */
688 if (fd_count > 0) {
689 ret = lttcomm_send_payload_view_fds_unix_sock(socket,
690 &pv);
691 if (ret < 0) {
692 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
693 goto end_unlock;
694 }
695 }
696 }
697
698 ret = receive_command_reply(channel, &status);
699 if (ret < 0) {
700 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
701 goto end_unlock;
702 }
703 end_unlock:
704 pthread_mutex_unlock(&channel->lock);
705 end:
706 lttng_payload_reset(&payload);
707 return status;
708 }
709
710 enum lttng_notification_channel_status lttng_notification_channel_subscribe(
711 struct lttng_notification_channel *channel,
712 const struct lttng_condition *condition)
713 {
714 return send_condition_command(channel,
715 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE,
716 condition);
717 }
718
719 enum lttng_notification_channel_status lttng_notification_channel_unsubscribe(
720 struct lttng_notification_channel *channel,
721 const struct lttng_condition *condition)
722 {
723 return send_condition_command(channel,
724 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE,
725 condition);
726 }
727
728 void lttng_notification_channel_destroy(
729 struct lttng_notification_channel *channel)
730 {
731 if (!channel) {
732 return;
733 }
734
735 if (channel->socket >= 0) {
736 (void) lttcomm_close_unix_sock(channel->socket);
737 }
738 pthread_mutex_destroy(&channel->lock);
739 lttng_payload_reset(&channel->reception_payload);
740 free(channel);
741 }
This page took 0.044636 seconds and 4 git commands to generate.