lttng-ctl: notifications: use epoll()/poll() instead of select()
[lttng-tools.git] / src / lib / lttng-ctl / channel.c
1 /*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This library is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU Lesser General Public License, version 2.1 only,
6 * as published by the Free Software Foundation.
7 *
8 * This library is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
11 * for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this library; if not, write to the Free Software Foundation,
15 * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17
18 #include <lttng/notification/notification-internal.h>
19 #include <lttng/notification/channel-internal.h>
20 #include <lttng/condition/condition-internal.h>
21 #include <lttng/endpoint.h>
22 #include <common/defaults.h>
23 #include <common/error.h>
24 #include <common/dynamic-buffer.h>
25 #include <common/utils.h>
26 #include <common/defaults.h>
27 #include <assert.h>
28 #include "lttng-ctl-helper.h"
29 #include <common/compat/poll.h>
30
31 static
32 int handshake(struct lttng_notification_channel *channel);
33
34 /*
35 * Populates the reception buffer with the next complete message.
36 * The caller must acquire the channel's lock.
37 */
38 static
39 int receive_message(struct lttng_notification_channel *channel)
40 {
41 ssize_t ret;
42 struct lttng_notification_channel_message msg;
43
44 if (lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0)) {
45 ret = -1;
46 goto end;
47 }
48
49 ret = lttcomm_recv_unix_sock(channel->socket, &msg, sizeof(msg));
50 if (ret <= 0) {
51 ret = -1;
52 goto error;
53 }
54
55 if (msg.size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
56 ret = -1;
57 goto error;
58 }
59
60 /* Add message header at buffer's start. */
61 ret = lttng_dynamic_buffer_append(&channel->reception_buffer, &msg,
62 sizeof(msg));
63 if (ret) {
64 goto error;
65 }
66
67 /* Reserve space for the payload. */
68 ret = lttng_dynamic_buffer_set_size(&channel->reception_buffer,
69 channel->reception_buffer.size + msg.size);
70 if (ret) {
71 goto error;
72 }
73
74 /* Receive message payload. */
75 ret = lttcomm_recv_unix_sock(channel->socket,
76 channel->reception_buffer.data + sizeof(msg), msg.size);
77 if (ret < (ssize_t) msg.size) {
78 ret = -1;
79 goto error;
80 }
81 ret = 0;
82 end:
83 return ret;
84 error:
85 if (lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0)) {
86 ret = -1;
87 }
88 goto end;
89 }
90
91 static
92 enum lttng_notification_channel_message_type get_current_message_type(
93 struct lttng_notification_channel *channel)
94 {
95 struct lttng_notification_channel_message *msg;
96
97 assert(channel->reception_buffer.size >= sizeof(*msg));
98
99 msg = (struct lttng_notification_channel_message *)
100 channel->reception_buffer.data;
101 return (enum lttng_notification_channel_message_type) msg->type;
102 }
103
104 static
105 struct lttng_notification *create_notification_from_current_message(
106 struct lttng_notification_channel *channel)
107 {
108 ssize_t ret;
109 struct lttng_notification *notification = NULL;
110 struct lttng_buffer_view view;
111
112 if (channel->reception_buffer.size <=
113 sizeof(struct lttng_notification_channel_message)) {
114 goto end;
115 }
116
117 view = lttng_buffer_view_from_dynamic_buffer(&channel->reception_buffer,
118 sizeof(struct lttng_notification_channel_message), -1);
119
120 ret = lttng_notification_create_from_buffer(&view, &notification);
121 if (ret != channel->reception_buffer.size -
122 sizeof(struct lttng_notification_channel_message)) {
123 lttng_notification_destroy(notification);
124 notification = NULL;
125 goto end;
126 }
127 end:
128 return notification;
129 }
130
131 struct lttng_notification_channel *lttng_notification_channel_create(
132 struct lttng_endpoint *endpoint)
133 {
134 int fd, ret;
135 bool is_in_tracing_group = false, is_root = false;
136 char *sock_path = NULL;
137 struct lttng_notification_channel *channel = NULL;
138
139 if (!endpoint ||
140 endpoint != lttng_session_daemon_notification_endpoint) {
141 goto end;
142 }
143
144 sock_path = zmalloc(LTTNG_PATH_MAX);
145 if (!sock_path) {
146 goto end;
147 }
148
149 channel = zmalloc(sizeof(struct lttng_notification_channel));
150 if (!channel) {
151 goto end;
152 }
153 channel->socket = -1;
154 pthread_mutex_init(&channel->lock, NULL);
155 lttng_dynamic_buffer_init(&channel->reception_buffer);
156 CDS_INIT_LIST_HEAD(&channel->pending_notifications.list);
157
158 is_root = (getuid() == 0);
159 if (!is_root) {
160 is_in_tracing_group = lttng_check_tracing_group();
161 }
162
163 if (is_root || is_in_tracing_group) {
164 lttng_ctl_copy_string(sock_path,
165 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK,
166 LTTNG_PATH_MAX);
167 ret = lttcomm_connect_unix_sock(sock_path);
168 if (ret >= 0) {
169 fd = ret;
170 goto set_fd;
171 }
172 }
173
174 /* Fallback to local session daemon. */
175 ret = snprintf(sock_path, LTTNG_PATH_MAX,
176 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK,
177 utils_get_home_dir());
178 if (ret < 0 || ret >= LTTNG_PATH_MAX) {
179 goto error;
180 }
181
182 ret = lttcomm_connect_unix_sock(sock_path);
183 if (ret < 0) {
184 goto error;
185 }
186 fd = ret;
187
188 set_fd:
189 channel->socket = fd;
190
191 ret = handshake(channel);
192 if (ret) {
193 goto error;
194 }
195 end:
196 free(sock_path);
197 return channel;
198 error:
199 lttng_notification_channel_destroy(channel);
200 channel = NULL;
201 goto end;
202 }
203
204 enum lttng_notification_channel_status
205 lttng_notification_channel_get_next_notification(
206 struct lttng_notification_channel *channel,
207 struct lttng_notification **_notification)
208 {
209 int ret;
210 struct lttng_notification *notification = NULL;
211 enum lttng_notification_channel_status status =
212 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
213 struct lttng_poll_event events;
214
215 if (!channel || !_notification) {
216 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
217 goto end;
218 }
219
220 pthread_mutex_lock(&channel->lock);
221
222 if (channel->pending_notifications.count) {
223 struct pending_notification *pending_notification;
224
225 assert(!cds_list_empty(&channel->pending_notifications.list));
226
227 /* Deliver one of the pending notifications. */
228 pending_notification = cds_list_first_entry(
229 &channel->pending_notifications.list,
230 struct pending_notification,
231 node);
232 notification = pending_notification->notification;
233 if (!notification) {
234 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
235 }
236 cds_list_del(&pending_notification->node);
237 channel->pending_notifications.count--;
238 free(pending_notification);
239 goto end_unlock;
240 }
241
242 /*
243 * Block on interruptible epoll/poll() instead of the message reception
244 * itself as the recvmsg() wrappers always restart on EINTR. We choose
245 * to wait using interruptible epoll/poll() in order to:
246 * 1) Return if a signal occurs,
247 * 2) Not deal with partially received messages.
248 *
249 * The drawback to this approach is that we assume that messages
250 * are complete/well formed. If a message is shorter than its
251 * announced length, receive_message() will block on recvmsg()
252 * and never return (even if a signal is received).
253 */
254 ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
255 if (ret < 0) {
256 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
257 goto end_unlock;
258 }
259 ret = lttng_poll_add(&events, channel->socket, LPOLLIN | LPOLLERR);
260 if (ret < 0) {
261 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
262 goto end_clean_poll;
263 }
264 ret = lttng_poll_wait_interruptible(&events, -1);
265 if (ret <= 0) {
266 status = (ret == -1 && errno == EINTR) ?
267 LTTNG_NOTIFICATION_CHANNEL_STATUS_INTERRUPTED :
268 LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
269 goto end_clean_poll;
270 }
271
272 ret = receive_message(channel);
273 if (ret) {
274 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
275 goto end_clean_poll;
276 }
277
278 switch (get_current_message_type(channel)) {
279 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
280 notification = create_notification_from_current_message(
281 channel);
282 if (!notification) {
283 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
284 goto end_clean_poll;
285 }
286 break;
287 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
288 /* No payload to consume. */
289 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
290 break;
291 default:
292 /* Protocol error. */
293 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
294 goto end_clean_poll;
295 }
296
297 end_clean_poll:
298 lttng_poll_clean(&events);
299 end_unlock:
300 pthread_mutex_unlock(&channel->lock);
301 *_notification = notification;
302 end:
303 return status;
304 }
305
306 static
307 int enqueue_dropped_notification(
308 struct lttng_notification_channel *channel)
309 {
310 int ret = 0;
311 struct pending_notification *pending_notification;
312 struct cds_list_head *last_element =
313 channel->pending_notifications.list.prev;
314
315 pending_notification = caa_container_of(last_element,
316 struct pending_notification, node);
317 if (!pending_notification->notification) {
318 /*
319 * The last enqueued notification indicates dropped
320 * notifications; there is nothing to do as we group
321 * dropped notifications together.
322 */
323 goto end;
324 }
325
326 if (channel->pending_notifications.count >=
327 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT &&
328 pending_notification->notification) {
329 /*
330 * Discard the last enqueued notification to indicate
331 * that notifications were dropped at this point.
332 */
333 lttng_notification_destroy(
334 pending_notification->notification);
335 pending_notification->notification = NULL;
336 goto end;
337 }
338
339 pending_notification = zmalloc(sizeof(*pending_notification));
340 if (!pending_notification) {
341 ret = -1;
342 goto end;
343 }
344 CDS_INIT_LIST_HEAD(&pending_notification->node);
345 cds_list_add(&pending_notification->node,
346 &channel->pending_notifications.list);
347 channel->pending_notifications.count++;
348 end:
349 return ret;
350 }
351
352 static
353 int enqueue_notification_from_current_message(
354 struct lttng_notification_channel *channel)
355 {
356 int ret = 0;
357 struct lttng_notification *notification;
358 struct pending_notification *pending_notification;
359
360 if (channel->pending_notifications.count >=
361 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT) {
362 /* Drop the notification. */
363 ret = enqueue_dropped_notification(channel);
364 goto end;
365 }
366
367 pending_notification = zmalloc(sizeof(*pending_notification));
368 if (!pending_notification) {
369 ret = -1;
370 goto error;
371 }
372 CDS_INIT_LIST_HEAD(&pending_notification->node);
373
374 notification = create_notification_from_current_message(channel);
375 if (!notification) {
376 ret = -1;
377 goto error;
378 }
379
380 pending_notification->notification = notification;
381 cds_list_add(&pending_notification->node,
382 &channel->pending_notifications.list);
383 channel->pending_notifications.count++;
384 end:
385 return ret;
386 error:
387 free(pending_notification);
388 goto end;
389 }
390
391 enum lttng_notification_channel_status
392 lttng_notification_channel_has_pending_notification(
393 struct lttng_notification_channel *channel,
394 bool *_notification_pending)
395 {
396 int ret;
397 enum lttng_notification_channel_status status =
398 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
399 struct lttng_poll_event events;
400
401 if (!channel || !_notification_pending) {
402 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
403 goto end;
404 }
405
406 pthread_mutex_lock(&channel->lock);
407
408 if (channel->pending_notifications.count) {
409 *_notification_pending = true;
410 goto end_unlock;
411 }
412
413 if (channel->socket < 0) {
414 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED;
415 goto end_unlock;
416 }
417
418 /*
419 * Check, without blocking, if data is available on the channel's
420 * socket. If there is data available, it is safe to read (blocking)
421 * on the socket for a message from the session daemon.
422 *
423 * Since all commands wait for the session daemon's reply before
424 * releasing the channel's lock, the protocol only allows for
425 * notifications and "notification dropped" messages to come
426 * through. If we receive a different message type, it is
427 * considered a protocol error.
428 *
429 * Note that this function is not guaranteed not to block. This
430 * will block until our peer (the session daemon) has sent a complete
431 * message if we see data available on the socket. If the peer does
432 * not respect the protocol, this may block indefinitely.
433 */
434 ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
435 if (ret < 0) {
436 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
437 goto end_unlock;
438 }
439 ret = lttng_poll_add(&events, channel->socket, LPOLLIN | LPOLLERR);
440 if (ret < 0) {
441 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
442 goto end_clean_poll;
443 }
444 /* timeout = 0: return immediately. */
445 ret = lttng_poll_wait_interruptible(&events, 0);
446 if (ret == 0) {
447 /* No data available. */
448 *_notification_pending = false;
449 goto end_clean_poll;
450 } else if (ret < 0) {
451 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
452 goto end_clean_poll;
453 }
454
455 /* Data available on socket. */
456 ret = receive_message(channel);
457 if (ret) {
458 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
459 goto end_clean_poll;
460 }
461
462 switch (get_current_message_type(channel)) {
463 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
464 ret = enqueue_notification_from_current_message(channel);
465 if (ret) {
466 goto end_clean_poll;
467 }
468 *_notification_pending = true;
469 break;
470 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
471 ret = enqueue_dropped_notification(channel);
472 if (ret) {
473 goto end_clean_poll;
474 }
475 *_notification_pending = true;
476 break;
477 default:
478 /* Protocol error. */
479 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
480 goto end_clean_poll;
481 }
482
483 end_clean_poll:
484 lttng_poll_clean(&events);
485 end_unlock:
486 pthread_mutex_unlock(&channel->lock);
487 end:
488 return status;
489 }
490
491 static
492 int receive_command_reply(struct lttng_notification_channel *channel,
493 enum lttng_notification_channel_status *status)
494 {
495 int ret;
496 struct lttng_notification_channel_command_reply *reply;
497
498 while (true) {
499 enum lttng_notification_channel_message_type msg_type;
500
501 ret = receive_message(channel);
502 if (ret) {
503 goto end;
504 }
505
506 msg_type = get_current_message_type(channel);
507 switch (msg_type) {
508 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY:
509 goto exit_loop;
510 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
511 ret = enqueue_notification_from_current_message(
512 channel);
513 if (ret) {
514 goto end;
515 }
516 break;
517 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
518 ret = enqueue_dropped_notification(channel);
519 if (ret) {
520 goto end;
521 }
522 break;
523 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
524 {
525 struct lttng_notification_channel_command_handshake *handshake;
526
527 handshake = (struct lttng_notification_channel_command_handshake *)
528 (channel->reception_buffer.data +
529 sizeof(struct lttng_notification_channel_message));
530 channel->version.major = handshake->major;
531 channel->version.minor = handshake->minor;
532 channel->version.set = true;
533 break;
534 }
535 default:
536 ret = -1;
537 goto end;
538 }
539 }
540
541 exit_loop:
542 if (channel->reception_buffer.size <
543 (sizeof(struct lttng_notification_channel_message) +
544 sizeof(*reply))) {
545 /* Invalid message received. */
546 ret = -1;
547 goto end;
548 }
549
550 reply = (struct lttng_notification_channel_command_reply *)
551 (channel->reception_buffer.data +
552 sizeof(struct lttng_notification_channel_message));
553 *status = (enum lttng_notification_channel_status) reply->status;
554 end:
555 return ret;
556 }
557
558 static
559 int handshake(struct lttng_notification_channel *channel)
560 {
561 ssize_t ret;
562 enum lttng_notification_channel_status status =
563 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
564 struct lttng_notification_channel_command_handshake handshake = {
565 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
566 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
567 };
568 struct lttng_notification_channel_message msg_header = {
569 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
570 .size = sizeof(handshake),
571 };
572 char send_buffer[sizeof(msg_header) + sizeof(handshake)];
573
574 memcpy(send_buffer, &msg_header, sizeof(msg_header));
575 memcpy(send_buffer + sizeof(msg_header), &handshake, sizeof(handshake));
576
577 pthread_mutex_lock(&channel->lock);
578
579 ret = lttcomm_send_creds_unix_sock(channel->socket, send_buffer,
580 sizeof(send_buffer));
581 if (ret < 0) {
582 goto end_unlock;
583 }
584
585 /* Receive handshake info from the sessiond. */
586 ret = receive_command_reply(channel, &status);
587 if (ret < 0) {
588 goto end_unlock;
589 }
590
591 if (!channel->version.set) {
592 ret = -1;
593 goto end_unlock;
594 }
595
596 if (channel->version.major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
597 ret = -1;
598 goto end_unlock;
599 }
600
601 end_unlock:
602 pthread_mutex_unlock(&channel->lock);
603 return ret;
604 }
605
606 static
607 enum lttng_notification_channel_status send_condition_command(
608 struct lttng_notification_channel *channel,
609 enum lttng_notification_channel_message_type type,
610 const struct lttng_condition *condition)
611 {
612 int socket;
613 ssize_t ret;
614 enum lttng_notification_channel_status status =
615 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
616 struct lttng_dynamic_buffer buffer;
617 struct lttng_notification_channel_message cmd_header = {
618 .type = (int8_t) type,
619 };
620
621 lttng_dynamic_buffer_init(&buffer);
622
623 if (!channel) {
624 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
625 goto end;
626 }
627
628 assert(type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE ||
629 type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE);
630
631 pthread_mutex_lock(&channel->lock);
632 socket = channel->socket;
633 if (!lttng_condition_validate(condition)) {
634 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
635 goto end_unlock;
636 }
637
638 ret = lttng_dynamic_buffer_append(&buffer, &cmd_header,
639 sizeof(cmd_header));
640 if (ret) {
641 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
642 goto end_unlock;
643 }
644
645 ret = lttng_condition_serialize(condition, &buffer);
646 if (ret) {
647 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
648 goto end_unlock;
649 }
650
651 /* Update payload length. */
652 ((struct lttng_notification_channel_message *) buffer.data)->size =
653 (uint32_t) (buffer.size - sizeof(cmd_header));
654
655 ret = lttcomm_send_unix_sock(socket, buffer.data, buffer.size);
656 if (ret < 0) {
657 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
658 goto end_unlock;
659 }
660
661 ret = receive_command_reply(channel, &status);
662 if (ret < 0) {
663 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
664 goto end_unlock;
665 }
666 end_unlock:
667 pthread_mutex_unlock(&channel->lock);
668 end:
669 lttng_dynamic_buffer_reset(&buffer);
670 return status;
671 }
672
673 enum lttng_notification_channel_status lttng_notification_channel_subscribe(
674 struct lttng_notification_channel *channel,
675 const struct lttng_condition *condition)
676 {
677 return send_condition_command(channel,
678 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE,
679 condition);
680 }
681
682 enum lttng_notification_channel_status lttng_notification_channel_unsubscribe(
683 struct lttng_notification_channel *channel,
684 const struct lttng_condition *condition)
685 {
686 return send_condition_command(channel,
687 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE,
688 condition);
689 }
690
691 void lttng_notification_channel_destroy(
692 struct lttng_notification_channel *channel)
693 {
694 if (!channel) {
695 return;
696 }
697
698 if (channel->socket >= 0) {
699 (void) lttcomm_close_unix_sock(channel->socket);
700 }
701 pthread_mutex_destroy(&channel->lock);
702 lttng_dynamic_buffer_reset(&channel->reception_buffer);
703 free(channel);
704 }
This page took 0.04316 seconds and 5 git commands to generate.