Fix: relayd pipes should be closed on exit/error
[lttng-tools.git] / src / bin / lttng-relayd / main.c
1 /*
2 * Copyright (C) 2012 - Julien Desfossez <jdesfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License, version 2 only,
7 * as published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19 #define _GNU_SOURCE
20 #include <getopt.h>
21 #include <grp.h>
22 #include <limits.h>
23 #include <pthread.h>
24 #include <signal.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <sys/mman.h>
29 #include <sys/mount.h>
30 #include <sys/resource.h>
31 #include <sys/socket.h>
32 #include <sys/stat.h>
33 #include <sys/types.h>
34 #include <sys/wait.h>
35 #include <urcu/futex.h>
36 #include <urcu/uatomic.h>
37 #include <unistd.h>
38 #include <fcntl.h>
39 #include <config.h>
40
41 #include <lttng/lttng.h>
42 #include <common/common.h>
43 #include <common/compat/poll.h>
44 #include <common/compat/socket.h>
45 #include <common/defaults.h>
46 #include <common/futex.h>
47 #include <common/sessiond-comm/sessiond-comm.h>
48 #include <common/sessiond-comm/inet.h>
49 #include <common/hashtable/hashtable.h>
50 #include <common/sessiond-comm/relayd.h>
51 #include <common/uri.h>
52 #include <common/utils.h>
53
54 #include "lttng-relayd.h"
55
56 /* command line options */
57 static int opt_daemon;
58 static char *opt_output_path;
59 static struct lttng_uri *control_uri = NULL;
60 static struct lttng_uri *data_uri = NULL;
61
62 const char *progname;
63 static int is_root; /* Set to 1 if the daemon is running as root */
64
65 /*
66 * Quit pipe for all threads. This permits a single cancellation point
67 * for all threads when receiving an event on the pipe.
68 */
69 static int thread_quit_pipe[2] = { -1, -1 };
70
71 /*
72 * This pipe is used to inform the worker thread that a command is queued and
73 * ready to be processed.
74 */
75 static int relay_cmd_pipe[2] = { -1, -1 };
76
77 static int dispatch_thread_exit;
78
79 static pthread_t listener_thread;
80 static pthread_t dispatcher_thread;
81 static pthread_t worker_thread;
82
83 static uint64_t last_relay_stream_id = 0;
84 static uint64_t last_relay_session_id = 0;
85
86 /*
87 * Relay command queue.
88 *
89 * The relay_thread_listener and relay_thread_dispatcher communicate with this
90 * queue.
91 */
92 static struct relay_cmd_queue relay_cmd_queue;
93
94 /* buffer allocated at startup, used to store the trace data */
95 static char *data_buffer = NULL;
96 static unsigned int data_buffer_size = 0;
97
98 /*
99 * usage function on stderr
100 */
101 static
102 void usage(void)
103 {
104 fprintf(stderr, "Usage: %s OPTIONS\n\nOptions:\n", progname);
105 fprintf(stderr, " -h, --help Display this usage.\n");
106 fprintf(stderr, " -d, --daemonize Start as a daemon.\n");
107 fprintf(stderr, " -C, --control-port Control port listening (URI)\n");
108 fprintf(stderr, " -D, --data-port Data port listening (URI)\n");
109 fprintf(stderr, " -o, --output Output path for traces (PATH)\n");
110 fprintf(stderr, " -v, --verbose Verbose mode. Activate DBG() macro.\n");
111 }
112
113 static
114 int parse_args(int argc, char **argv)
115 {
116 int c;
117 int ret = 0;
118 char *default_address;
119
120 static struct option long_options[] = {
121 { "control-port", 1, 0, 'C' },
122 { "data-port", 1, 0, 'D' },
123 { "daemonize", 0, 0, 'd' },
124 { "help", 0, 0, 'h' },
125 { "output", 1, 0, 'o' },
126 { "verbose", 0, 0, 'v' },
127 { NULL, 0, 0, 0 }
128 };
129
130 while (1) {
131 int option_index = 0;
132 c = getopt_long(argc, argv, "dhv" "C:D:o:",
133 long_options, &option_index);
134 if (c == -1) {
135 break;
136 }
137
138 switch (c) {
139 case 0:
140 fprintf(stderr, "option %s", long_options[option_index].name);
141 if (optarg) {
142 fprintf(stderr, " with arg %s\n", optarg);
143 }
144 break;
145 case 'C':
146 ret = uri_parse(optarg, &control_uri);
147 if (ret < 0) {
148 ERR("Invalid control URI specified");
149 goto exit;
150 }
151 if (control_uri->port == 0) {
152 control_uri->port = DEFAULT_NETWORK_CONTROL_PORT;
153 }
154 break;
155 case 'D':
156 ret = uri_parse(optarg, &data_uri);
157 if (ret < 0) {
158 ERR("Invalid data URI specified");
159 goto exit;
160 }
161 if (data_uri->port == 0) {
162 data_uri->port = DEFAULT_NETWORK_DATA_PORT;
163 }
164 break;
165 case 'd':
166 opt_daemon = 1;
167 break;
168 case 'h':
169 usage();
170 exit(EXIT_FAILURE);
171 case 'o':
172 ret = asprintf(&opt_output_path, "%s", optarg);
173 if (ret < 0) {
174 PERROR("asprintf opt_output_path");
175 goto exit;
176 }
177 break;
178 case 'v':
179 /* Verbose level can increase using multiple -v */
180 lttng_opt_verbose += 1;
181 break;
182 default:
183 /* Unknown option or other error.
184 * Error is printed by getopt, just return */
185 ret = -1;
186 goto exit;
187 }
188 }
189
190 /* assign default values */
191 if (control_uri == NULL) {
192 ret = asprintf(&default_address, "tcp://0.0.0.0:%d",
193 DEFAULT_NETWORK_CONTROL_PORT);
194 if (ret < 0) {
195 PERROR("asprintf default data address");
196 goto exit;
197 }
198
199 ret = uri_parse(default_address, &control_uri);
200 free(default_address);
201 if (ret < 0) {
202 ERR("Invalid control URI specified");
203 goto exit;
204 }
205 }
206 if (data_uri == NULL) {
207 ret = asprintf(&default_address, "tcp://0.0.0.0:%d",
208 DEFAULT_NETWORK_DATA_PORT);
209 if (ret < 0) {
210 PERROR("asprintf default data address");
211 goto exit;
212 }
213
214 ret = uri_parse(default_address, &data_uri);
215 free(default_address);
216 if (ret < 0) {
217 ERR("Invalid data URI specified");
218 goto exit;
219 }
220 }
221
222 exit:
223 return ret;
224 }
225
226 /*
227 * Cleanup the daemon
228 */
229 static
230 void cleanup(void)
231 {
232 DBG("Cleaning up");
233
234 /* Close thread quit pipes */
235 utils_close_pipe(thread_quit_pipe);
236
237 /* Close relay cmd pipes */
238 utils_close_pipe(relay_cmd_pipe);
239 }
240
/*
 * Write one byte to a writable pipe end to notify the thread polling its
 * read end.
 *
 * The write is retried when interrupted by a signal (EINTR) so that the
 * notification is not silently dropped.
 *
 * Return the write(2) result: 1 on success, -1 on error.
 */
