fix: write EINTR handling
[lttng-tools.git] / src / bin / lttng-relayd / main.c
1 /*
2 * Copyright (C) 2012 - Julien Desfossez <jdesfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License, version 2 only,
7 * as published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19 #define _GNU_SOURCE
20 #include <getopt.h>
21 #include <grp.h>
22 #include <limits.h>
23 #include <pthread.h>
24 #include <signal.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <sys/mman.h>
29 #include <sys/mount.h>
30 #include <sys/resource.h>
31 #include <sys/socket.h>
32 #include <sys/stat.h>
33 #include <sys/types.h>
34 #include <sys/wait.h>
35 #include <urcu/futex.h>
36 #include <urcu/uatomic.h>
37 #include <unistd.h>
38 #include <fcntl.h>
39 #include <config.h>
40
41 #include <lttng/lttng.h>
42 #include <common/common.h>
43 #include <common/compat/poll.h>
44 #include <common/compat/socket.h>
45 #include <common/defaults.h>
46 #include <common/futex.h>
47 #include <common/sessiond-comm/sessiond-comm.h>
48 #include <common/sessiond-comm/inet.h>
49 #include <common/hashtable/hashtable.h>
50 #include <common/sessiond-comm/relayd.h>
51 #include <common/uri.h>
52 #include <common/utils.h>
53
54 #include "lttng-relayd.h"
55
/* Command line options, filled by parse_args(). */
static int opt_daemon;				/* -d, --daemonize */
static char *opt_output_path;			/* -o, --output (heap copy made by asprintf) */
static struct lttng_uri *control_uri = NULL;	/* -C, --control-port */
static struct lttng_uri *data_uri = NULL;	/* -D, --data-port */

/* Program name, printed by usage(). */
const char *progname;
static int is_root; /* Set to 1 if the daemon is running as root */

/*
 * Quit pipe for all threads. This permits a single cancellation point
 * for all threads when receiving an event on the pipe.
 */
static int thread_quit_pipe[2] = { -1, -1 };

/*
 * This pipe is used to inform the worker thread that a command is queued and
 * ready to be processed. The dispatcher writes whole struct relay_command
 * copies into it.
 */
static int relay_cmd_pipe[2] = { -1, -1 };

/* Set by stop_threads() to make the dispatcher thread leave its loop. */
static int dispatch_thread_exit;

/* NOTE(review): presumably created and joined by main() — not visible here. */
static pthread_t listener_thread;
static pthread_t dispatcher_thread;
static pthread_t worker_thread;

/*
 * Last ids handed out: stream handles by relay_add_stream(), session ids by
 * relay_send_version().
 */
static uint64_t last_relay_stream_id = 0;
static uint64_t last_relay_session_id = 0;

/*
 * Relay command queue.
 *
 * The relay_thread_listener and relay_thread_dispatcher communicate with this
 * queue.
 */
static struct relay_cmd_queue relay_cmd_queue;

/*
 * Buffer receiving trace data and metadata from the network. It is grown on
 * demand with realloc() by relay_recv_metadata() and relay_process_data();
 * data_buffer_size is its current size in bytes.
 */
static char *data_buffer = NULL;
static unsigned int data_buffer_size = 0;
97
98 /*
99 * usage function on stderr
100 */
101 static
102 void usage(void)
103 {
104 fprintf(stderr, "Usage: %s OPTIONS\n\nOptions:\n", progname);
105 fprintf(stderr, " -h, --help Display this usage.\n");
106 fprintf(stderr, " -d, --daemonize Start as a daemon.\n");
107 fprintf(stderr, " -C, --control-port Control port listening (URI)\n");
108 fprintf(stderr, " -D, --data-port Data port listening (URI)\n");
109 fprintf(stderr, " -o, --output Output path for traces (PATH)\n");
110 fprintf(stderr, " -v, --verbose Verbose mode. Activate DBG() macro.\n");
111 }
112
113 static
114 int parse_args(int argc, char **argv)
115 {
116 int c;
117 int ret = 0;
118 char *default_address;
119
120 static struct option long_options[] = {
121 { "control-port", 1, 0, 'C', },
122 { "data-port", 1, 0, 'D', },
123 { "daemonize", 0, 0, 'd', },
124 { "help", 0, 0, 'h', },
125 { "output", 1, 0, 'o', },
126 { "verbose", 0, 0, 'v', },
127 { NULL, 0, 0, 0 },
128 };
129
130 while (1) {
131 int option_index = 0;
132 c = getopt_long(argc, argv, "dhv" "C:D:o:",
133 long_options, &option_index);
134 if (c == -1) {
135 break;
136 }
137
138 switch (c) {
139 case 0:
140 fprintf(stderr, "option %s", long_options[option_index].name);
141 if (optarg) {
142 fprintf(stderr, " with arg %s\n", optarg);
143 }
144 break;
145 case 'C':
146 ret = uri_parse(optarg, &control_uri);
147 if (ret < 0) {
148 ERR("Invalid control URI specified");
149 goto exit;
150 }
151 if (control_uri->port == 0) {
152 control_uri->port = DEFAULT_NETWORK_CONTROL_PORT;
153 }
154 break;
155 case 'D':
156 ret = uri_parse(optarg, &data_uri);
157 if (ret < 0) {
158 ERR("Invalid data URI specified");
159 goto exit;
160 }
161 if (data_uri->port == 0) {
162 data_uri->port = DEFAULT_NETWORK_DATA_PORT;
163 }
164 break;
165 case 'd':
166 opt_daemon = 1;
167 break;
168 case 'h':
169 usage();
170 exit(EXIT_FAILURE);
171 case 'o':
172 ret = asprintf(&opt_output_path, "%s", optarg);
173 if (ret < 0) {
174 PERROR("asprintf opt_output_path");
175 goto exit;
176 }
177 break;
178 case 'v':
179 /* Verbose level can increase using multiple -v */
180 lttng_opt_verbose += 1;
181 break;
182 default:
183 /* Unknown option or other error.
184 * Error is printed by getopt, just return */
185 ret = -1;
186 goto exit;
187 }
188 }
189
190 /* assign default values */
191 if (control_uri == NULL) {
192 ret = asprintf(&default_address, "tcp://0.0.0.0:%d",
193 DEFAULT_NETWORK_CONTROL_PORT);
194 if (ret < 0) {
195 PERROR("asprintf default data address");
196 goto exit;
197 }
198
199 ret = uri_parse(default_address, &control_uri);
200 free(default_address);
201 if (ret < 0) {
202 ERR("Invalid control URI specified");
203 goto exit;
204 }
205 }
206 if (data_uri == NULL) {
207 ret = asprintf(&default_address, "tcp://0.0.0.0:%d",
208 DEFAULT_NETWORK_DATA_PORT);
209 if (ret < 0) {
210 PERROR("asprintf default data address");
211 goto exit;
212 }
213
214 ret = uri_parse(default_address, &data_uri);
215 free(default_address);
216 if (ret < 0) {
217 ERR("Invalid data URI specified");
218 goto exit;
219 }
220 }
221
222 exit:
223 return ret;
224 }
225
226 /*
227 * Cleanup the daemon
228 */
229 static
230 void cleanup(void)
231 {
232 DBG("Cleaning up");
233
234 /* Close thread quit pipes */
235 utils_close_pipe(thread_quit_pipe);
236
237 /* Close relay cmd pipes */
238 utils_close_pipe(relay_cmd_pipe);
239 }
240
/*
 * Wake a thread by writing a single byte into the write end of its pipe.
 * The write is retried while it is interrupted by a signal (EINTR).
 *
 * Return the write(2) result: 1 on success, a negative value on error.
 */
