16bd298be330e4dfcbc274326c22825669fd5346
[lttng-tools.git] / src / lib / lttng-ctl / channel.c
1 /*
2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: LGPL-2.1-only
5 *
6 */
7
8 #include <lttng/notification/notification-internal.h>
9 #include <lttng/notification/channel-internal.h>
10 #include <lttng/condition/condition-internal.h>
11 #include <lttng/endpoint.h>
12 #include <common/defaults.h>
13 #include <common/error.h>
14 #include <common/dynamic-buffer.h>
15 #include <common/utils.h>
16 #include <common/defaults.h>
17 #include <assert.h>
18 #include "lttng-ctl-helper.h"
19 #include <common/compat/poll.h>
20
21 static
22 int handshake(struct lttng_notification_channel *channel);
23
24 /*
25 * Populates the reception buffer with the next complete message.
26 * The caller must acquire the channel's lock.
27 */
28 static
29 int receive_message(struct lttng_notification_channel *channel)
30 {
31 ssize_t ret;
32 struct lttng_notification_channel_message msg;
33
34 if (lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0)) {
35 ret = -1;
36 goto end;
37 }
38
39 ret = lttcomm_recv_unix_sock(channel->socket, &msg, sizeof(msg));
40 if (ret <= 0) {
41 ret = -1;
42 goto error;
43 }
44
45 if (msg.size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
46 ret = -1;
47 goto error;
48 }
49
50 /* Add message header at buffer's start. */
51 ret = lttng_dynamic_buffer_append(&channel->reception_buffer, &msg,
52 sizeof(msg));
53 if (ret) {
54 goto error;
55 }
56
57 /* Reserve space for the payload. */
58 ret = lttng_dynamic_buffer_set_size(&channel->reception_buffer,
59 channel->reception_buffer.size + msg.size);
60 if (ret) {
61 goto error;
62 }
63
64 /* Receive message payload. */
65 ret = lttcomm_recv_unix_sock(channel->socket,
66 channel->reception_buffer.data + sizeof(msg), msg.size);
67 if (ret < (ssize_t) msg.size) {
68 ret = -1;
69 goto error;
70 }
71 ret = 0;
72 end:
73 return ret;
74 error:
75 if (lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0)) {
76 ret = -1;
77 }
78 goto end;
79 }
80
81 static
82 enum lttng_notification_channel_message_type get_current_message_type(
83 struct lttng_notification_channel *channel)
84 {
85 struct lttng_notification_channel_message *msg;
86
87 assert(channel->reception_buffer.size >= sizeof(*msg));
88
89 msg = (struct lttng_notification_channel_message *)
90 channel->reception_buffer.data;
91 return (enum lttng_notification_channel_message_type) msg->type;
92 }
93
94 static
95 struct lttng_notification *create_notification_from_current_message(
96 struct lttng_notification_channel *channel)
97 {
98 ssize_t ret;
99 struct lttng_notification *notification = NULL;
100
101 if (channel->reception_buffer.size <=
102 sizeof(struct lttng_notification_channel_message)) {
103 goto end;
104 }
105
106 {
107 struct lttng_payload_view view = lttng_payload_view_from_dynamic_buffer(
108 &channel->reception_buffer,
109 sizeof(struct lttng_notification_channel_message),
110 -1);
111
112 ret = lttng_notification_create_from_payload(
113 &view, &notification);
114 }
115
116 if (ret != channel->reception_buffer.size -
117 sizeof(struct lttng_notification_channel_message)) {
118 lttng_notification_destroy(notification);
119 notification = NULL;
120 goto end;
121 }
122 end:
123 return notification;
124 }
125
126 struct lttng_notification_channel *lttng_notification_channel_create(
127 struct lttng_endpoint *endpoint)
128 {
129 int fd, ret;
130 bool is_in_tracing_group = false, is_root = false;
131 char *sock_path = NULL;
132 struct lttng_notification_channel *channel = NULL;
133
134 if (!endpoint ||
135 endpoint != lttng_session_daemon_notification_endpoint) {
136 goto end;
137 }
138
139 sock_path = zmalloc(LTTNG_PATH_MAX);
140 if (!sock_path) {
141 goto end;
142 }
143
144 channel = zmalloc(sizeof(struct lttng_notification_channel));
145 if (!channel) {
146 goto end;
147 }
148 channel->socket = -1;
149 pthread_mutex_init(&channel->lock, NULL);
150 lttng_dynamic_buffer_init(&channel->reception_buffer);
151 CDS_INIT_LIST_HEAD(&channel->pending_notifications.list);
152
153 is_root = (getuid() == 0);
154 if (!is_root) {
155 is_in_tracing_group = lttng_check_tracing_group();
156 }
157
158 if (is_root || is_in_tracing_group) {
159 lttng_ctl_copy_string(sock_path,
160 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK,
161 LTTNG_PATH_MAX);
162 ret = lttcomm_connect_unix_sock(sock_path);
163 if (ret >= 0) {
164 fd = ret;
165 goto set_fd;
166 }
167 }
168
169 /* Fallback to local session daemon. */
170 ret = snprintf(sock_path, LTTNG_PATH_MAX,
171 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK,
172 utils_get_home_dir());
173 if (ret < 0 || ret >= LTTNG_PATH_MAX) {
174 goto error;
175 }
176
177 ret = lttcomm_connect_unix_sock(sock_path);
178 if (ret < 0) {
179 goto error;
180 }
181 fd = ret;
182
183 set_fd:
184 channel->socket = fd;
185
186 ret = handshake(channel);
187 if (ret) {
188 goto error;
189 }
190 end:
191 free(sock_path);
192 return channel;
193 error:
194 lttng_notification_channel_destroy(channel);
195 channel = NULL;
196 goto end;
197 }
198
199 enum lttng_notification_channel_status
200 lttng_notification_channel_get_next_notification(
201 struct lttng_notification_channel *channel,
202 struct lttng_notification **_notification)
203 {
204 int ret;
205 struct lttng_notification *notification = NULL;
206 enum lttng_notification_channel_status status =
207 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
208 struct lttng_poll_event events;
209
210 if (!channel || !_notification) {
211 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
212 goto end;
213 }
214
215 pthread_mutex_lock(&channel->lock);
216
217 if (channel->pending_notifications.count) {
218 struct pending_notification *pending_notification;
219
220 assert(!cds_list_empty(&channel->pending_notifications.list));
221
222 /* Deliver one of the pending notifications. */
223 pending_notification = cds_list_first_entry(
224 &channel->pending_notifications.list,
225 struct pending_notification,
226 node);
227 notification = pending_notification->notification;
228 if (!notification) {
229 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
230 }
231 cds_list_del(&pending_notification->node);
232 channel->pending_notifications.count--;
233 free(pending_notification);
234 goto end_unlock;
235 }
236
237 /*
238 * Block on interruptible epoll/poll() instead of the message reception
239 * itself as the recvmsg() wrappers always restart on EINTR. We choose
240 * to wait using interruptible epoll/poll() in order to:
241 * 1) Return if a signal occurs,
242 * 2) Not deal with partially received messages.
243 *
244 * The drawback to this approach is that we assume that messages
245 * are complete/well formed. If a message is shorter than its
246 * announced length, receive_message() will block on recvmsg()
247 * and never return (even if a signal is received).
248 */
249 ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
250 if (ret < 0) {
251 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
252 goto end_unlock;
253 }
254 ret = lttng_poll_add(&events, channel->socket, LPOLLIN | LPOLLERR);
255 if (ret < 0) {
256 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
257 goto end_clean_poll;
258 }
259 ret = lttng_poll_wait_interruptible(&events, -1);
260 if (ret <= 0) {
261 status = (ret == -1 && errno == EINTR) ?
262 LTTNG_NOTIFICATION_CHANNEL_STATUS_INTERRUPTED :
263 LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
264 goto end_clean_poll;
265 }
266
267 ret = receive_message(channel);
268 if (ret) {
269 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
270 goto end_clean_poll;
271 }
272
273 switch (get_current_message_type(channel)) {
274 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
275 notification = create_notification_from_current_message(
276 channel);
277 if (!notification) {
278 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
279 goto end_clean_poll;
280 }
281 break;
282 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
283 /* No payload to consume. */
284 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
285 break;
286 default:
287 /* Protocol error. */
288 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
289 goto end_clean_poll;
290 }
291
292 end_clean_poll:
293 lttng_poll_clean(&events);
294 end_unlock:
295 pthread_mutex_unlock(&channel->lock);
296 *_notification = notification;
297 end:
298 return status;
299 }
300
301 static
302 int enqueue_dropped_notification(
303 struct lttng_notification_channel *channel)
304 {
305 int ret = 0;
306 struct pending_notification *pending_notification;
307 struct cds_list_head *last_element =
308 channel->pending_notifications.list.prev;
309
310 pending_notification = caa_container_of(last_element,
311 struct pending_notification, node);
312 if (!pending_notification->notification) {
313 /*
314 * The last enqueued notification indicates dropped
315 * notifications; there is nothing to do as we group
316 * dropped notifications together.
317 */
318 goto end;
319 }
320
321 if (channel->pending_notifications.count >=
322 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT &&
323 pending_notification->notification) {
324 /*
325 * Discard the last enqueued notification to indicate
326 * that notifications were dropped at this point.
327 */
328 lttng_notification_destroy(
329 pending_notification->notification);
330 pending_notification->notification = NULL;
331 goto end;
332 }
333
334 pending_notification = zmalloc(sizeof(*pending_notification));
335 if (!pending_notification) {
336 ret = -1;
337 goto end;
338 }
339 CDS_INIT_LIST_HEAD(&pending_notification->node);
340 cds_list_add(&pending_notification->node,
341 &channel->pending_notifications.list);
342 channel->pending_notifications.count++;
343 end:
344 return ret;
345 }
346
347 static
348 int enqueue_notification_from_current_message(
349 struct lttng_notification_channel *channel)
350 {
351 int ret = 0;
352 struct lttng_notification *notification;
353 struct pending_notification *pending_notification;
354
355 if (channel->pending_notifications.count >=
356 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT) {
357 /* Drop the notification. */
358 ret = enqueue_dropped_notification(channel);
359 goto end;
360 }
361
362 pending_notification = zmalloc(sizeof(*pending_notification));
363 if (!pending_notification) {
364 ret = -1;
365 goto error;
366 }
367 CDS_INIT_LIST_HEAD(&pending_notification->node);
368
369 notification = create_notification_from_current_message(channel);
370 if (!notification) {
371 ret = -1;
372 goto error;
373 }
374
375 pending_notification->notification = notification;
376 cds_list_add(&pending_notification->node,
377 &channel->pending_notifications.list);
378 channel->pending_notifications.count++;
379 end:
380 return ret;
381 error:
382 free(pending_notification);
383 goto end;
384 }
385
386 enum lttng_notification_channel_status
387 lttng_notification_channel_has_pending_notification(
388 struct lttng_notification_channel *channel,
389 bool *_notification_pending)
390 {
391 int ret;
392 enum lttng_notification_channel_status status =
393 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
394 struct lttng_poll_event events;
395
396 if (!channel || !_notification_pending) {
397 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
398 goto end;
399 }
400
401 pthread_mutex_lock(&channel->lock);
402
403 if (channel->pending_notifications.count) {
404 *_notification_pending = true;
405 goto end_unlock;
406 }
407
408 if (channel->socket < 0) {
409 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED;
410 goto end_unlock;
411 }
412
413 /*
414 * Check, without blocking, if data is available on the channel's
415 * socket. If there is data available, it is safe to read (blocking)
416 * on the socket for a message from the session daemon.
417 *
418 * Since all commands wait for the session daemon's reply before
419 * releasing the channel's lock, the protocol only allows for
420 * notifications and "notification dropped" messages to come
421 * through. If we receive a different message type, it is
422 * considered a protocol error.
423 *
424 * Note that this function is not guaranteed not to block. This
425 * will block until our peer (the session daemon) has sent a complete
426 * message if we see data available on the socket. If the peer does
427 * not respect the protocol, this may block indefinitely.
428 */
429 ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
430 if (ret < 0) {
431 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
432 goto end_unlock;
433 }
434 ret = lttng_poll_add(&events, channel->socket, LPOLLIN | LPOLLERR);
435 if (ret < 0) {
436 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
437 goto end_clean_poll;
438 }
439 /* timeout = 0: return immediately. */
440 ret = lttng_poll_wait_interruptible(&events, 0);
441 if (ret == 0) {
442 /* No data available. */
443 *_notification_pending = false;
444 goto end_clean_poll;
445 } else if (ret < 0) {
446 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
447 goto end_clean_poll;
448 }
449
450 /* Data available on socket. */
451 ret = receive_message(channel);
452 if (ret) {
453 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
454 goto end_clean_poll;
455 }
456
457 switch (get_current_message_type(channel)) {
458 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
459 ret = enqueue_notification_from_current_message(channel);
460 if (ret) {
461 goto end_clean_poll;
462 }
463 *_notification_pending = true;
464 break;
465 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
466 ret = enqueue_dropped_notification(channel);
467 if (ret) {
468 goto end_clean_poll;
469 }
470 *_notification_pending = true;
471 break;
472 default:
473 /* Protocol error. */
474 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
475 goto end_clean_poll;
476 }
477
478 end_clean_poll:
479 lttng_poll_clean(&events);
480 end_unlock:
481 pthread_mutex_unlock(&channel->lock);
482 end:
483 return status;
484 }
485
486 static
487 int receive_command_reply(struct lttng_notification_channel *channel,
488 enum lttng_notification_channel_status *status)
489 {
490 int ret;
491 struct lttng_notification_channel_command_reply *reply;
492
493 while (true) {
494 enum lttng_notification_channel_message_type msg_type;
495
496 ret = receive_message(channel);
497 if (ret) {
498 goto end;
499 }
500
501 msg_type = get_current_message_type(channel);
502 switch (msg_type) {
503 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY:
504 goto exit_loop;
505 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
506 ret = enqueue_notification_from_current_message(
507 channel);
508 if (ret) {
509 goto end;
510 }
511 break;
512 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
513 ret = enqueue_dropped_notification(channel);
514 if (ret) {
515 goto end;
516 }
517 break;
518 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
519 {
520 struct lttng_notification_channel_command_handshake *handshake;
521
522 handshake = (struct lttng_notification_channel_command_handshake *)
523 (channel->reception_buffer.data +
524 sizeof(struct lttng_notification_channel_message));
525 channel->version.major = handshake->major;
526 channel->version.minor = handshake->minor;
527 channel->version.set = true;
528 break;
529 }
530 default:
531 ret = -1;
532 goto end;
533 }
534 }
535
536 exit_loop:
537 if (channel->reception_buffer.size <
538 (sizeof(struct lttng_notification_channel_message) +
539 sizeof(*reply))) {
540 /* Invalid message received. */
541 ret = -1;
542 goto end;
543 }
544
545 reply = (struct lttng_notification_channel_command_reply *)
546 (channel->reception_buffer.data +
547 sizeof(struct lttng_notification_channel_message));
548 *status = (enum lttng_notification_channel_status) reply->status;
549 end:
550 return ret;
551 }
552
553 static
554 int handshake(struct lttng_notification_channel *channel)
555 {
556 ssize_t ret;
557 enum lttng_notification_channel_status status =
558 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
559 struct lttng_notification_channel_command_handshake handshake = {
560 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
561 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
562 };
563 struct lttng_notification_channel_message msg_header = {
564 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
565 .size = sizeof(handshake),
566 };
567 char send_buffer[sizeof(msg_header) + sizeof(handshake)];
568
569 memcpy(send_buffer, &msg_header, sizeof(msg_header));
570 memcpy(send_buffer + sizeof(msg_header), &handshake, sizeof(handshake));
571
572 pthread_mutex_lock(&channel->lock);
573
574 ret = lttcomm_send_creds_unix_sock(channel->socket, send_buffer,
575 sizeof(send_buffer));
576 if (ret < 0) {
577 goto end_unlock;
578 }
579
580 /* Receive handshake info from the sessiond. */
581 ret = receive_command_reply(channel, &status);
582 if (ret < 0) {
583 goto end_unlock;
584 }
585
586 if (!channel->version.set) {
587 ret = -1;
588 goto end_unlock;
589 }
590
591 if (channel->version.major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
592 ret = -1;
593 goto end_unlock;
594 }
595
596 end_unlock:
597 pthread_mutex_unlock(&channel->lock);
598 return ret;
599 }
600
601 static
602 enum lttng_notification_channel_status send_condition_command(
603 struct lttng_notification_channel *channel,
604 enum lttng_notification_channel_message_type type,
605 const struct lttng_condition *condition)
606 {
607 int socket;
608 ssize_t ret;
609 enum lttng_notification_channel_status status =
610 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
611 struct lttng_payload payload;
612 struct lttng_notification_channel_message cmd_header = {
613 .type = (int8_t) type,
614 };
615
616 lttng_payload_init(&payload);
617
618 if (!channel) {
619 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
620 goto end;
621 }
622
623 assert(type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE ||
624 type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE);
625
626 pthread_mutex_lock(&channel->lock);
627 socket = channel->socket;
628 if (!lttng_condition_validate(condition)) {
629 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
630 goto end_unlock;
631 }
632
633 ret = lttng_dynamic_buffer_append(&payload.buffer, &cmd_header,
634 sizeof(cmd_header));
635 if (ret) {
636 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
637 goto end_unlock;
638 }
639
640 ret = lttng_condition_serialize(condition, &payload);
641 if (ret) {
642 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
643 goto end_unlock;
644 }
645
646 /* Update payload length. */
647 ((struct lttng_notification_channel_message *) payload.buffer.data)->size =
648 (uint32_t) (payload.buffer.size - sizeof(cmd_header));
649
650 ret = lttcomm_send_unix_sock(
651 socket, payload.buffer.data, payload.buffer.size);
652 if (ret < 0) {
653 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
654 goto end_unlock;
655 }
656
657 ret = receive_command_reply(channel, &status);
658 if (ret < 0) {
659 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
660 goto end_unlock;
661 }
662 end_unlock:
663 pthread_mutex_unlock(&channel->lock);
664 end:
665 lttng_payload_reset(&payload);
666 return status;
667 }
668
669 enum lttng_notification_channel_status lttng_notification_channel_subscribe(
670 struct lttng_notification_channel *channel,
671 const struct lttng_condition *condition)
672 {
673 return send_condition_command(channel,
674 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE,
675 condition);
676 }
677
678 enum lttng_notification_channel_status lttng_notification_channel_unsubscribe(
679 struct lttng_notification_channel *channel,
680 const struct lttng_condition *condition)
681 {
682 return send_condition_command(channel,
683 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE,
684 condition);
685 }
686
687 void lttng_notification_channel_destroy(
688 struct lttng_notification_channel *channel)
689 {
690 if (!channel) {
691 return;
692 }
693
694 if (channel->socket >= 0) {
695 (void) lttcomm_close_unix_sock(channel->socket);
696 }
697 pthread_mutex_destroy(&channel->lock);
698 lttng_dynamic_buffer_reset(&channel->reception_buffer);
699 free(channel);
700 }
This page took 0.041984 seconds and 3 git commands to generate.