Fix: notification: assert on len > 0 for dropped notification message
[lttng-tools.git] / src / lib / lttng-ctl / channel.c
1 /*
2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: LGPL-2.1-only
5 *
6 */
7
8 #include <lttng/notification/notification-internal.h>
9 #include <lttng/notification/channel-internal.h>
10 #include <lttng/condition/condition-internal.h>
11 #include <lttng/endpoint.h>
12 #include <common/defaults.h>
13 #include <common/error.h>
14 #include <common/dynamic-buffer.h>
15 #include <common/utils.h>
16 #include <common/defaults.h>
17 #include <assert.h>
18 #include "lttng-ctl-helper.h"
19 #include <common/compat/poll.h>
20
/*
 * Forward declaration: handshake() is defined further down in this file but
 * is called by lttng_notification_channel_create().
 */
static int handshake(struct lttng_notification_channel *channel);
23
24 /*
25 * Populates the reception buffer with the next complete message.
26 * The caller must acquire the channel's lock.
27 */
28 static
29 int receive_message(struct lttng_notification_channel *channel)
30 {
31 ssize_t ret;
32 struct lttng_notification_channel_message msg;
33
34 if (lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0)) {
35 ret = -1;
36 goto end;
37 }
38
39 ret = lttcomm_recv_unix_sock(channel->socket, &msg, sizeof(msg));
40 if (ret <= 0) {
41 ret = -1;
42 goto error;
43 }
44
45 if (msg.size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
46 ret = -1;
47 goto error;
48 }
49
50 /* Add message header at buffer's start. */
51 ret = lttng_dynamic_buffer_append(&channel->reception_buffer, &msg,
52 sizeof(msg));
53 if (ret) {
54 goto error;
55 }
56
57 if (msg.size == 0) {
58 goto skip_payload;
59 }
60
61 /* Reserve space for the payload. */
62 ret = lttng_dynamic_buffer_set_size(&channel->reception_buffer,
63 channel->reception_buffer.size + msg.size);
64 if (ret) {
65 goto error;
66 }
67
68 /* Receive message payload. */
69 ret = lttcomm_recv_unix_sock(channel->socket,
70 channel->reception_buffer.data + sizeof(msg), msg.size);
71 if (ret < (ssize_t) msg.size) {
72 ret = -1;
73 goto error;
74 }
75
76 skip_payload:
77 ret = 0;
78 end:
79 return ret;
80 error:
81 if (lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0)) {
82 ret = -1;
83 }
84 goto end;
85 }
86
87 static
88 enum lttng_notification_channel_message_type get_current_message_type(
89 struct lttng_notification_channel *channel)
90 {
91 struct lttng_notification_channel_message *msg;
92
93 assert(channel->reception_buffer.size >= sizeof(*msg));
94
95 msg = (struct lttng_notification_channel_message *)
96 channel->reception_buffer.data;
97 return (enum lttng_notification_channel_message_type) msg->type;
98 }
99
100 static
101 struct lttng_notification *create_notification_from_current_message(
102 struct lttng_notification_channel *channel)
103 {
104 ssize_t ret;
105 struct lttng_notification *notification = NULL;
106 struct lttng_buffer_view view;
107
108 if (channel->reception_buffer.size <=
109 sizeof(struct lttng_notification_channel_message)) {
110 goto end;
111 }
112
113 view = lttng_buffer_view_from_dynamic_buffer(&channel->reception_buffer,
114 sizeof(struct lttng_notification_channel_message), -1);
115
116 ret = lttng_notification_create_from_buffer(&view, &notification);
117 if (ret != channel->reception_buffer.size -
118 sizeof(struct lttng_notification_channel_message)) {
119 lttng_notification_destroy(notification);
120 notification = NULL;
121 goto end;
122 }
123 end:
124 return notification;
125 }
126
127 struct lttng_notification_channel *lttng_notification_channel_create(
128 struct lttng_endpoint *endpoint)
129 {
130 int fd, ret;
131 bool is_in_tracing_group = false, is_root = false;
132 char *sock_path = NULL;
133 struct lttng_notification_channel *channel = NULL;
134
135 if (!endpoint ||
136 endpoint != lttng_session_daemon_notification_endpoint) {
137 goto end;
138 }
139
140 sock_path = zmalloc(LTTNG_PATH_MAX);
141 if (!sock_path) {
142 goto end;
143 }
144
145 channel = zmalloc(sizeof(struct lttng_notification_channel));
146 if (!channel) {
147 goto end;
148 }
149 channel->socket = -1;
150 pthread_mutex_init(&channel->lock, NULL);
151 lttng_dynamic_buffer_init(&channel->reception_buffer);
152 CDS_INIT_LIST_HEAD(&channel->pending_notifications.list);
153
154 is_root = (getuid() == 0);
155 if (!is_root) {
156 is_in_tracing_group = lttng_check_tracing_group();
157 }
158
159 if (is_root || is_in_tracing_group) {
160 ret = lttng_strncpy(sock_path,
161 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK,
162 LTTNG_PATH_MAX);
163 if (ret) {
164 ret = -LTTNG_ERR_INVALID;
165 goto error;
166 }
167
168 ret = lttcomm_connect_unix_sock(sock_path);
169 if (ret >= 0) {
170 fd = ret;
171 goto set_fd;
172 }
173 }
174
175 /* Fallback to local session daemon. */
176 ret = snprintf(sock_path, LTTNG_PATH_MAX,
177 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK,
178 utils_get_home_dir());
179 if (ret < 0 || ret >= LTTNG_PATH_MAX) {
180 goto error;
181 }
182
183 ret = lttcomm_connect_unix_sock(sock_path);
184 if (ret < 0) {
185 goto error;
186 }
187 fd = ret;
188
189 set_fd:
190 channel->socket = fd;
191
192 ret = handshake(channel);
193 if (ret) {
194 goto error;
195 }
196 end:
197 free(sock_path);
198 return channel;
199 error:
200 lttng_notification_channel_destroy(channel);
201 channel = NULL;
202 goto end;
203 }
204
205 enum lttng_notification_channel_status
206 lttng_notification_channel_get_next_notification(
207 struct lttng_notification_channel *channel,
208 struct lttng_notification **_notification)
209 {
210 int ret;
211 struct lttng_notification *notification = NULL;
212 enum lttng_notification_channel_status status =
213 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
214 struct lttng_poll_event events;
215
216 if (!channel || !_notification) {
217 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
218 goto end;
219 }
220
221 pthread_mutex_lock(&channel->lock);
222
223 if (channel->pending_notifications.count) {
224 struct pending_notification *pending_notification;
225
226 assert(!cds_list_empty(&channel->pending_notifications.list));
227
228 /* Deliver one of the pending notifications. */
229 pending_notification = cds_list_first_entry(
230 &channel->pending_notifications.list,
231 struct pending_notification,
232 node);
233 notification = pending_notification->notification;
234 if (!notification) {
235 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
236 }
237 cds_list_del(&pending_notification->node);
238 channel->pending_notifications.count--;
239 free(pending_notification);
240 goto end_unlock;
241 }
242
243 /*
244 * Block on interruptible epoll/poll() instead of the message reception
245 * itself as the recvmsg() wrappers always restart on EINTR. We choose
246 * to wait using interruptible epoll/poll() in order to:
247 * 1) Return if a signal occurs,
248 * 2) Not deal with partially received messages.
249 *
250 * The drawback to this approach is that we assume that messages
251 * are complete/well formed. If a message is shorter than its
252 * announced length, receive_message() will block on recvmsg()
253 * and never return (even if a signal is received).
254 */
255 ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
256 if (ret < 0) {
257 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
258 goto end_unlock;
259 }
260 ret = lttng_poll_add(&events, channel->socket, LPOLLIN | LPOLLERR);
261 if (ret < 0) {
262 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
263 goto end_clean_poll;
264 }
265 ret = lttng_poll_wait_interruptible(&events, -1);
266 if (ret <= 0) {
267 status = (ret == -1 && errno == EINTR) ?
268 LTTNG_NOTIFICATION_CHANNEL_STATUS_INTERRUPTED :
269 LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
270 goto end_clean_poll;
271 }
272
273 ret = receive_message(channel);
274 if (ret) {
275 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
276 goto end_clean_poll;
277 }
278
279 switch (get_current_message_type(channel)) {
280 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
281 notification = create_notification_from_current_message(
282 channel);
283 if (!notification) {
284 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
285 goto end_clean_poll;
286 }
287 break;
288 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
289 /* No payload to consume. */
290 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
291 break;
292 default:
293 /* Protocol error. */
294 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
295 goto end_clean_poll;
296 }
297
298 end_clean_poll:
299 lttng_poll_clean(&events);
300 end_unlock:
301 pthread_mutex_unlock(&channel->lock);
302 *_notification = notification;
303 end:
304 return status;
305 }
306
307 static
308 int enqueue_dropped_notification(
309 struct lttng_notification_channel *channel)
310 {
311 int ret = 0;
312 struct pending_notification *pending_notification;
313 struct cds_list_head *last_element =
314 channel->pending_notifications.list.prev;
315
316 pending_notification = caa_container_of(last_element,
317 struct pending_notification, node);
318 if (!pending_notification->notification) {
319 /*
320 * The last enqueued notification indicates dropped
321 * notifications; there is nothing to do as we group
322 * dropped notifications together.
323 */
324 goto end;
325 }
326
327 if (channel->pending_notifications.count >=
328 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT &&
329 pending_notification->notification) {
330 /*
331 * Discard the last enqueued notification to indicate
332 * that notifications were dropped at this point.
333 */
334 lttng_notification_destroy(
335 pending_notification->notification);
336 pending_notification->notification = NULL;
337 goto end;
338 }
339
340 pending_notification = zmalloc(sizeof(*pending_notification));
341 if (!pending_notification) {
342 ret = -1;
343 goto end;
344 }
345 CDS_INIT_LIST_HEAD(&pending_notification->node);
346 cds_list_add(&pending_notification->node,
347 &channel->pending_notifications.list);
348 channel->pending_notifications.count++;
349 end:
350 return ret;
351 }
352
353 static
354 int enqueue_notification_from_current_message(
355 struct lttng_notification_channel *channel)
356 {
357 int ret = 0;
358 struct lttng_notification *notification;
359 struct pending_notification *pending_notification;
360
361 if (channel->pending_notifications.count >=
362 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT) {
363 /* Drop the notification. */
364 ret = enqueue_dropped_notification(channel);
365 goto end;
366 }
367
368 pending_notification = zmalloc(sizeof(*pending_notification));
369 if (!pending_notification) {
370 ret = -1;
371 goto error;
372 }
373 CDS_INIT_LIST_HEAD(&pending_notification->node);
374
375 notification = create_notification_from_current_message(channel);
376 if (!notification) {
377 ret = -1;
378 goto error;
379 }
380
381 pending_notification->notification = notification;
382 cds_list_add(&pending_notification->node,
383 &channel->pending_notifications.list);
384 channel->pending_notifications.count++;
385 end:
386 return ret;
387 error:
388 free(pending_notification);
389 goto end;
390 }
391
392 enum lttng_notification_channel_status
393 lttng_notification_channel_has_pending_notification(
394 struct lttng_notification_channel *channel,
395 bool *_notification_pending)
396 {
397 int ret;
398 enum lttng_notification_channel_status status =
399 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
400 struct lttng_poll_event events;
401
402 if (!channel || !_notification_pending) {
403 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
404 goto end;
405 }
406
407 pthread_mutex_lock(&channel->lock);
408
409 if (channel->pending_notifications.count) {
410 *_notification_pending = true;
411 goto end_unlock;
412 }
413
414 if (channel->socket < 0) {
415 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED;
416 goto end_unlock;
417 }
418
419 /*
420 * Check, without blocking, if data is available on the channel's
421 * socket. If there is data available, it is safe to read (blocking)
422 * on the socket for a message from the session daemon.
423 *
424 * Since all commands wait for the session daemon's reply before
425 * releasing the channel's lock, the protocol only allows for
426 * notifications and "notification dropped" messages to come
427 * through. If we receive a different message type, it is
428 * considered a protocol error.
429 *
430 * Note that this function is not guaranteed not to block. This
431 * will block until our peer (the session daemon) has sent a complete
432 * message if we see data available on the socket. If the peer does
433 * not respect the protocol, this may block indefinitely.
434 */
435 ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
436 if (ret < 0) {
437 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
438 goto end_unlock;
439 }
440 ret = lttng_poll_add(&events, channel->socket, LPOLLIN | LPOLLERR);
441 if (ret < 0) {
442 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
443 goto end_clean_poll;
444 }
445 /* timeout = 0: return immediately. */
446 ret = lttng_poll_wait_interruptible(&events, 0);
447 if (ret == 0) {
448 /* No data available. */
449 *_notification_pending = false;
450 goto end_clean_poll;
451 } else if (ret < 0) {
452 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
453 goto end_clean_poll;
454 }
455
456 /* Data available on socket. */
457 ret = receive_message(channel);
458 if (ret) {
459 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
460 goto end_clean_poll;
461 }
462
463 switch (get_current_message_type(channel)) {
464 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
465 ret = enqueue_notification_from_current_message(channel);
466 if (ret) {
467 goto end_clean_poll;
468 }
469 *_notification_pending = true;
470 break;
471 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
472 ret = enqueue_dropped_notification(channel);
473 if (ret) {
474 goto end_clean_poll;
475 }
476 *_notification_pending = true;
477 break;
478 default:
479 /* Protocol error. */
480 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
481 goto end_clean_poll;
482 }
483
484 end_clean_poll:
485 lttng_poll_clean(&events);
486 end_unlock:
487 pthread_mutex_unlock(&channel->lock);
488 end:
489 return status;
490 }
491
492 static
493 int receive_command_reply(struct lttng_notification_channel *channel,
494 enum lttng_notification_channel_status *status)
495 {
496 int ret;
497 struct lttng_notification_channel_command_reply *reply;
498
499 while (true) {
500 enum lttng_notification_channel_message_type msg_type;
501
502 ret = receive_message(channel);
503 if (ret) {
504 goto end;
505 }
506
507 msg_type = get_current_message_type(channel);
508 switch (msg_type) {
509 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY:
510 goto exit_loop;
511 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
512 ret = enqueue_notification_from_current_message(
513 channel);
514 if (ret) {
515 goto end;
516 }
517 break;
518 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
519 ret = enqueue_dropped_notification(channel);
520 if (ret) {
521 goto end;
522 }
523 break;
524 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
525 {
526 struct lttng_notification_channel_command_handshake *handshake;
527
528 handshake = (struct lttng_notification_channel_command_handshake *)
529 (channel->reception_buffer.data +
530 sizeof(struct lttng_notification_channel_message));
531 channel->version.major = handshake->major;
532 channel->version.minor = handshake->minor;
533 channel->version.set = true;
534 break;
535 }
536 default:
537 ret = -1;
538 goto end;
539 }
540 }
541
542 exit_loop:
543 if (channel->reception_buffer.size <
544 (sizeof(struct lttng_notification_channel_message) +
545 sizeof(*reply))) {
546 /* Invalid message received. */
547 ret = -1;
548 goto end;
549 }
550
551 reply = (struct lttng_notification_channel_command_reply *)
552 (channel->reception_buffer.data +
553 sizeof(struct lttng_notification_channel_message));
554 *status = (enum lttng_notification_channel_status) reply->status;
555 end:
556 return ret;
557 }
558
559 static
560 int handshake(struct lttng_notification_channel *channel)
561 {
562 ssize_t ret;
563 enum lttng_notification_channel_status status =
564 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
565 struct lttng_notification_channel_command_handshake handshake = {
566 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
567 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
568 };
569 struct lttng_notification_channel_message msg_header = {
570 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
571 .size = sizeof(handshake),
572 };
573 char send_buffer[sizeof(msg_header) + sizeof(handshake)];
574
575 memcpy(send_buffer, &msg_header, sizeof(msg_header));
576 memcpy(send_buffer + sizeof(msg_header), &handshake, sizeof(handshake));
577
578 pthread_mutex_lock(&channel->lock);
579
580 ret = lttcomm_send_creds_unix_sock(channel->socket, send_buffer,
581 sizeof(send_buffer));
582 if (ret < 0) {
583 goto end_unlock;
584 }
585
586 /* Receive handshake info from the sessiond. */
587 ret = receive_command_reply(channel, &status);
588 if (ret < 0) {
589 goto end_unlock;
590 }
591
592 if (!channel->version.set) {
593 ret = -1;
594 goto end_unlock;
595 }
596
597 if (channel->version.major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
598 ret = -1;
599 goto end_unlock;
600 }
601
602 end_unlock:
603 pthread_mutex_unlock(&channel->lock);
604 return ret;
605 }
606
607 static
608 enum lttng_notification_channel_status send_condition_command(
609 struct lttng_notification_channel *channel,
610 enum lttng_notification_channel_message_type type,
611 const struct lttng_condition *condition)
612 {
613 int socket;
614 ssize_t ret;
615 enum lttng_notification_channel_status status =
616 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
617 struct lttng_dynamic_buffer buffer;
618 struct lttng_notification_channel_message cmd_header = {
619 .type = (int8_t) type,
620 };
621
622 lttng_dynamic_buffer_init(&buffer);
623
624 if (!channel) {
625 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
626 goto end;
627 }
628
629 assert(type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE ||
630 type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE);
631
632 pthread_mutex_lock(&channel->lock);
633 socket = channel->socket;
634 if (!lttng_condition_validate(condition)) {
635 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
636 goto end_unlock;
637 }
638
639 ret = lttng_dynamic_buffer_append(&buffer, &cmd_header,
640 sizeof(cmd_header));
641 if (ret) {
642 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
643 goto end_unlock;
644 }
645
646 ret = lttng_condition_serialize(condition, &buffer);
647 if (ret) {
648 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
649 goto end_unlock;
650 }
651
652 /* Update payload length. */
653 ((struct lttng_notification_channel_message *) buffer.data)->size =
654 (uint32_t) (buffer.size - sizeof(cmd_header));
655
656 ret = lttcomm_send_unix_sock(socket, buffer.data, buffer.size);
657 if (ret < 0) {
658 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
659 goto end_unlock;
660 }
661
662 ret = receive_command_reply(channel, &status);
663 if (ret < 0) {
664 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
665 goto end_unlock;
666 }
667 end_unlock:
668 pthread_mutex_unlock(&channel->lock);
669 end:
670 lttng_dynamic_buffer_reset(&buffer);
671 return status;
672 }
673
674 enum lttng_notification_channel_status lttng_notification_channel_subscribe(
675 struct lttng_notification_channel *channel,
676 const struct lttng_condition *condition)
677 {
678 return send_condition_command(channel,
679 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE,
680 condition);
681 }
682
683 enum lttng_notification_channel_status lttng_notification_channel_unsubscribe(
684 struct lttng_notification_channel *channel,
685 const struct lttng_condition *condition)
686 {
687 return send_condition_command(channel,
688 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE,
689 condition);
690 }
691
692 void lttng_notification_channel_destroy(
693 struct lttng_notification_channel *channel)
694 {
695 if (!channel) {
696 return;
697 }
698
699 if (channel->socket >= 0) {
700 (void) lttcomm_close_unix_sock(channel->socket);
701 }
702 pthread_mutex_destroy(&channel->lock);
703 lttng_dynamic_buffer_reset(&channel->reception_buffer);
704 free(channel);
705 }
This page took 0.045487 seconds and 4 git commands to generate.