static
int notify_thread_pipe(int wpipe)
{
	int written;

	for (;;) {
		written = write(wpipe, "!", 1);
		if (written >= 0 || errno != EINTR) {
			break;
		}
	}

	if (written < 0) {
		PERROR("write poll pipe");
	}
	return written;
}
258
259 /*
260 * Stop all threads by closing the thread quit pipe.
261 */
262 static
263 void stop_threads(void)
264 {
265 int ret;
266
267 /* Stopping all threads */
268 DBG("Terminating all threads");
269 ret = notify_thread_pipe(thread_quit_pipe[1]);
270 if (ret < 0) {
271 ERR("write error on thread quit pipe");
272 }
273
274 /* Dispatch thread */
275 dispatch_thread_exit = 1;
276 futex_nto1_wake(&relay_cmd_queue.futex);
277 }
278
/*
 * Signal handler for the daemon.
 *
 * SIGPIPE is only logged. SIGINT and SIGTERM stop all threads, which lets
 * main() return gracefully after joining the threads and calling cleanup().
 * Any other signal is ignored.
 */
static
void sighandler(int sig)
{
	if (sig == SIGPIPE) {
		DBG("SIGPIPE caught");
	} else if (sig == SIGINT) {
		DBG("SIGINT caught");
		stop_threads();
	} else if (sig == SIGTERM) {
		DBG("SIGTERM caught");
		stop_threads();
	}
}
304
305 /*
306 * Setup signal handler for :
307 * SIGINT, SIGTERM, SIGPIPE
308 */
309 static
310 int set_signal_handler(void)
311 {
312 int ret = 0;
313 struct sigaction sa;
314 sigset_t sigset;
315
316 if ((ret = sigemptyset(&sigset)) < 0) {
317 PERROR("sigemptyset");
318 return ret;
319 }
320
321 sa.sa_handler = sighandler;
322 sa.sa_mask = sigset;
323 sa.sa_flags = 0;
324 if ((ret = sigaction(SIGTERM, &sa, NULL)) < 0) {
325 PERROR("sigaction");
326 return ret;
327 }
328
329 if ((ret = sigaction(SIGINT, &sa, NULL)) < 0) {
330 PERROR("sigaction");
331 return ret;
332 }
333
334 if ((ret = sigaction(SIGPIPE, &sa, NULL)) < 0) {
335 PERROR("sigaction");
336 return ret;
337 }
338
339 DBG("Signal handler set for SIGTERM, SIGPIPE and SIGINT");
340
341 return ret;
342 }
343
344 /*
345 * Init thread quit pipe.
346 *
347 * Return -1 on error or 0 if all pipes are created.
348 */
349 static
350 int init_thread_quit_pipe(void)
351 {
352 int ret;
353
354 ret = utils_create_pipe_cloexec(thread_quit_pipe);
355
356 return ret;
357 }
358
359 /*
360 * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set.
361 */
362 static
363 int create_thread_poll_set(struct lttng_poll_event *events, int size)
364 {
365 int ret;
366
367 if (events == NULL || size == 0) {
368 ret = -1;
369 goto error;
370 }
371
372 ret = lttng_poll_create(events, size, LTTNG_CLOEXEC);
373 if (ret < 0) {
374 goto error;
375 }
376
377 /* Add quit pipe */
378 ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN);
379 if (ret < 0) {
380 goto error;
381 }
382
383 return 0;
384
385 error:
386 return ret;
387 }
388
389 /*
390 * Check if the thread quit pipe was triggered.
391 *
392 * Return 1 if it was triggered else 0;
393 */
394 static
395 int check_thread_quit_pipe(int fd, uint32_t events)
396 {
397 if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) {
398 return 1;
399 }
400
401 return 0;
402 }
403
404 /*
405 * Create and init socket from uri.
406 */
407 static
408 struct lttcomm_sock *relay_init_sock(struct lttng_uri *uri)
409 {
410 int ret;
411 struct lttcomm_sock *sock = NULL;
412
413 sock = lttcomm_alloc_sock_from_uri(uri);
414 if (sock == NULL) {
415 ERR("Allocating socket");
416 goto error;
417 }
418
419 ret = lttcomm_create_sock(sock);
420 if (ret < 0) {
421 goto error;
422 }
423 DBG("Listening on sock %d", sock->fd);
424
425 ret = sock->ops->bind(sock);
426 if (ret < 0) {
427 goto error;
428 }
429
430 ret = sock->ops->listen(sock, -1);
431 if (ret < 0) {
432 goto error;
433
434 }
435
436 return sock;
437
438 error:
439 if (sock) {
440 lttcomm_destroy_sock(sock);
441 }
442 return NULL;
443 }
444
445 /*
446 * This thread manages the listening for new connections on the network
447 */
448 static
449 void *relay_thread_listener(void *data)
450 {
451 int i, ret, pollfd;
452 int val = 1;
453 uint32_t revents, nb_fd;
454 struct lttng_poll_event events;
455 struct lttcomm_sock *control_sock, *data_sock;
456
457 /*
458 * Get allocated in this thread, enqueued to a global queue, dequeued and
459 * freed in the worker thread.
460 */
461 struct relay_command *relay_cmd = NULL;
462
463 DBG("[thread] Relay listener started");
464
465 control_sock = relay_init_sock(control_uri);
466 if (!control_sock) {
467 goto error_sock;
468 }
469
470 data_sock = relay_init_sock(data_uri);
471 if (!data_sock) {
472 goto error_sock;
473 }
474
475 /*
476 * Pass 3 as size here for the thread quit pipe, control and data socket.
477 */
478 ret = create_thread_poll_set(&events, 3);
479 if (ret < 0) {
480 goto error_create_poll;
481 }
482
483 /* Add the control socket */
484 ret = lttng_poll_add(&events, control_sock->fd, LPOLLIN | LPOLLRDHUP);
485 if (ret < 0) {
486 goto error_poll_add;
487 }
488
489 /* Add the data socket */
490 ret = lttng_poll_add(&events, data_sock->fd, LPOLLIN | LPOLLRDHUP);
491 if (ret < 0) {
492 goto error_poll_add;
493 }
494
495 while (1) {
496 DBG("Listener accepting connections");
497
498 nb_fd = LTTNG_POLL_GETNB(&events);
499
500 restart:
501 ret = lttng_poll_wait(&events, -1);
502 if (ret < 0) {
503 /*
504 * Restart interrupted system call.
505 */
506 if (errno == EINTR) {
507 goto restart;
508 }
509 goto error;
510 }
511
512 DBG("Relay new connection received");
513 for (i = 0; i < nb_fd; i++) {
514 /* Fetch once the poll data */
515 revents = LTTNG_POLL_GETEV(&events, i);
516 pollfd = LTTNG_POLL_GETFD(&events, i);
517
518 /* Thread quit pipe has been closed. Killing thread. */
519 ret = check_thread_quit_pipe(pollfd, revents);
520 if (ret) {
521 goto error;
522 }
523
524 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
525 ERR("socket poll error");
526 goto error;
527 } else if (revents & LPOLLIN) {
528 struct lttcomm_sock *newsock = NULL;
529
530 relay_cmd = zmalloc(sizeof(struct relay_command));
531 if (relay_cmd == NULL) {
532 PERROR("relay command zmalloc");
533 goto error;
534 }
535
536 if (pollfd == data_sock->fd) {
537 newsock = data_sock->ops->accept(data_sock);
538 if (newsock < 0) {
539 PERROR("accepting data sock");
540 goto error;
541 }
542 relay_cmd->type = RELAY_DATA;
543 DBG("Relay data connection accepted, socket %d", newsock->fd);
544 } else if (pollfd == control_sock->fd) {
545 newsock = control_sock->ops->accept(control_sock);
546 if (newsock < 0) {
547 PERROR("accepting control sock");
548 goto error;
549 }
550 relay_cmd->type = RELAY_CONTROL;
551 DBG("Relay control connection accepted, socket %d", newsock->fd);
552 }
553 ret = setsockopt(newsock->fd, SOL_SOCKET, SO_REUSEADDR,
554 &val, sizeof(int));
555 if (ret < 0) {
556 PERROR("setsockopt inet");
557 goto error;
558 }
559 relay_cmd->sock = newsock;
560 /*
561 * Lock free enqueue the request.
562 */
563 cds_wfq_enqueue(&relay_cmd_queue.queue, &relay_cmd->node);
564
565 /*
566 * Wake the dispatch queue futex. Implicit memory
567 * barrier with the exchange in cds_wfq_enqueue.
568 */
569 futex_nto1_wake(&relay_cmd_queue.futex);
570 }
571 }
572 }
573
574 error:
575 error_poll_add:
576 lttng_poll_clean(&events);
577 error_create_poll:
578 if (control_sock->fd >= 0) {
579 ret = control_sock->ops->close(control_sock);
580 if (ret) {
581 PERROR("close");
582 }
583 lttcomm_destroy_sock(control_sock);
584 }
585 if (data_sock->fd >= 0) {
586 ret = data_sock->ops->close(data_sock);
587 if (ret) {
588 PERROR("close");
589 }
590 lttcomm_destroy_sock(data_sock);
591 }
592
593 DBG("Relay listener thread cleanup complete");
594 stop_threads();
595 error_sock:
596 return NULL;
597 }
598
599 /*
600 * This thread manages the dispatching of the requests to worker threads
601 */
602 static
603 void *relay_thread_dispatcher(void *data)
604 {
605 int ret;
606 struct cds_wfq_node *node;
607 struct relay_command *relay_cmd = NULL;
608
609 DBG("[thread] Relay dispatcher started");
610
611 while (!dispatch_thread_exit) {
612 /* Atomically prepare the queue futex */
613 futex_nto1_prepare(&relay_cmd_queue.futex);
614
615 do {
616 /* Dequeue commands */
617 node = cds_wfq_dequeue_blocking(&relay_cmd_queue.queue);
618 if (node == NULL) {
619 DBG("Woken up but nothing in the relay command queue");
620 /* Continue thread execution */
621 break;
622 }
623
624 relay_cmd = caa_container_of(node, struct relay_command, node);
625 DBG("Dispatching request waiting on sock %d", relay_cmd->sock->fd);
626
627 /*
628 * Inform worker thread of the new request. This
629 * call is blocking so we can be assured that the data will be read
630 * at some point in time or wait to the end of the world :)
631 */
632 do {
633 ret = write(relay_cmd_pipe[1], relay_cmd,
634 sizeof(struct relay_command));
635 } while (ret < 0 && errno == EINTR);
636 free(relay_cmd);
637 if (ret < 0) {
638 PERROR("write cmd pipe");
639 goto error;
640 }
641 } while (node != NULL);
642
643 /* Futex wait on queue. Blocking call on futex() */
644 futex_nto1_wait(&relay_cmd_queue.futex);
645 }
646
647 error:
648 DBG("Dispatch thread dying");
649 stop_threads();
650 return NULL;
651 }
652
653 /*
654 * Return the realpath(3) of the path even if the last directory token does not
655 * exist. For example, with /tmp/test1/test2, if test2/ does not exist but the
656 * /tmp/test1 does, the real path is returned. In normal time, realpath(3)
657 * fails if the end point directory does not exist.
658 */
659 static
660 char *expand_full_path(const char *path)
661 {
662 const char *end_path = path;
663 char *next, *cut_path, *expanded_path;
664
665 /* Find last token delimited by '/' */
666 while ((next = strpbrk(end_path + 1, "/"))) {
667 end_path = next;
668 }
669
670 /* Cut last token from original path */
671 cut_path = strndup(path, end_path - path);
672
673 expanded_path = malloc(PATH_MAX);
674 if (expanded_path == NULL) {
675 goto error;
676 }
677
678 expanded_path = realpath((char *)cut_path, expanded_path);
679 if (expanded_path == NULL) {
680 switch (errno) {
681 case ENOENT:
682 ERR("%s: No such file or directory", cut_path);
683 break;
684 default:
685 PERROR("realpath");
686 break;
687 }
688 goto error;
689 }
690
691 /* Add end part to expanded path */
692 strcat(expanded_path, end_path);
693
694 free(cut_path);
695 return expanded_path;
696
697 error:
698 free(cut_path);
699 return NULL;
700 }
701
702
/*
 * Return the value of the HOME environment variable, or NULL when it is not
 * set. The string belongs to the environment: the caller MUST NOT free(3) it.
 */