static
int notify_thread_pipe(int wpipe)
{
	ssize_t ret;

	do {
		ret = write(wpipe, "!", 1);
	} while (ret < 0 && errno == EINTR);

	if (ret < 0) {
		PERROR("write poll pipe");
	}

	return (int) ret;
}
256
257 /*
258 * Stop all threads by closing the thread quit pipe.
259 */
260 static
261 void stop_threads(void)
262 {
263 int ret;
264
265 /* Stopping all threads */
266 DBG("Terminating all threads");
267 ret = notify_thread_pipe(thread_quit_pipe[1]);
268 if (ret < 0) {
269 ERR("write error on thread quit pipe");
270 }
271
272 /* Dispatch thread */
273 dispatch_thread_exit = 1;
274 futex_nto1_wake(&relay_cmd_queue.futex);
275 }
276
/*
 * Signal handler for the daemon.
 *
 * SIGINT and SIGTERM stop all threads, leaving main() to return gracefully
 * after joining them and calling cleanup(). SIGPIPE is only logged, so that
 * a write to a closed peer does not terminate the process. Any other signal
 * number is a no-op here.
 */
static
void sighandler(int sig)
{
	if (sig == SIGPIPE) {
		DBG("SIGPIPE caught");
		return;
	}

	if (sig == SIGINT) {
		DBG("SIGINT caught");
		stop_threads();
	} else if (sig == SIGTERM) {
		DBG("SIGTERM caught");
		stop_threads();
	}
}
302
303 /*
304 * Setup signal handler for :
305 * SIGINT, SIGTERM, SIGPIPE
306 */
307 static
308 int set_signal_handler(void)
309 {
310 int ret = 0;
311 struct sigaction sa;
312 sigset_t sigset;
313
314 if ((ret = sigemptyset(&sigset)) < 0) {
315 PERROR("sigemptyset");
316 return ret;
317 }
318
319 sa.sa_handler = sighandler;
320 sa.sa_mask = sigset;
321 sa.sa_flags = 0;
322 if ((ret = sigaction(SIGTERM, &sa, NULL)) < 0) {
323 PERROR("sigaction");
324 return ret;
325 }
326
327 if ((ret = sigaction(SIGINT, &sa, NULL)) < 0) {
328 PERROR("sigaction");
329 return ret;
330 }
331
332 if ((ret = sigaction(SIGPIPE, &sa, NULL)) < 0) {
333 PERROR("sigaction");
334 return ret;
335 }
336
337 DBG("Signal handler set for SIGTERM, SIGPIPE and SIGINT");
338
339 return ret;
340 }
341
342 /*
343 * Init thread quit pipe.
344 *
345 * Return -1 on error or 0 if all pipes are created.
346 */
347 static
348 int init_thread_quit_pipe(void)
349 {
350 int ret;
351
352 ret = utils_create_pipe_cloexec(thread_quit_pipe);
353
354 return ret;
355 }
356
357 /*
358 * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set.
359 */
360 static
361 int create_thread_poll_set(struct lttng_poll_event *events, int size)
362 {
363 int ret;
364
365 if (events == NULL || size == 0) {
366 ret = -1;
367 goto error;
368 }
369
370 ret = lttng_poll_create(events, size, LTTNG_CLOEXEC);
371 if (ret < 0) {
372 goto error;
373 }
374
375 /* Add quit pipe */
376 ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN);
377 if (ret < 0) {
378 goto error;
379 }
380
381 return 0;
382
383 error:
384 return ret;
385 }
386
387 /*
388 * Check if the thread quit pipe was triggered.
389 *
390 * Return 1 if it was triggered else 0;
391 */
392 static
393 int check_thread_quit_pipe(int fd, uint32_t events)
394 {
395 if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) {
396 return 1;
397 }
398
399 return 0;
400 }
401
402 /*
403 * Create and init socket from uri.
404 */
405 static
406 struct lttcomm_sock *relay_init_sock(struct lttng_uri *uri)
407 {
408 int ret;
409 struct lttcomm_sock *sock = NULL;
410
411 sock = lttcomm_alloc_sock_from_uri(uri);
412 if (sock == NULL) {
413 ERR("Allocating socket");
414 goto error;
415 }
416
417 ret = lttcomm_create_sock(sock);
418 if (ret < 0) {
419 goto error;
420 }
421 DBG("Listening on sock %d", sock->fd);
422
423 ret = sock->ops->bind(sock);
424 if (ret < 0) {
425 goto error;
426 }
427
428 ret = sock->ops->listen(sock, -1);
429 if (ret < 0) {
430 goto error;
431
432 }
433
434 return sock;
435
436 error:
437 if (sock) {
438 lttcomm_destroy_sock(sock);
439 }
440 return NULL;
441 }
442
443 /*
444 * This thread manages the listening for new connections on the network
445 */
446 static
447 void *relay_thread_listener(void *data)
448 {
449 int i, ret, pollfd;
450 int val = 1;
451 uint32_t revents, nb_fd;
452 struct lttng_poll_event events;
453 struct lttcomm_sock *control_sock, *data_sock;
454
455 /*
456 * Get allocated in this thread, enqueued to a global queue, dequeued and
457 * freed in the worker thread.
458 */
459 struct relay_command *relay_cmd = NULL;
460
461 DBG("[thread] Relay listener started");
462
463 control_sock = relay_init_sock(control_uri);
464 if (!control_sock) {
465 goto error_sock;
466 }
467
468 data_sock = relay_init_sock(data_uri);
469 if (!data_sock) {
470 goto error_sock;
471 }
472
473 /*
474 * Pass 3 as size here for the thread quit pipe, control and data socket.
475 */
476 ret = create_thread_poll_set(&events, 3);
477 if (ret < 0) {
478 goto error_create_poll;
479 }
480
481 /* Add the control socket */
482 ret = lttng_poll_add(&events, control_sock->fd, LPOLLIN | LPOLLRDHUP);
483 if (ret < 0) {
484 goto error_poll_add;
485 }
486
487 /* Add the data socket */
488 ret = lttng_poll_add(&events, data_sock->fd, LPOLLIN | LPOLLRDHUP);
489 if (ret < 0) {
490 goto error_poll_add;
491 }
492
493 while (1) {
494 DBG("Listener accepting connections");
495
496 nb_fd = LTTNG_POLL_GETNB(&events);
497
498 restart:
499 ret = lttng_poll_wait(&events, -1);
500 if (ret < 0) {
501 /*
502 * Restart interrupted system call.
503 */
504 if (errno == EINTR) {
505 goto restart;
506 }
507 goto error;
508 }
509
510 DBG("Relay new connection received");
511 for (i = 0; i < nb_fd; i++) {
512 /* Fetch once the poll data */
513 revents = LTTNG_POLL_GETEV(&events, i);
514 pollfd = LTTNG_POLL_GETFD(&events, i);
515
516 /* Thread quit pipe has been closed. Killing thread. */
517 ret = check_thread_quit_pipe(pollfd, revents);
518 if (ret) {
519 goto error;
520 }
521
522 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
523 ERR("socket poll error");
524 goto error;
525 } else if (revents & LPOLLIN) {
526 struct lttcomm_sock *newsock = NULL;
527
528 relay_cmd = zmalloc(sizeof(struct relay_command));
529 if (relay_cmd == NULL) {
530 PERROR("relay command zmalloc");
531 goto error;
532 }
533
534 if (pollfd == data_sock->fd) {
535 newsock = data_sock->ops->accept(data_sock);
536 if (newsock < 0) {
537 PERROR("accepting data sock");
538 goto error;
539 }
540 relay_cmd->type = RELAY_DATA;
541 DBG("Relay data connection accepted, socket %d", newsock->fd);
542 } else if (pollfd == control_sock->fd) {
543 newsock = control_sock->ops->accept(control_sock);
544 if (newsock < 0) {
545 PERROR("accepting control sock");
546 goto error;
547 }
548 relay_cmd->type = RELAY_CONTROL;
549 DBG("Relay control connection accepted, socket %d", newsock->fd);
550 }
551 ret = setsockopt(newsock->fd, SOL_SOCKET, SO_REUSEADDR,
552 &val, sizeof(int));
553 if (ret < 0) {
554 PERROR("setsockopt inet");
555 goto error;
556 }
557 relay_cmd->sock = newsock;
558 /*
559 * Lock free enqueue the request.
560 */
561 cds_wfq_enqueue(&relay_cmd_queue.queue, &relay_cmd->node);
562
563 /*
564 * Wake the dispatch queue futex. Implicit memory
565 * barrier with the exchange in cds_wfq_enqueue.
566 */
567 futex_nto1_wake(&relay_cmd_queue.futex);
568 }
569 }
570 }
571
572 error:
573 error_poll_add:
574 lttng_poll_clean(&events);
575 error_create_poll:
576 if (control_sock->fd >= 0) {
577 ret = control_sock->ops->close(control_sock);
578 if (ret) {
579 PERROR("close");
580 }
581 lttcomm_destroy_sock(control_sock);
582 }
583 if (data_sock->fd >= 0) {
584 ret = data_sock->ops->close(data_sock);
585 if (ret) {
586 PERROR("close");
587 }
588 lttcomm_destroy_sock(data_sock);
589 }
590
591 DBG("Relay listener thread cleanup complete");
592 stop_threads();
593 error_sock:
594 return NULL;
595 }
596
597 /*
598 * This thread manages the dispatching of the requests to worker threads
599 */
600 static
601 void *relay_thread_dispatcher(void *data)
602 {
603 int ret;
604 struct cds_wfq_node *node;
605 struct relay_command *relay_cmd = NULL;
606
607 DBG("[thread] Relay dispatcher started");
608
609 while (!dispatch_thread_exit) {
610 /* Atomically prepare the queue futex */
611 futex_nto1_prepare(&relay_cmd_queue.futex);
612
613 do {
614 /* Dequeue commands */
615 node = cds_wfq_dequeue_blocking(&relay_cmd_queue.queue);
616 if (node == NULL) {
617 DBG("Woken up but nothing in the relay command queue");
618 /* Continue thread execution */
619 break;
620 }
621
622 relay_cmd = caa_container_of(node, struct relay_command, node);
623 DBG("Dispatching request waiting on sock %d", relay_cmd->sock->fd);
624
625 /*
626 * Inform worker thread of the new request. This
627 * call is blocking so we can be assured that the data will be read
628 * at some point in time or wait to the end of the world :)
629 */
630 ret = write(relay_cmd_pipe[1], relay_cmd,
631 sizeof(struct relay_command));
632 free(relay_cmd);
633 if (ret < 0) {
634 PERROR("write cmd pipe");
635 goto error;
636 }
637 } while (node != NULL);
638
639 /* Futex wait on queue. Blocking call on futex() */
640 futex_nto1_wait(&relay_cmd_queue.futex);
641 }
642
643 error:
644 DBG("Dispatch thread dying");
645 stop_threads();
646 return NULL;
647 }
648
649 /*
650 * Return the realpath(3) of the path even if the last directory token does not
651 * exist. For example, with /tmp/test1/test2, if test2/ does not exist but the
652 * /tmp/test1 does, the real path is returned. In normal time, realpath(3)
653 * fails if the end point directory does not exist.
654 */
655 static
656 char *expand_full_path(const char *path)
657 {
658 const char *end_path = path;
659 char *next, *cut_path, *expanded_path;
660
661 /* Find last token delimited by '/' */
662 while ((next = strpbrk(end_path + 1, "/"))) {
663 end_path = next;
664 }
665
666 /* Cut last token from original path */
667 cut_path = strndup(path, end_path - path);
668
669 expanded_path = malloc(PATH_MAX);
670 if (expanded_path == NULL) {
671 goto error;
672 }
673
674 expanded_path = realpath((char *)cut_path, expanded_path);
675 if (expanded_path == NULL) {
676 switch (errno) {
677 case ENOENT:
678 ERR("%s: No such file or directory", cut_path);
679 break;
680 default:
681 PERROR("realpath");
682 break;
683 }
684 goto error;
685 }
686
687 /* Add end part to expanded path */
688 strcat(expanded_path, end_path);
689
690 free(cut_path);
691 return expanded_path;
692
693 error:
694 free(cut_path);
695 return NULL;
696 }
697
698
/*
 * config_get_default_path
 *
 * Look up the HOME environment variable. The result is NULL when HOME is
 * unset; otherwise it points into the environment, so the caller MUST NOT
 * free(3) it.
 */
