Remove unnecessary check of output parameter
[lttng-tools.git] / src / lib / lttng-ctl / channel.c
1 /*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This library is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU Lesser General Public License, version 2.1 only,
6 * as published by the Free Software Foundation.
7 *
8 * This library is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
11 * for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this library; if not, write to the Free Software Foundation,
15 * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17
18 #include <lttng/notification/notification-internal.h>
19 #include <lttng/notification/channel-internal.h>
20 #include <lttng/condition/condition-internal.h>
21 #include <lttng/endpoint.h>
22 #include <common/defaults.h>
23 #include <common/error.h>
24 #include <common/dynamic-buffer.h>
25 #include <common/utils.h>
26 #include <common/defaults.h>
27 #include <assert.h>
28 #include "lttng-ctl-helper.h"
29 #include <sys/select.h>
30 #include <sys/time.h>
31
32 static
33 int handshake(struct lttng_notification_channel *channel);
34
35 /*
36 * Populates the reception buffer with the next complete message.
37 * The caller must acquire the channel's lock.
38 */
39 static
40 int receive_message(struct lttng_notification_channel *channel)
41 {
42 ssize_t ret;
43 struct lttng_notification_channel_message msg;
44
45 if (lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0)) {
46 ret = -1;
47 goto end;
48 }
49
50 ret = lttcomm_recv_unix_sock(channel->socket, &msg, sizeof(msg));
51 if (ret <= 0) {
52 ret = -1;
53 goto error;
54 }
55
56 if (msg.size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
57 ret = -1;
58 goto error;
59 }
60
61 /* Add message header at buffer's start. */
62 ret = lttng_dynamic_buffer_append(&channel->reception_buffer, &msg,
63 sizeof(msg));
64 if (ret) {
65 goto error;
66 }
67
68 /* Reserve space for the payload. */
69 ret = lttng_dynamic_buffer_set_size(&channel->reception_buffer,
70 channel->reception_buffer.size + msg.size);
71 if (ret) {
72 goto error;
73 }
74
75 /* Receive message payload. */
76 ret = lttcomm_recv_unix_sock(channel->socket,
77 channel->reception_buffer.data + sizeof(msg), msg.size);
78 if (ret < (ssize_t) msg.size) {
79 ret = -1;
80 goto error;
81 }
82 ret = 0;
83 end:
84 return ret;
85 error:
86 if (lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0)) {
87 ret = -1;
88 }
89 goto end;
90 }
91
92 static
93 enum lttng_notification_channel_message_type get_current_message_type(
94 struct lttng_notification_channel *channel)
95 {
96 struct lttng_notification_channel_message *msg;
97
98 assert(channel->reception_buffer.size >= sizeof(*msg));
99
100 msg = (struct lttng_notification_channel_message *)
101 channel->reception_buffer.data;
102 return (enum lttng_notification_channel_message_type) msg->type;
103 }
104
105 static
106 struct lttng_notification *create_notification_from_current_message(
107 struct lttng_notification_channel *channel)
108 {
109 ssize_t ret;
110 struct lttng_notification *notification = NULL;
111 struct lttng_buffer_view view;
112
113 if (channel->reception_buffer.size <=
114 sizeof(struct lttng_notification_channel_message)) {
115 goto end;
116 }
117
118 view = lttng_buffer_view_from_dynamic_buffer(&channel->reception_buffer,
119 sizeof(struct lttng_notification_channel_message), -1);
120
121 ret = lttng_notification_create_from_buffer(&view, &notification);
122 if (ret != channel->reception_buffer.size -
123 sizeof(struct lttng_notification_channel_message)) {
124 lttng_notification_destroy(notification);
125 notification = NULL;
126 goto end;
127 }
128 end:
129 return notification;
130 }
131
132 struct lttng_notification_channel *lttng_notification_channel_create(
133 struct lttng_endpoint *endpoint)
134 {
135 int fd, ret;
136 bool is_in_tracing_group = false, is_root = false;
137 char *sock_path = NULL;
138 struct lttng_notification_channel *channel = NULL;
139
140 if (!endpoint ||
141 endpoint != lttng_session_daemon_notification_endpoint) {
142 goto end;
143 }
144
145 sock_path = zmalloc(LTTNG_PATH_MAX);
146 if (!sock_path) {
147 goto end;
148 }
149
150 channel = zmalloc(sizeof(struct lttng_notification_channel));
151 if (!channel) {
152 goto end;
153 }
154 channel->socket = -1;
155 pthread_mutex_init(&channel->lock, NULL);
156 lttng_dynamic_buffer_init(&channel->reception_buffer);
157 CDS_INIT_LIST_HEAD(&channel->pending_notifications.list);
158
159 is_root = (getuid() == 0);
160 if (!is_root) {
161 is_in_tracing_group = lttng_check_tracing_group();
162 }
163
164 if (is_root || is_in_tracing_group) {
165 lttng_ctl_copy_string(sock_path,
166 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK,
167 LTTNG_PATH_MAX);
168 ret = lttcomm_connect_unix_sock(sock_path);
169 if (ret >= 0) {
170 fd = ret;
171 goto set_fd;
172 }
173 }
174
175 /* Fallback to local session daemon. */
176 ret = snprintf(sock_path, LTTNG_PATH_MAX,
177 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK,
178 utils_get_home_dir());
179 if (ret < 0 || ret >= LTTNG_PATH_MAX) {
180 goto error;
181 }
182
183 ret = lttcomm_connect_unix_sock(sock_path);
184 if (ret < 0) {
185 goto error;
186 }
187 fd = ret;
188
189 set_fd:
190 channel->socket = fd;
191
192 ret = handshake(channel);
193 if (ret) {
194 goto error;
195 }
196 end:
197 free(sock_path);
198 return channel;
199 error:
200 lttng_notification_channel_destroy(channel);
201 channel = NULL;
202 goto end;
203 }
204
205 enum lttng_notification_channel_status
206 lttng_notification_channel_get_next_notification(
207 struct lttng_notification_channel *channel,
208 struct lttng_notification **_notification)
209 {
210 int ret;
211 struct lttng_notification *notification = NULL;
212 enum lttng_notification_channel_status status =
213 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
214 fd_set read_fds;
215
216 if (!channel || !_notification) {
217 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
218 goto end;
219 }
220
221 pthread_mutex_lock(&channel->lock);
222
223 if (channel->pending_notifications.count) {
224 struct pending_notification *pending_notification;
225
226 assert(!cds_list_empty(&channel->pending_notifications.list));
227
228 /* Deliver one of the pending notifications. */
229 pending_notification = cds_list_first_entry(
230 &channel->pending_notifications.list,
231 struct pending_notification,
232 node);
233 notification = pending_notification->notification;
234 if (!notification) {
235 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
236 }
237 cds_list_del(&pending_notification->node);
238 channel->pending_notifications.count--;
239 free(pending_notification);
240 goto end_unlock;
241 }
242
243 /*
244 * Block on select() instead of the message reception itself as the
245 * recvmsg() wrappers always restard on EINTR. We choose to wait
246 * using select() in order to:
247 * 1) Return if a signal occurs,
248 * 2) Not deal with partially received messages.
249 *
250 * The drawback to this approach is that we assume that messages
251 * are complete/well formed. If a message is shorter than its
252 * announced length, receive_message() will block on recvmsg()
253 * and never return (even if a signal is received).
254 */
255 FD_ZERO(&read_fds);
256 FD_SET(channel->socket, &read_fds);
257 ret = select(channel->socket + 1, &read_fds, NULL, NULL, NULL);
258 if (ret == -1) {
259 status = errno == EINTR ?
260 LTTNG_NOTIFICATION_CHANNEL_STATUS_INTERRUPTED :
261 LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
262 goto end_unlock;
263 }
264
265 ret = receive_message(channel);
266 if (ret) {
267 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
268 goto end_unlock;
269 }
270
271 switch (get_current_message_type(channel)) {
272 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
273 notification = create_notification_from_current_message(
274 channel);
275 if (!notification) {
276 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
277 goto end_unlock;
278 }
279 break;
280 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
281 /* No payload to consume. */
282 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
283 break;
284 default:
285 /* Protocol error. */
286 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
287 goto end_unlock;
288 }
289
290 end_unlock:
291 pthread_mutex_unlock(&channel->lock);
292 *_notification = notification;
293 end:
294 return status;
295 }
296
297 static
298 int enqueue_dropped_notification(
299 struct lttng_notification_channel *channel)
300 {
301 int ret = 0;
302 struct pending_notification *pending_notification;
303 struct cds_list_head *last_element =
304 channel->pending_notifications.list.prev;
305
306 pending_notification = caa_container_of(last_element,
307 struct pending_notification, node);
308 if (!pending_notification->notification) {
309 /*
310 * The last enqueued notification indicates dropped
311 * notifications; there is nothing to do as we group
312 * dropped notifications together.
313 */
314 goto end;
315 }
316
317 if (channel->pending_notifications.count >=
318 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT &&
319 pending_notification->notification) {
320 /*
321 * Discard the last enqueued notification to indicate
322 * that notifications were dropped at this point.
323 */
324 lttng_notification_destroy(
325 pending_notification->notification);
326 pending_notification->notification = NULL;
327 goto end;
328 }
329
330 pending_notification = zmalloc(sizeof(*pending_notification));
331 if (!pending_notification) {
332 ret = -1;
333 goto end;
334 }
335 CDS_INIT_LIST_HEAD(&pending_notification->node);
336 cds_list_add(&pending_notification->node,
337 &channel->pending_notifications.list);
338 channel->pending_notifications.count++;
339 end:
340 return ret;
341 }
342
343 static
344 int enqueue_notification_from_current_message(
345 struct lttng_notification_channel *channel)
346 {
347 int ret = 0;
348 struct lttng_notification *notification;
349 struct pending_notification *pending_notification;
350
351 if (channel->pending_notifications.count >=
352 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT) {
353 /* Drop the notification. */
354 ret = enqueue_dropped_notification(channel);
355 goto end;
356 }
357
358 pending_notification = zmalloc(sizeof(*pending_notification));
359 if (!pending_notification) {
360 ret = -1;
361 goto error;
362 }
363 CDS_INIT_LIST_HEAD(&pending_notification->node);
364
365 notification = create_notification_from_current_message(channel);
366 if (!notification) {
367 ret = -1;
368 goto error;
369 }
370
371 pending_notification->notification = notification;
372 cds_list_add(&pending_notification->node,
373 &channel->pending_notifications.list);
374 channel->pending_notifications.count++;
375 end:
376 return ret;
377 error:
378 free(pending_notification);
379 goto end;
380 }
381
382 enum lttng_notification_channel_status
383 lttng_notification_channel_has_pending_notification(
384 struct lttng_notification_channel *channel,
385 bool *_notification_pending)
386 {
387 int ret;
388 enum lttng_notification_channel_status status =
389 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
390 fd_set read_fds;
391 struct timeval timeout;
392
393 FD_ZERO(&read_fds);
394 memset(&timeout, 0, sizeof(timeout));
395
396 if (!channel || !_notification_pending) {
397 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
398 goto end;
399 }
400
401 pthread_mutex_lock(&channel->lock);
402
403 if (channel->pending_notifications.count) {
404 *_notification_pending = true;
405 goto end_unlock;
406 }
407
408 if (channel->socket < 0) {
409 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED;
410 goto end_unlock;
411 }
412
413 /*
414 * Check, without blocking, if data is available on the channel's
415 * socket. If there is data available, it is safe to read (blocking)
416 * on the socket for a message from the session daemon.
417 *
418 * Since all commands wait for the session daemon's reply before
419 * releasing the channel's lock, the protocol only allows for
420 * notifications and "notification dropped" messages to come
421 * through. If we receive a different message type, it is
422 * considered a protocol error.
423 *
424 * Note that this function is not guaranteed not to block. This
425 * will block until our peer (the session daemon) has sent a complete
426 * message if we see data available on the socket. If the peer does
427 * not respect the protocol, this may block indefinitely.
428 */
429 FD_SET(channel->socket, &read_fds);
430 do {
431 ret = select(channel->socket + 1, &read_fds, NULL, NULL, &timeout);
432 } while (ret < 0 && errno == EINTR);
433
434 if (ret == 0) {
435 /* No data available. */
436 *_notification_pending = false;
437 goto end_unlock;
438 } else if (ret < 0) {
439 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
440 goto end_unlock;
441 }
442
443 /* Data available on socket. */
444 ret = receive_message(channel);
445 if (ret) {
446 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
447 goto end_unlock;
448 }
449
450 switch (get_current_message_type(channel)) {
451 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
452 ret = enqueue_notification_from_current_message(channel);
453 if (ret) {
454 goto end_unlock;
455 }
456 *_notification_pending = true;
457 break;
458 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
459 ret = enqueue_dropped_notification(channel);
460 if (ret) {
461 goto end_unlock;
462 }
463 *_notification_pending = true;
464 break;
465 default:
466 /* Protocol error. */
467 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
468 goto end_unlock;
469 }
470
471 end_unlock:
472 pthread_mutex_unlock(&channel->lock);
473 end:
474 return status;
475 }
476
477 static
478 int receive_command_reply(struct lttng_notification_channel *channel,
479 enum lttng_notification_channel_status *status)
480 {
481 int ret;
482 struct lttng_notification_channel_command_reply *reply;
483
484 while (true) {
485 enum lttng_notification_channel_message_type msg_type;
486
487 ret = receive_message(channel);
488 if (ret) {
489 goto end;
490 }
491
492 msg_type = get_current_message_type(channel);
493 switch (msg_type) {
494 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY:
495 goto exit_loop;
496 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
497 ret = enqueue_notification_from_current_message(
498 channel);
499 if (ret) {
500 goto end;
501 }
502 break;
503 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
504 ret = enqueue_dropped_notification(channel);
505 if (ret) {
506 goto end;
507 }
508 break;
509 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
510 {
511 struct lttng_notification_channel_command_handshake *handshake;
512
513 handshake = (struct lttng_notification_channel_command_handshake *)
514 (channel->reception_buffer.data +
515 sizeof(struct lttng_notification_channel_message));
516 channel->version.major = handshake->major;
517 channel->version.minor = handshake->minor;
518 channel->version.set = true;
519 break;
520 }
521 default:
522 ret = -1;
523 goto end;
524 }
525 }
526
527 exit_loop:
528 if (channel->reception_buffer.size <
529 (sizeof(struct lttng_notification_channel_message) +
530 sizeof(*reply))) {
531 /* Invalid message received. */
532 ret = -1;
533 goto end;
534 }
535
536 reply = (struct lttng_notification_channel_command_reply *)
537 (channel->reception_buffer.data +
538 sizeof(struct lttng_notification_channel_message));
539 *status = (enum lttng_notification_channel_status) reply->status;
540 end:
541 return ret;
542 }
543
544 static
545 int handshake(struct lttng_notification_channel *channel)
546 {
547 ssize_t ret;
548 enum lttng_notification_channel_status status =
549 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
550 struct lttng_notification_channel_command_handshake handshake = {
551 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
552 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
553 };
554 struct lttng_notification_channel_message msg_header = {
555 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
556 .size = sizeof(handshake),
557 };
558 char send_buffer[sizeof(msg_header) + sizeof(handshake)];
559
560 memcpy(send_buffer, &msg_header, sizeof(msg_header));
561 memcpy(send_buffer + sizeof(msg_header), &handshake, sizeof(handshake));
562
563 pthread_mutex_lock(&channel->lock);
564
565 ret = lttcomm_send_creds_unix_sock(channel->socket, send_buffer,
566 sizeof(send_buffer));
567 if (ret < 0) {
568 goto end_unlock;
569 }
570
571 /* Receive handshake info from the sessiond. */
572 ret = receive_command_reply(channel, &status);
573 if (ret < 0) {
574 goto end_unlock;
575 }
576
577 if (!channel->version.set) {
578 ret = -1;
579 goto end_unlock;
580 }
581
582 if (channel->version.major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
583 ret = -1;
584 goto end_unlock;
585 }
586
587 end_unlock:
588 pthread_mutex_unlock(&channel->lock);
589 return ret;
590 }
591
592 static
593 enum lttng_notification_channel_status send_condition_command(
594 struct lttng_notification_channel *channel,
595 enum lttng_notification_channel_message_type type,
596 const struct lttng_condition *condition)
597 {
598 int socket;
599 ssize_t ret;
600 enum lttng_notification_channel_status status =
601 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
602 struct lttng_dynamic_buffer buffer;
603 struct lttng_notification_channel_message cmd_header = {
604 .type = (int8_t) type,
605 };
606
607 lttng_dynamic_buffer_init(&buffer);
608
609 if (!channel) {
610 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
611 goto end;
612 }
613
614 assert(type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE ||
615 type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE);
616
617 pthread_mutex_lock(&channel->lock);
618 socket = channel->socket;
619 if (!lttng_condition_validate(condition)) {
620 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
621 goto end_unlock;
622 }
623
624 ret = lttng_dynamic_buffer_append(&buffer, &cmd_header,
625 sizeof(cmd_header));
626 if (ret) {
627 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
628 goto end_unlock;
629 }
630
631 ret = lttng_condition_serialize(condition, &buffer);
632 if (ret) {
633 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
634 goto end_unlock;
635 }
636
637 /* Update payload length. */
638 ((struct lttng_notification_channel_message *) buffer.data)->size =
639 (uint32_t) (buffer.size - sizeof(cmd_header));
640
641 ret = lttcomm_send_unix_sock(socket, buffer.data, buffer.size);
642 if (ret < 0) {
643 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
644 goto end_unlock;
645 }
646
647 ret = receive_command_reply(channel, &status);
648 if (ret < 0) {
649 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
650 goto end_unlock;
651 }
652 end_unlock:
653 pthread_mutex_unlock(&channel->lock);
654 end:
655 lttng_dynamic_buffer_reset(&buffer);
656 return status;
657 }
658
659 enum lttng_notification_channel_status lttng_notification_channel_subscribe(
660 struct lttng_notification_channel *channel,
661 const struct lttng_condition *condition)
662 {
663 return send_condition_command(channel,
664 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE,
665 condition);
666 }
667
668 enum lttng_notification_channel_status lttng_notification_channel_unsubscribe(
669 struct lttng_notification_channel *channel,
670 const struct lttng_condition *condition)
671 {
672 return send_condition_command(channel,
673 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE,
674 condition);
675 }
676
677 void lttng_notification_channel_destroy(
678 struct lttng_notification_channel *channel)
679 {
680 if (!channel) {
681 return;
682 }
683
684 if (channel->socket >= 0) {
685 (void) lttcomm_close_unix_sock(channel->socket);
686 }
687 pthread_mutex_destroy(&channel->lock);
688 lttng_dynamic_buffer_reset(&channel->reception_buffer);
689 free(channel);
690 }
This page took 0.046322 seconds and 4 git commands to generate.