static
char *config_get_default_path(void)
{
	const char *home_var = "HOME";

	return getenv(home_var);
}
713
714 /*
715 * Create recursively directory using the FULL path.
716 */
717 static
718 int mkdir_recursive(char *path, mode_t mode)
719 {
720 char *p, tmp[PATH_MAX];
721 struct stat statbuf;
722 size_t len;
723 int ret;
724
725 ret = snprintf(tmp, sizeof(tmp), "%s", path);
726 if (ret < 0) {
727 PERROR("snprintf mkdir");
728 goto error;
729 }
730
731 len = ret;
732 if (tmp[len - 1] == '/') {
733 tmp[len - 1] = 0;
734 }
735
736 for (p = tmp + 1; *p; p++) {
737 if (*p == '/') {
738 *p = 0;
739 if (tmp[strlen(tmp) - 1] == '.' &&
740 tmp[strlen(tmp) - 2] == '.' &&
741 tmp[strlen(tmp) - 3] == '/') {
742 ERR("Using '/../' is not permitted in the trace path (%s)",
743 tmp);
744 ret = -1;
745 goto error;
746 }
747 ret = stat(tmp, &statbuf);
748 if (ret < 0) {
749 ret = mkdir(tmp, mode);
750 if (ret < 0) {
751 if (!(errno == EEXIST)) {
752 PERROR("mkdir recursive");
753 ret = -errno;
754 goto error;
755 }
756 }
757 }
758 *p = '/';
759 }
760 }
761
762 ret = mkdir(tmp, mode);
763 if (ret < 0) {
764 if (!(errno == EEXIST)) {
765 PERROR("mkdir recursive last piece");
766 ret = -errno;
767 } else {
768 ret = 0;
769 }
770 }
771
772 error:
773 return ret;
774 }
775
776 /*
777 * create_output_path: create the output trace directory
778 */
779 static
780 char *create_output_path(char *path_name)
781 {
782 int ret = 0;
783 char *alloc_path = NULL;
784 char *traces_path = NULL;
785 char *full_path = NULL;
786
787 /* Auto output path */
788 if (opt_output_path == NULL) {
789 alloc_path = strdup(config_get_default_path());
790 if (alloc_path == NULL) {
791 ERR("Home path not found.\n \
792 Please specify an output path using -o, --output PATH");
793 ret = -1;
794 goto exit;
795 }
796
797 ret = asprintf(&traces_path, "%s/" DEFAULT_TRACE_DIR_NAME
798 "/%s", alloc_path, path_name);
799 if (ret < 0) {
800 PERROR("asprintf trace dir name");
801 goto exit;
802 }
803 } else {
804 full_path = expand_full_path(opt_output_path);
805 ret = asprintf(&traces_path, "%s/%s", full_path, path_name);
806 if (ret < 0) {
807 PERROR("asprintf trace dir name");
808 goto exit;
809 }
810 }
811 free(alloc_path);
812 free(full_path);
813
814 exit:
815 return traces_path;
816 }
817
/*
 * relay_delete_session: release what a connection's session owns.
 *
 * Does nothing when cmd has no session. Otherwise, it frees
 * cmd->session->sock and walks streams_ht. For every stream whose session is
 * cmd->session, it closes the stream fd, removes the stream from the table
 * and frees it.
 *
 * NOTE(review): cmd->session itself is not freed here; presumably the caller
 * owns it — confirm. The table is iterated and modified without taking an
 * RCU read-side lock in this function; verify the caller holds one if the
 * lttng_ht API requires it.
 */