static
char *config_get_default_path(void)
{
	char *home_dir = getenv("HOME");

	return home_dir;
}
709
710 /*
711 * Create recursively directory using the FULL path.
712 */
713 static
714 int mkdir_recursive(char *path, mode_t mode)
715 {
716 char *p, tmp[PATH_MAX];
717 struct stat statbuf;
718 size_t len;
719 int ret;
720
721 ret = snprintf(tmp, sizeof(tmp), "%s", path);
722 if (ret < 0) {
723 PERROR("snprintf mkdir");
724 goto error;
725 }
726
727 len = ret;
728 if (tmp[len - 1] == '/') {
729 tmp[len - 1] = 0;
730 }
731
732 for (p = tmp + 1; *p; p++) {
733 if (*p == '/') {
734 *p = 0;
735 if (tmp[strlen(tmp) - 1] == '.' &&
736 tmp[strlen(tmp) - 2] == '.' &&
737 tmp[strlen(tmp) - 3] == '/') {
738 ERR("Using '/../' is not permitted in the trace path (%s)",
739 tmp);
740 ret = -1;
741 goto error;
742 }
743 ret = stat(tmp, &statbuf);
744 if (ret < 0) {
745 ret = mkdir(tmp, mode);
746 if (ret < 0) {
747 if (!(errno == EEXIST)) {
748 PERROR("mkdir recursive");
749 ret = -errno;
750 goto error;
751 }
752 }
753 }
754 *p = '/';
755 }
756 }
757
758 ret = mkdir(tmp, mode);
759 if (ret < 0) {
760 if (!(errno == EEXIST)) {
761 PERROR("mkdir recursive last piece");
762 ret = -errno;
763 } else {
764 ret = 0;
765 }
766 }
767
768 error:
769 return ret;
770 }
771
772 /*
773 * create_output_path: create the output trace directory
774 */
775 static
776 char *create_output_path(char *path_name)
777 {
778 int ret = 0;
779 char *alloc_path = NULL;
780 char *traces_path = NULL;
781 char *full_path = NULL;
782
783 /* Auto output path */
784 if (opt_output_path == NULL) {
785 alloc_path = strdup(config_get_default_path());
786 if (alloc_path == NULL) {
787 ERR("Home path not found.\n \
788 Please specify an output path using -o, --output PATH");
789 ret = -1;
790 goto exit;
791 }
792
793 ret = asprintf(&traces_path, "%s/" DEFAULT_TRACE_DIR_NAME
794 "/%s", alloc_path, path_name);
795 if (ret < 0) {
796 PERROR("asprintf trace dir name");
797 goto exit;
798 }
799 } else {
800 full_path = expand_full_path(opt_output_path);
801 ret = asprintf(&traces_path, "%s/%s", full_path, path_name);
802 if (ret < 0) {
803 PERROR("asprintf trace dir name");
804 goto exit;
805 }
806 }
807 free(alloc_path);
808 free(full_path);
809
810 exit:
811 return traces_path;
812 }
813
814 /*
815 * relay_delete_session: Free all memory associated with a session and
816 * close all the FDs
817 */
818 static
819 void relay_delete_session(struct relay_command *cmd, struct lttng_ht *streams_ht)
820 {
821 struct lttng_ht_iter iter;
822 struct lttng_ht_node_ulong *node;
823 struct relay_stream *stream;
824 int ret;
825
826 if (!cmd->session)
827 return;
828
829 DBG("Relay deleting session %lu", cmd->session->id);
830 free(cmd->session->sock);
831
832 cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, node, node) {
833 node = lttng_ht_iter_get_node_ulong(&iter);
834 if (node) {
835 stream = caa_container_of(node,
836 struct relay_stream, stream_n);
837 if (stream->session == cmd->session) {
838 close(stream->fd);
839 ret = lttng_ht_del(streams_ht, &iter);
840 assert(!ret);
841 free(stream);
842 }
843 }
844 }
845 }
846
847 /*
848 * relay_add_stream: allocate a new stream for a session
849 */
850 static
851 int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
852 struct relay_command *cmd, struct lttng_ht *streams_ht)
853 {
854 struct relay_session *session = cmd->session;
855 struct lttcomm_relayd_add_stream stream_info;
856 struct relay_stream *stream = NULL;
857 struct lttcomm_relayd_status_stream reply;
858 char *path = NULL, *root_path = NULL;
859 int ret, send_ret;
860
861 if (!session || session->version_check_done == 0) {
862 ERR("Trying to add a stream before version check");
863 ret = -1;
864 goto end_no_session;
865 }
866
867 /* FIXME : use data_size for something ? */
868 ret = cmd->sock->ops->recvmsg(cmd->sock, &stream_info,
869 sizeof(struct lttcomm_relayd_add_stream), MSG_WAITALL);
870 if (ret < sizeof(struct lttcomm_relayd_add_stream)) {
871 ERR("Relay didn't receive valid add_stream struct size : %d", ret);
872 ret = -1;
873 goto end_no_session;
874 }
875 stream = zmalloc(sizeof(struct relay_stream));
876 if (stream == NULL) {
877 PERROR("relay stream zmalloc");
878 ret = -1;
879 goto end_no_session;
880 }
881
882 stream->stream_handle = ++last_relay_stream_id;
883 stream->seq = 0;
884 stream->session = session;
885
886 root_path = create_output_path(stream_info.pathname);
887 if (!root_path) {
888 ret = -1;
889 goto end;
890 }
891 ret = mkdir_recursive(root_path, S_IRWXU | S_IRWXG);
892 if (ret < 0) {
893 free(root_path);
894 ERR("relay creating output directory");
895 goto end;
896 }
897
898 ret = asprintf(&path, "%s/%s", root_path, stream_info.channel_name);
899 if (ret < 0) {
900 PERROR("asprintf stream path");
901 goto end;
902 }
903
904 ret = open(path, O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
905 if (ret < 0) {
906 PERROR("Relay creating trace file");
907 goto end;
908 }
909
910 stream->fd = ret;
911 DBG("Tracefile %s created", path);
912
913 lttng_ht_node_init_ulong(&stream->stream_n,
914 (unsigned long) stream->stream_handle);
915 lttng_ht_add_unique_ulong(streams_ht,
916 &stream->stream_n);
917
918 DBG("Relay new stream added %s", stream_info.channel_name);
919
920 end:
921 free(path);
922 free(root_path);
923 /* send the session id to the client or a negative return code on error */
924 if (ret < 0) {
925 reply.ret_code = htobe32(LTTCOMM_ERR);
926 } else {
927 reply.ret_code = htobe32(LTTCOMM_OK);
928 }
929 reply.handle = htobe64(stream->stream_handle);
930 send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply,
931 sizeof(struct lttcomm_relayd_status_stream), 0);
932 if (send_ret < 0) {
933 ERR("Relay sending stream id");
934 }
935
936 end_no_session:
937 return ret;
938 }
939
940 /*
941 * relay_unknown_command: send -1 if received unknown command
942 */
943 static
944 void relay_unknown_command(struct relay_command *cmd)
945 {
946 struct lttcomm_relayd_generic_reply reply;
947 int ret;
948
949 reply.ret_code = htobe32(LTTCOMM_ERR);
950 ret = cmd->sock->ops->sendmsg(cmd->sock, &reply,
951 sizeof(struct lttcomm_relayd_generic_reply), 0);
952 if (ret < 0) {
953 ERR("Relay sending unknown command");
954 }
955 }
956
957 /*
958 * relay_start: send an acknowledgment to the client to tell if we are
959 * ready to receive data. We are ready if a session is established.
960 */
961 static
962 int relay_start(struct lttcomm_relayd_hdr *recv_hdr,
963 struct relay_command *cmd)
964 {
965 int ret = htobe32(LTTCOMM_OK);
966 struct lttcomm_relayd_generic_reply reply;
967 struct relay_session *session = cmd->session;
968
969 if (!session) {
970 DBG("Trying to start the streaming without a session established");
971 ret = htobe32(LTTCOMM_ERR);
972 }
973
974 reply.ret_code = ret;
975 ret = cmd->sock->ops->sendmsg(cmd->sock, &reply,
976 sizeof(struct lttcomm_relayd_generic_reply), 0);
977 if (ret < 0) {
978 ERR("Relay sending start ack");
979 }
980
981 return ret;
982 }
983
984 /*
985 * Get stream from stream id.
986 */
987 static
988 struct relay_stream *relay_stream_from_stream_id(uint64_t stream_id,
989 struct lttng_ht *streams_ht)
990 {
991 struct lttng_ht_node_ulong *node;
992 struct lttng_ht_iter iter;
993 struct relay_stream *ret;
994
995 lttng_ht_lookup(streams_ht,
996 (void *)((unsigned long) stream_id),
997 &iter);
998 node = lttng_ht_iter_get_node_ulong(&iter);
999 if (node == NULL) {
1000 DBG("Relay stream %lu not found", stream_id);
1001 ret = NULL;
1002 goto end;
1003 }
1004
1005 ret = caa_container_of(node, struct relay_stream, stream_n);
1006
1007 end:
1008 return ret;
1009 }
1010
1011 /*
1012 * relay_recv_metadata: receive the metada for the session.
1013 */
1014 static
1015 int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
1016 struct relay_command *cmd, struct lttng_ht *streams_ht)
1017 {
1018 int ret = htobe32(LTTCOMM_OK);
1019 struct relay_session *session = cmd->session;
1020 struct lttcomm_relayd_metadata_payload *metadata_struct;
1021 struct relay_stream *metadata_stream;
1022 uint64_t data_size, payload_size;
1023
1024 if (!session) {
1025 ERR("Metadata sent before version check");
1026 ret = -1;
1027 goto end;
1028 }
1029
1030 data_size = be64toh(recv_hdr->data_size);
1031 payload_size = data_size - sizeof(uint64_t);
1032 if (data_buffer_size < data_size) {
1033 data_buffer = realloc(data_buffer, data_size);
1034 if (!data_buffer) {
1035 ERR("Allocating data buffer");
1036 ret = -1;
1037 goto end;
1038 }
1039 data_buffer_size = data_size;
1040 }
1041 memset(data_buffer, 0, data_size);
1042 DBG2("Relay receiving metadata, waiting for %lu bytes", data_size);
1043 ret = cmd->sock->ops->recvmsg(cmd->sock, data_buffer, data_size, MSG_WAITALL);
1044 if (ret < 0 || ret != data_size) {
1045 ret = -1;
1046 ERR("Relay didn't receive the whole metadata");
1047 goto end;
1048 }
1049 metadata_struct = (struct lttcomm_relayd_metadata_payload *) data_buffer;
1050 metadata_stream = relay_stream_from_stream_id(
1051 be64toh(metadata_struct->stream_id), streams_ht);
1052 if (!metadata_stream) {
1053 ret = -1;
1054 goto end;
1055 }
1056
1057 ret = write(metadata_stream->fd, metadata_struct->payload,
1058 payload_size);
1059 if (ret < (payload_size)) {
1060 ERR("Relay error writing metadata on file");
1061 ret = -1;
1062 goto end;
1063 }
1064 DBG2("Relay metadata written");
1065
1066 end:
1067 return ret;
1068 }
1069
1070 /*
1071 * relay_send_version: send relayd version number
1072 */
1073 static
1074 int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
1075 struct relay_command *cmd)
1076 {
1077 int ret = htobe32(LTTCOMM_OK);
1078 struct lttcomm_relayd_version reply;
1079 struct relay_session *session = NULL;
1080
1081 if (cmd->session == NULL) {
1082 session = zmalloc(sizeof(struct relay_session));
1083 if (session == NULL) {
1084 PERROR("relay session zmalloc");
1085 ret = -1;
1086 goto end;
1087 }
1088 session->id = ++last_relay_session_id;
1089 DBG("Created session %lu", session->id);
1090 cmd->session = session;
1091 }
1092 session->version_check_done = 1;
1093
1094 sscanf(VERSION, "%u.%u", &reply.major, &reply.minor);
1095 reply.major = htobe32(reply.major);
1096 reply.minor = htobe32(reply.minor);
1097 ret = cmd->sock->ops->sendmsg(cmd->sock, &reply,
1098 sizeof(struct lttcomm_relayd_version), 0);
1099 if (ret < 0) {
1100 ERR("Relay sending version");
1101 }
1102 DBG("Version check done");
1103
1104 end:
1105 return ret;
1106 }
1107
1108 /*
1109 * relay_process_control: Process the commands received on the control socket
1110 */
1111 static
1112 int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
1113 struct relay_command *cmd, struct lttng_ht *streams_ht)
1114 {
1115 int ret = 0;
1116
1117 switch (be32toh(recv_hdr->cmd)) {
1118 /*
1119 case RELAYD_CREATE_SESSION:
1120 ret = relay_create_session(recv_hdr, cmd);
1121 break;
1122 */
1123 case RELAYD_ADD_STREAM:
1124 ret = relay_add_stream(recv_hdr, cmd, streams_ht);
1125 break;
1126 case RELAYD_START_DATA:
1127 ret = relay_start(recv_hdr, cmd);
1128 break;
1129 case RELAYD_SEND_METADATA:
1130 ret = relay_recv_metadata(recv_hdr, cmd, streams_ht);
1131 break;
1132 case RELAYD_VERSION:
1133 ret = relay_send_version(recv_hdr, cmd);
1134 break;
1135 case RELAYD_UPDATE_SYNC_INFO:
1136 default:
1137 ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
1138 relay_unknown_command(cmd);
1139 ret = -1;
1140 goto end;
1141 }
1142
1143 end:
1144 return ret;
1145 }
1146
1147 /*
1148 * relay_process_data: Process the data received on the data socket
1149 */
1150 static
1151 int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht)
1152 {
1153 int ret = 0;
1154 struct relay_stream *stream;
1155 struct lttcomm_relayd_data_hdr data_hdr;
1156 uint64_t stream_id;
1157 uint32_t data_size;
1158
1159 ret = cmd->sock->ops->recvmsg(cmd->sock, &data_hdr,
1160 sizeof(struct lttcomm_relayd_data_hdr), MSG_WAITALL);
1161 if (ret <= 0) {
1162 ERR("Connections seems to be closed");
1163 ret = -1;
1164 goto end;
1165 }
1166
1167 stream_id = be64toh(data_hdr.stream_id);
1168 stream = relay_stream_from_stream_id(stream_id, streams_ht);
1169 if (!stream) {
1170 ret = -1;
1171 goto end;
1172 }
1173
1174 data_size = be32toh(data_hdr.data_size);
1175 if (data_buffer_size < data_size) {
1176 data_buffer = realloc(data_buffer, data_size);
1177 if (!data_buffer) {
1178 ERR("Allocating data buffer");
1179 ret = -1;
1180 goto end;
1181 }
1182 data_buffer_size = data_size;
1183 }
1184 memset(data_buffer, 0, data_size);
1185
1186 ret = cmd->sock->ops->recvmsg(cmd->sock, data_buffer, data_size, MSG_WAITALL);
1187 if (ret <= 0) {
1188 ret = -1;
1189 goto end;
1190 }
1191
1192 ret = write(stream->fd, data_buffer, data_size);
1193 if (ret < data_size) {
1194 ERR("Relay error writing data to file");
1195 ret = -1;
1196 goto end;
1197 }
1198 DBG2("Relay wrote %d bytes to tracefile for stream id %lu", ret, stream->stream_handle);
1199
1200 end:
1201 return ret;
1202 }
1203
/*
 * Forget a connection: remove it (at iter) from relay_connections_ht, stop
 * polling pollfd and close that fd. A close failure is only logged.
 * streams_ht is not used here.
 */