static
void relay_delete_session(struct relay_command *cmd, struct lttng_ht *streams_ht)
{
	struct lttng_ht_iter iter;
	struct lttng_ht_node_ulong *node;
	struct relay_stream *stream;
	int ret;

	if (!cmd->session)
		return;

	DBG("Relay deleting session %lu", cmd->session->id);
	free(cmd->session->sock);

	cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, node, node) {
		node = lttng_ht_iter_get_node_ulong(&iter);
		if (node) {
			stream = caa_container_of(node,
					struct relay_stream, stream_n);
			/* Only streams belonging to this session are released. */
			if (stream->session == cmd->session) {
				close(stream->fd);
				/* Deleting at the current iterator position. */
				ret = lttng_ht_del(streams_ht, &iter);
				assert(!ret);
				free(stream);
			}
		}
	}
}
850
851 /*
852 * relay_add_stream: allocate a new stream for a session
853 */
854 static
855 int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
856 struct relay_command *cmd, struct lttng_ht *streams_ht)
857 {
858 struct relay_session *session = cmd->session;
859 struct lttcomm_relayd_add_stream stream_info;
860 struct relay_stream *stream = NULL;
861 struct lttcomm_relayd_status_stream reply;
862 char *path = NULL, *root_path = NULL;
863 int ret, send_ret;
864
865 if (!session || session->version_check_done == 0) {
866 ERR("Trying to add a stream before version check");
867 ret = -1;
868 goto end_no_session;
869 }
870
871 /* FIXME : use data_size for something ? */
872 ret = cmd->sock->ops->recvmsg(cmd->sock, &stream_info,
873 sizeof(struct lttcomm_relayd_add_stream), MSG_WAITALL);
874 if (ret < sizeof(struct lttcomm_relayd_add_stream)) {
875 ERR("Relay didn't receive valid add_stream struct size : %d", ret);
876 ret = -1;
877 goto end_no_session;
878 }
879 stream = zmalloc(sizeof(struct relay_stream));
880 if (stream == NULL) {
881 PERROR("relay stream zmalloc");
882 ret = -1;
883 goto end_no_session;
884 }
885
886 stream->stream_handle = ++last_relay_stream_id;
887 stream->seq = 0;
888 stream->session = session;
889
890 root_path = create_output_path(stream_info.pathname);
891 if (!root_path) {
892 ret = -1;
893 goto end;
894 }
895 ret = mkdir_recursive(root_path, S_IRWXU | S_IRWXG);
896 if (ret < 0) {
897 free(root_path);
898 ERR("relay creating output directory");
899 goto end;
900 }
901
902 ret = asprintf(&path, "%s/%s", root_path, stream_info.channel_name);
903 if (ret < 0) {
904 PERROR("asprintf stream path");
905 goto end;
906 }
907
908 ret = open(path, O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
909 if (ret < 0) {
910 PERROR("Relay creating trace file");
911 goto end;
912 }
913
914 stream->fd = ret;
915 DBG("Tracefile %s created", path);
916
917 lttng_ht_node_init_ulong(&stream->stream_n,
918 (unsigned long) stream->stream_handle);
919 lttng_ht_add_unique_ulong(streams_ht,
920 &stream->stream_n);
921
922 DBG("Relay new stream added %s", stream_info.channel_name);
923
924 end:
925 free(path);
926 free(root_path);
927 /* send the session id to the client or a negative return code on error */
928 if (ret < 0) {
929 reply.ret_code = htobe32(LTTCOMM_ERR);
930 } else {
931 reply.ret_code = htobe32(LTTCOMM_OK);
932 }
933 reply.handle = htobe64(stream->stream_handle);
934 send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply,
935 sizeof(struct lttcomm_relayd_status_stream), 0);
936 if (send_ret < 0) {
937 ERR("Relay sending stream id");
938 }
939
940 end_no_session:
941 return ret;
942 }
943
944 /*
945 * relay_unknown_command: send -1 if received unknown command
946 */
947 static
948 void relay_unknown_command(struct relay_command *cmd)
949 {
950 struct lttcomm_relayd_generic_reply reply;
951 int ret;
952
953 reply.ret_code = htobe32(LTTCOMM_ERR);
954 ret = cmd->sock->ops->sendmsg(cmd->sock, &reply,
955 sizeof(struct lttcomm_relayd_generic_reply), 0);
956 if (ret < 0) {
957 ERR("Relay sending unknown command");
958 }
959 }
960
961 /*
962 * relay_start: send an acknowledgment to the client to tell if we are
963 * ready to receive data. We are ready if a session is established.
964 */
965 static
966 int relay_start(struct lttcomm_relayd_hdr *recv_hdr,
967 struct relay_command *cmd)
968 {
969 int ret = htobe32(LTTCOMM_OK);
970 struct lttcomm_relayd_generic_reply reply;
971 struct relay_session *session = cmd->session;
972
973 if (!session) {
974 DBG("Trying to start the streaming without a session established");
975 ret = htobe32(LTTCOMM_ERR);
976 }
977
978 reply.ret_code = ret;
979 ret = cmd->sock->ops->sendmsg(cmd->sock, &reply,
980 sizeof(struct lttcomm_relayd_generic_reply), 0);
981 if (ret < 0) {
982 ERR("Relay sending start ack");
983 }
984
985 return ret;
986 }
987
988 /*
989 * Get stream from stream id.
990 */
991 static
992 struct relay_stream *relay_stream_from_stream_id(uint64_t stream_id,
993 struct lttng_ht *streams_ht)
994 {
995 struct lttng_ht_node_ulong *node;
996 struct lttng_ht_iter iter;
997 struct relay_stream *ret;
998
999 lttng_ht_lookup(streams_ht,
1000 (void *)((unsigned long) stream_id),
1001 &iter);
1002 node = lttng_ht_iter_get_node_ulong(&iter);
1003 if (node == NULL) {
1004 DBG("Relay stream %lu not found", stream_id);
1005 ret = NULL;
1006 goto end;
1007 }
1008
1009 ret = caa_container_of(node, struct relay_stream, stream_n);
1010
1011 end:
1012 return ret;
1013 }
1014
1015 /*
1016 * relay_recv_metadata: receive the metada for the session.
1017 */
1018 static
1019 int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
1020 struct relay_command *cmd, struct lttng_ht *streams_ht)
1021 {
1022 int ret = htobe32(LTTCOMM_OK);
1023 struct relay_session *session = cmd->session;
1024 struct lttcomm_relayd_metadata_payload *metadata_struct;
1025 struct relay_stream *metadata_stream;
1026 uint64_t data_size, payload_size;
1027
1028 if (!session) {
1029 ERR("Metadata sent before version check");
1030 ret = -1;
1031 goto end;
1032 }
1033
1034 data_size = be64toh(recv_hdr->data_size);
1035 payload_size = data_size - sizeof(uint64_t);
1036 if (data_buffer_size < data_size) {
1037 data_buffer = realloc(data_buffer, data_size);
1038 if (!data_buffer) {
1039 ERR("Allocating data buffer");
1040 ret = -1;
1041 goto end;
1042 }
1043 data_buffer_size = data_size;
1044 }
1045 memset(data_buffer, 0, data_size);
1046 DBG2("Relay receiving metadata, waiting for %lu bytes", data_size);
1047 ret = cmd->sock->ops->recvmsg(cmd->sock, data_buffer, data_size, MSG_WAITALL);
1048 if (ret < 0 || ret != data_size) {
1049 ret = -1;
1050 ERR("Relay didn't receive the whole metadata");
1051 goto end;
1052 }
1053 metadata_struct = (struct lttcomm_relayd_metadata_payload *) data_buffer;
1054 metadata_stream = relay_stream_from_stream_id(
1055 be64toh(metadata_struct->stream_id), streams_ht);
1056 if (!metadata_stream) {
1057 ret = -1;
1058 goto end;
1059 }
1060
1061 do {
1062 ret = write(metadata_stream->fd, metadata_struct->payload,
1063 payload_size);
1064 } while (ret < 0 && errno == EINTR);
1065 if (ret < (payload_size)) {
1066 ERR("Relay error writing metadata on file");
1067 ret = -1;
1068 goto end;
1069 }
1070 DBG2("Relay metadata written");
1071
1072 end:
1073 return ret;
1074 }
1075
1076 /*
1077 * relay_send_version: send relayd version number
1078 */
1079 static
1080 int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
1081 struct relay_command *cmd)
1082 {
1083 int ret = htobe32(LTTCOMM_OK);
1084 struct lttcomm_relayd_version reply;
1085 struct relay_session *session = NULL;
1086
1087 if (cmd->session == NULL) {
1088 session = zmalloc(sizeof(struct relay_session));
1089 if (session == NULL) {
1090 PERROR("relay session zmalloc");
1091 ret = -1;
1092 goto end;
1093 }
1094 session->id = ++last_relay_session_id;
1095 DBG("Created session %lu", session->id);
1096 cmd->session = session;
1097 }
1098 session->version_check_done = 1;
1099
1100 sscanf(VERSION, "%u.%u", &reply.major, &reply.minor);
1101 reply.major = htobe32(reply.major);
1102 reply.minor = htobe32(reply.minor);
1103 ret = cmd->sock->ops->sendmsg(cmd->sock, &reply,
1104 sizeof(struct lttcomm_relayd_version), 0);
1105 if (ret < 0) {
1106 ERR("Relay sending version");
1107 }
1108 DBG("Version check done");
1109
1110 end:
1111 return ret;
1112 }
1113
1114 /*
1115 * relay_process_control: Process the commands received on the control socket
1116 */
1117 static
1118 int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
1119 struct relay_command *cmd, struct lttng_ht *streams_ht)
1120 {
1121 int ret = 0;
1122
1123 switch (be32toh(recv_hdr->cmd)) {
1124 /*
1125 case RELAYD_CREATE_SESSION:
1126 ret = relay_create_session(recv_hdr, cmd);
1127 break;
1128 */
1129 case RELAYD_ADD_STREAM:
1130 ret = relay_add_stream(recv_hdr, cmd, streams_ht);
1131 break;
1132 case RELAYD_START_DATA:
1133 ret = relay_start(recv_hdr, cmd);
1134 break;
1135 case RELAYD_SEND_METADATA:
1136 ret = relay_recv_metadata(recv_hdr, cmd, streams_ht);
1137 break;
1138 case RELAYD_VERSION:
1139 ret = relay_send_version(recv_hdr, cmd);
1140 break;
1141 case RELAYD_UPDATE_SYNC_INFO:
1142 default:
1143 ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
1144 relay_unknown_command(cmd);
1145 ret = -1;
1146 goto end;
1147 }
1148
1149 end:
1150 return ret;
1151 }
1152
1153 /*
1154 * relay_process_data: Process the data received on the data socket
1155 */
1156 static
1157 int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht)
1158 {
1159 int ret = 0;
1160 struct relay_stream *stream;
1161 struct lttcomm_relayd_data_hdr data_hdr;
1162 uint64_t stream_id;
1163 uint32_t data_size;
1164
1165 ret = cmd->sock->ops->recvmsg(cmd->sock, &data_hdr,
1166 sizeof(struct lttcomm_relayd_data_hdr), MSG_WAITALL);
1167 if (ret <= 0) {
1168 ERR("Connections seems to be closed");
1169 ret = -1;
1170 goto end;
1171 }
1172
1173 stream_id = be64toh(data_hdr.stream_id);
1174 stream = relay_stream_from_stream_id(stream_id, streams_ht);
1175 if (!stream) {
1176 ret = -1;
1177 goto end;
1178 }
1179
1180 data_size = be32toh(data_hdr.data_size);
1181 if (data_buffer_size < data_size) {
1182 data_buffer = realloc(data_buffer, data_size);
1183 if (!data_buffer) {
1184 ERR("Allocating data buffer");
1185 ret = -1;
1186 goto end;
1187 }
1188 data_buffer_size = data_size;
1189 }
1190 memset(data_buffer, 0, data_size);
1191
1192 ret = cmd->sock->ops->recvmsg(cmd->sock, data_buffer, data_size, MSG_WAITALL);
1193 if (ret <= 0) {
1194 ret = -1;
1195 goto end;
1196 }
1197
1198 do {
1199 ret = write(stream->fd, data_buffer, data_size);
1200 } while (ret < 0 && errno == EINTR);
1201 if (ret < data_size) {
1202 ERR("Relay error writing data to file");
1203 ret = -1;
1204 goto end;
1205 }
1206 DBG2("Relay wrote %d bytes to tracefile for stream id %lu", ret, stream->stream_handle);
1207
1208 end:
1209 return ret;
1210 }
1211
/*
 * Forget a worker connection:
 * - remove it from relay_connections_ht ("iter" must point at its node);
 * - stop polling pollfd;
 * - close pollfd.
 *
 * The relay_command itself is not freed here. streams_ht is unused.
 */
static
void relay_cleanup_connection(struct lttng_ht *relay_connections_ht, struct lttng_poll_event *events,
		struct lttng_ht *streams_ht, int pollfd, struct lttng_ht_iter *iter)
{
	int del_ret, close_ret;

	del_ret = lttng_ht_del(relay_connections_ht, iter);
	assert(!del_ret);
	lttng_poll_del(events, pollfd);

	close_ret = close(pollfd);
	if (close_ret < 0) {
		ERR("Closing pollfd %d", pollfd);
	}
}
1227
1228 static
1229 int relay_add_connection(int fd, struct lttng_poll_event *events,
1230 struct lttng_ht *relay_connections_ht)
1231 {
1232 int ret;
1233 struct relay_command *relay_connection;
1234
1235 relay_connection = zmalloc(sizeof(struct relay_command));
1236 if (relay_connection == NULL) {
1237 PERROR("Relay command zmalloc");
1238 ret = -1;
1239 goto end;
1240 }
1241 ret = read(fd, relay_connection, sizeof(struct relay_command));
1242 if (ret < 0 || ret < sizeof(relay_connection)) {
1243 PERROR("read relay cmd pipe");
1244 ret = -1;
1245 goto end;
1246 }
1247
1248 lttng_ht_node_init_ulong(&relay_connection->sock_n,
1249 (unsigned long) relay_connection->sock->fd);
1250 lttng_ht_add_unique_ulong(relay_connections_ht,
1251 &relay_connection->sock_n);
1252 ret = lttng_poll_add(events,
1253 relay_connection->sock->fd,
1254 LPOLLIN | LPOLLRDHUP);
1255
1256 end:
1257 return ret;
1258 }
1259
1260 /*
1261 * This thread does the actual work
1262 */
1263 static
1264 void *relay_thread_worker(void *data)
1265 {
1266 int i, ret, pollfd;
1267 uint32_t revents, nb_fd;
1268 struct relay_command *relay_connection;
1269 struct lttng_poll_event events;
1270 struct lttng_ht *relay_connections_ht;
1271 struct lttng_ht_node_ulong *node;
1272 struct lttng_ht_iter iter;
1273 struct lttng_ht *streams_ht;
1274 struct lttcomm_relayd_hdr recv_hdr;
1275
1276 DBG("[thread] Relay worker started");
1277
1278 /* table of connections indexed on socket */
1279 relay_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
1280
1281 /* tables of streams indexed by stream ID */
1282 streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
1283
1284 ret = create_thread_poll_set(&events, 2);
1285 if (ret < 0) {
1286 goto error_poll_create;
1287 }
1288
1289 ret = lttng_poll_add(&events, relay_cmd_pipe[0], LPOLLIN | LPOLLRDHUP);
1290 if (ret < 0) {
1291 goto error;
1292 }
1293
1294 while (1) {
1295 /* Zeroed the events structure */
1296 lttng_poll_reset(&events);
1297
1298 nb_fd = LTTNG_POLL_GETNB(&events);
1299
1300 /* Infinite blocking call, waiting for transmission */
1301 restart:
1302 ret = lttng_poll_wait(&events, -1);
1303 if (ret < 0) {
1304 /*
1305 * Restart interrupted system call.
1306 */
1307 if (errno == EINTR) {
1308 goto restart;
1309 }
1310 goto error;
1311 }
1312
1313 for (i = 0; i < nb_fd; i++) {
1314 /* Fetch once the poll data */
1315 revents = LTTNG_POLL_GETEV(&events, i);
1316 pollfd = LTTNG_POLL_GETFD(&events, i);
1317
1318 /* Thread quit pipe has been closed. Killing thread. */
1319 ret = check_thread_quit_pipe(pollfd, revents);
1320 if (ret) {
1321 goto error;
1322 }
1323
1324 /* Inspect the relay cmd pipe for new connection */
1325 if (pollfd == relay_cmd_pipe[0]) {
1326 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
1327 ERR("Relay pipe error");
1328 goto error;
1329 } else if (revents & LPOLLIN) {
1330 DBG("Relay command received");
1331 ret = relay_add_connection(relay_cmd_pipe[0],
1332 &events, relay_connections_ht);
1333 if (ret < 0) {
1334 goto error;
1335 }
1336 }
1337 } else if (revents > 0) {
1338 lttng_ht_lookup(relay_connections_ht,
1339 (void *)((unsigned long) pollfd),
1340 &iter);
1341 node = lttng_ht_iter_get_node_ulong(&iter);
1342 if (node == NULL) {
1343 DBG2("Relay sock %d not found", pollfd);
1344 goto error;
1345 }
1346 relay_connection = caa_container_of(node,
1347 struct relay_command, sock_n);
1348
1349 if (revents & (LPOLLERR)) {
1350 ERR("POLL ERROR");
1351 relay_cleanup_connection(relay_connections_ht,
1352 &events, streams_ht, pollfd, &iter);
1353 free(relay_connection);
1354 } else if (revents & (LPOLLHUP | LPOLLRDHUP)) {
1355 DBG("Socket %d hung up", pollfd);
1356 relay_cleanup_connection(relay_connections_ht,
1357 &events, streams_ht, pollfd, &iter);
1358 if (relay_connection->type == RELAY_CONTROL) {
1359 relay_delete_session(relay_connection, streams_ht);
1360 }
1361 free(relay_connection);
1362 } else if (revents & LPOLLIN) {
1363 /* control socket */
1364 if (relay_connection->type == RELAY_CONTROL) {
1365 ret = relay_connection->sock->ops->recvmsg(
1366 relay_connection->sock, &recv_hdr,
1367 sizeof(struct lttcomm_relayd_hdr), MSG_WAITALL);
1368 /* connection closed */
1369 if (ret <= 0) {
1370 relay_cleanup_connection(relay_connections_ht,
1371 &events, streams_ht, pollfd, &iter);
1372 relay_delete_session(relay_connection, streams_ht);
1373 free(relay_connection);
1374 DBG("Control connection closed with %d", pollfd);
1375 } else {
1376 if (relay_connection->session) {
1377 DBG2("Relay worker receiving data for session : %lu",
1378 relay_connection->session->id);
1379 }
1380 ret = relay_process_control(&recv_hdr,
1381 relay_connection,
1382 streams_ht);
1383 /*
1384 * there was an error in processing a control
1385 * command: clear the session
1386 * */
1387 if (ret < 0) {
1388 relay_cleanup_connection(relay_connections_ht,
1389 &events, streams_ht, pollfd, &iter);
1390 free(relay_connection);
1391 DBG("Connection closed with %d", pollfd);
1392 }
1393 }
1394 /* data socket */
1395 } else if (relay_connection->type == RELAY_DATA) {
1396 ret = relay_process_data(relay_connection, streams_ht);
1397 /* connection closed */
1398 if (ret < 0) {
1399 relay_cleanup_connection(relay_connections_ht,
1400 &events, streams_ht, pollfd, &iter);
1401 relay_delete_session(relay_connection, streams_ht);
1402 DBG("Data connection closed with %d", pollfd);
1403 }
1404 }
1405 }
1406 }
1407 }
1408 }
1409
1410 error:
1411 lttng_poll_clean(&events);
1412
1413 /* empty the hash table and free the memory */
1414 cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, node, node) {
1415 node = lttng_ht_iter_get_node_ulong(&iter);
1416 if (node) {
1417 relay_connection = caa_container_of(node,
1418 struct relay_command, sock_n);
1419 free(relay_connection);
1420 }
1421 ret = lttng_ht_del(relay_connections_ht, &iter);
1422 assert(!ret);
1423 }
1424 error_poll_create:
1425 free(data_buffer);
1426 lttng_ht_destroy(relay_connections_ht);
1427 DBG("Worker thread cleanup complete");
1428 stop_threads();
1429 return NULL;
1430 }
1431
1432 /*
1433 * Create the relay command pipe to wake thread_manage_apps.
1434 * Closed in cleanup().
1435 */
1436 static int create_relay_cmd_pipe(void)
1437 {
1438 int ret;
1439
1440 ret = utils_create_pipe_cloexec(relay_cmd_pipe);
1441
1442 return ret;
1443 }
1444
1445 /*
1446 * main
1447 */
1448 int main(int argc, char **argv)
1449 {
1450 int ret = 0;
1451 void *status;
1452
1453 /* Create thread quit pipe */
1454 if ((ret = init_thread_quit_pipe()) < 0) {
1455 goto error;
1456 }
1457
1458 /* Parse arguments */
1459 progname = argv[0];
1460 if ((ret = parse_args(argc, argv) < 0)) {
1461 goto exit;
1462 }
1463
1464 if ((ret = set_signal_handler()) < 0) {
1465 goto exit;
1466 }
1467
1468 /* Daemonize */
1469 if (opt_daemon) {
1470 ret = daemon(0, 0);
1471 if (ret < 0) {
1472 PERROR("daemon");
1473 goto exit;
1474 }
1475 }
1476
1477 /* Check if daemon is UID = 0 */
1478 is_root = !getuid();
1479
1480 if (!is_root) {
1481 if (control_uri->port < 1024 || data_uri->port < 1024) {
1482 ERR("Need to be root to use ports < 1024");
1483 ret = -1;
1484 goto exit;
1485 }
1486 }
1487
1488 /* Setup the thread apps communication pipe. */
1489 if ((ret = create_relay_cmd_pipe()) < 0) {
1490 goto exit;
1491 }
1492
1493 /* Init relay command queue. */
1494 cds_wfq_init(&relay_cmd_queue.queue);
1495
1496 /* Set up max poll set size */
1497 lttng_poll_set_max_size();
1498
1499 /* Setup the dispatcher thread */
1500 ret = pthread_create(&dispatcher_thread, NULL,
1501 relay_thread_dispatcher, (void *) NULL);
1502 if (ret != 0) {
1503 PERROR("pthread_create dispatcher");
1504 goto exit_dispatcher;
1505 }
1506
1507 /* Setup the worker thread */
1508 ret = pthread_create(&worker_thread, NULL,
1509 relay_thread_worker, (void *) NULL);
1510 if (ret != 0) {
1511 PERROR("pthread_create worker");
1512 goto exit_worker;
1513 }
1514
1515 /* Setup the listener thread */
1516 ret = pthread_create(&listener_thread, NULL,
1517 relay_thread_listener, (void *) NULL);
1518 if (ret != 0) {
1519 PERROR("pthread_create listener");
1520 goto exit_listener;
1521 }
1522
1523 exit_listener:
1524 ret = pthread_join(listener_thread, &status);
1525 if (ret != 0) {
1526 PERROR("pthread_join");
1527 goto error; /* join error, exit without cleanup */
1528 }
1529
1530 exit_worker:
1531 ret = pthread_join(worker_thread, &status);
1532 if (ret != 0) {
1533 PERROR("pthread_join");
1534 goto error; /* join error, exit without cleanup */
1535 }
1536
1537 exit_dispatcher:
1538 ret = pthread_join(dispatcher_thread, &status);
1539 if (ret != 0) {
1540 PERROR("pthread_join");
1541 goto error; /* join error, exit without cleanup */
1542 }
1543
1544 exit:
1545 cleanup();
1546 if (!ret) {
1547 exit(EXIT_SUCCESS);
1548 }
1549
1550 error:
1551 exit(EXIT_FAILURE);
1552 }
This page took 0.098793 seconds and 5 git commands to generate.