static
void relay_cleanup_connection(struct lttng_ht *relay_connections_ht, struct lttng_poll_event *events,
		struct lttng_ht *streams_ht, int pollfd, struct lttng_ht_iter *iter)
{
	int err;

	err = lttng_ht_del(relay_connections_ht, iter);
	assert(!err);
	lttng_poll_del(events, pollfd);

	if (close(pollfd) < 0) {
		ERR("Closing pollfd %d", pollfd);
	}
}
1219
1220 static
1221 int relay_add_connection(int fd, struct lttng_poll_event *events,
1222 struct lttng_ht *relay_connections_ht)
1223 {
1224 int ret;
1225 struct relay_command *relay_connection;
1226
1227 relay_connection = zmalloc(sizeof(struct relay_command));
1228 if (relay_connection == NULL) {
1229 PERROR("Relay command zmalloc");
1230 ret = -1;
1231 goto end;
1232 }
1233 ret = read(fd, relay_connection, sizeof(struct relay_command));
1234 if (ret < 0 || ret < sizeof(relay_connection)) {
1235 PERROR("read relay cmd pipe");
1236 ret = -1;
1237 goto end;
1238 }
1239
1240 lttng_ht_node_init_ulong(&relay_connection->sock_n,
1241 (unsigned long) relay_connection->sock->fd);
1242 lttng_ht_add_unique_ulong(relay_connections_ht,
1243 &relay_connection->sock_n);
1244 ret = lttng_poll_add(events,
1245 relay_connection->sock->fd,
1246 LPOLLIN | LPOLLRDHUP);
1247
1248 end:
1249 return ret;
1250 }
1251
1252 /*
1253 * This thread does the actual work
1254 */
1255 static
1256 void *relay_thread_worker(void *data)
1257 {
1258 int i, ret, pollfd;
1259 uint32_t revents, nb_fd;
1260 struct relay_command *relay_connection;
1261 struct lttng_poll_event events;
1262 struct lttng_ht *relay_connections_ht;
1263 struct lttng_ht_node_ulong *node;
1264 struct lttng_ht_iter iter;
1265 struct lttng_ht *streams_ht;
1266 struct lttcomm_relayd_hdr recv_hdr;
1267
1268 DBG("[thread] Relay worker started");
1269
1270 /* table of connections indexed on socket */
1271 relay_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
1272
1273 /* tables of streams indexed by stream ID */
1274 streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
1275
1276 ret = create_thread_poll_set(&events, 2);
1277 if (ret < 0) {
1278 goto error_poll_create;
1279 }
1280
1281 ret = lttng_poll_add(&events, relay_cmd_pipe[0], LPOLLIN | LPOLLRDHUP);
1282 if (ret < 0) {
1283 goto error;
1284 }
1285
1286 while (1) {
1287 /* Zeroed the events structure */
1288 lttng_poll_reset(&events);
1289
1290 nb_fd = LTTNG_POLL_GETNB(&events);
1291
1292 /* Infinite blocking call, waiting for transmission */
1293 restart:
1294 ret = lttng_poll_wait(&events, -1);
1295 if (ret < 0) {
1296 /*
1297 * Restart interrupted system call.
1298 */
1299 if (errno == EINTR) {
1300 goto restart;
1301 }
1302 goto error;
1303 }
1304
1305 for (i = 0; i < nb_fd; i++) {
1306 /* Fetch once the poll data */
1307 revents = LTTNG_POLL_GETEV(&events, i);
1308 pollfd = LTTNG_POLL_GETFD(&events, i);
1309
1310 /* Thread quit pipe has been closed. Killing thread. */
1311 ret = check_thread_quit_pipe(pollfd, revents);
1312 if (ret) {
1313 goto error;
1314 }
1315
1316 /* Inspect the relay cmd pipe for new connection */
1317 if (pollfd == relay_cmd_pipe[0]) {
1318 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
1319 ERR("Relay pipe error");
1320 goto error;
1321 } else if (revents & LPOLLIN) {
1322 DBG("Relay command received");
1323 ret = relay_add_connection(relay_cmd_pipe[0],
1324 &events, relay_connections_ht);
1325 if (ret < 0) {
1326 goto error;
1327 }
1328 }
1329 } else if (revents > 0) {
1330 lttng_ht_lookup(relay_connections_ht,
1331 (void *)((unsigned long) pollfd),
1332 &iter);
1333 node = lttng_ht_iter_get_node_ulong(&iter);
1334 if (node == NULL) {
1335 DBG2("Relay sock %d not found", pollfd);
1336 goto error;
1337 }
1338 relay_connection = caa_container_of(node,
1339 struct relay_command, sock_n);
1340
1341 if (revents & (LPOLLERR)) {
1342 ERR("POLL ERROR");
1343 relay_cleanup_connection(relay_connections_ht,
1344 &events, streams_ht, pollfd, &iter);
1345 free(relay_connection);
1346 } else if (revents & (LPOLLHUP | LPOLLRDHUP)) {
1347 DBG("Socket %d hung up", pollfd);
1348 relay_cleanup_connection(relay_connections_ht,
1349 &events, streams_ht, pollfd, &iter);
1350 if (relay_connection->type == RELAY_CONTROL) {
1351 relay_delete_session(relay_connection, streams_ht);
1352 }
1353 free(relay_connection);
1354 } else if (revents & LPOLLIN) {
1355 /* control socket */
1356 if (relay_connection->type == RELAY_CONTROL) {
1357 ret = relay_connection->sock->ops->recvmsg(
1358 relay_connection->sock, &recv_hdr,
1359 sizeof(struct lttcomm_relayd_hdr), MSG_WAITALL);
1360 /* connection closed */
1361 if (ret <= 0) {
1362 relay_cleanup_connection(relay_connections_ht,
1363 &events, streams_ht, pollfd, &iter);
1364 relay_delete_session(relay_connection, streams_ht);
1365 free(relay_connection);
1366 DBG("Control connection closed with %d", pollfd);
1367 } else {
1368 if (relay_connection->session) {
1369 DBG2("Relay worker receiving data for session : %lu",
1370 relay_connection->session->id);
1371 }
1372 ret = relay_process_control(&recv_hdr,
1373 relay_connection,
1374 streams_ht);
1375 /*
1376 * there was an error in processing a control
1377 * command: clear the session
1378 * */
1379 if (ret < 0) {
1380 relay_cleanup_connection(relay_connections_ht,
1381 &events, streams_ht, pollfd, &iter);
1382 free(relay_connection);
1383 DBG("Connection closed with %d", pollfd);
1384 }
1385 }
1386 /* data socket */
1387 } else if (relay_connection->type == RELAY_DATA) {
1388 ret = relay_process_data(relay_connection, streams_ht);
1389 /* connection closed */
1390 if (ret < 0) {
1391 relay_cleanup_connection(relay_connections_ht,
1392 &events, streams_ht, pollfd, &iter);
1393 relay_delete_session(relay_connection, streams_ht);
1394 DBG("Data connection closed with %d", pollfd);
1395 }
1396 }
1397 }
1398 }
1399 }
1400 }
1401
1402 error:
1403 lttng_poll_clean(&events);
1404
1405 /* empty the hash table and free the memory */
1406 cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, node, node) {
1407 node = lttng_ht_iter_get_node_ulong(&iter);
1408 if (node) {
1409 relay_connection = caa_container_of(node,
1410 struct relay_command, sock_n);
1411 free(relay_connection);
1412 }
1413 ret = lttng_ht_del(relay_connections_ht, &iter);
1414 assert(!ret);
1415 }
1416 error_poll_create:
1417 free(data_buffer);
1418 lttng_ht_destroy(relay_connections_ht);
1419 DBG("Worker thread cleanup complete");
1420 stop_threads();
1421 return NULL;
1422 }
1423
1424 /*
1425 * Create the relay command pipe to wake thread_manage_apps.
1426 * Closed in cleanup().
1427 */
1428 static int create_relay_cmd_pipe(void)
1429 {
1430 int ret;
1431
1432 ret = utils_create_pipe_cloexec(relay_cmd_pipe);
1433
1434 return ret;
1435 }
1436
1437 /*
1438 * main
1439 */
1440 int main(int argc, char **argv)
1441 {
1442 int ret = 0;
1443 void *status;
1444
1445 /* Create thread quit pipe */
1446 if ((ret = init_thread_quit_pipe()) < 0) {
1447 goto error;
1448 }
1449
1450 /* Parse arguments */
1451 progname = argv[0];
1452 if ((ret = parse_args(argc, argv) < 0)) {
1453 goto exit;
1454 }
1455
1456 if ((ret = set_signal_handler()) < 0) {
1457 goto exit;
1458 }
1459
1460 /* Daemonize */
1461 if (opt_daemon) {
1462 ret = daemon(0, 0);
1463 if (ret < 0) {
1464 PERROR("daemon");
1465 goto exit;
1466 }
1467 }
1468
1469 /* Check if daemon is UID = 0 */
1470 is_root = !getuid();
1471
1472 if (!is_root) {
1473 if (control_uri->port < 1024 || data_uri->port < 1024) {
1474 ERR("Need to be root to use ports < 1024");
1475 ret = -1;
1476 goto exit;
1477 }
1478 }
1479
1480 /* Setup the thread apps communication pipe. */
1481 if ((ret = create_relay_cmd_pipe()) < 0) {
1482 goto exit;
1483 }
1484
1485 /* Init relay command queue. */
1486 cds_wfq_init(&relay_cmd_queue.queue);
1487
1488 /* Set up max poll set size */
1489 lttng_poll_set_max_size();
1490
1491 /* Setup the dispatcher thread */
1492 ret = pthread_create(&dispatcher_thread, NULL,
1493 relay_thread_dispatcher, (void *) NULL);
1494 if (ret != 0) {
1495 PERROR("pthread_create dispatcher");
1496 goto exit_dispatcher;
1497 }
1498
1499 /* Setup the worker thread */
1500 ret = pthread_create(&worker_thread, NULL,
1501 relay_thread_worker, (void *) NULL);
1502 if (ret != 0) {
1503 PERROR("pthread_create worker");
1504 goto exit_worker;
1505 }
1506
1507 /* Setup the listener thread */
1508 ret = pthread_create(&listener_thread, NULL,
1509 relay_thread_listener, (void *) NULL);
1510 if (ret != 0) {
1511 PERROR("pthread_create listener");
1512 goto exit_listener;
1513 }
1514
1515 exit_listener:
1516 ret = pthread_join(listener_thread, &status);
1517 if (ret != 0) {
1518 PERROR("pthread_join");
1519 goto error; /* join error, exit without cleanup */
1520 }
1521
1522 exit_worker:
1523 ret = pthread_join(worker_thread, &status);
1524 if (ret != 0) {
1525 PERROR("pthread_join");
1526 goto error; /* join error, exit without cleanup */
1527 }
1528
1529 exit_dispatcher:
1530 ret = pthread_join(dispatcher_thread, &status);
1531 if (ret != 0) {
1532 PERROR("pthread_join");
1533 goto error; /* join error, exit without cleanup */
1534 }
1535
1536 exit:
1537 cleanup();
1538 if (!ret) {
1539 exit(EXIT_SUCCESS);
1540 }
1541
1542 error:
1543 exit(EXIT_FAILURE);
1544 }
This page took 0.100829 seconds and 4 git commands to generate.