2 * Copyright (C) 2012 - Julien Desfossez <jdesfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 * 2013 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
5 * 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License, version 2 only,
9 * as published by the Free Software Foundation.
11 * This program is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
16 * You should have received a copy of the GNU General Public License along
17 * with this program; if not, write to the Free Software Foundation, Inc.,
18 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
31 #include <sys/mount.h>
32 #include <sys/resource.h>
33 #include <sys/socket.h>
35 #include <sys/types.h>
38 #include <urcu/futex.h>
39 #include <urcu/uatomic.h>
40 #include <urcu/rculist.h>
45 #include <lttng/lttng.h>
46 #include <common/common.h>
47 #include <common/compat/poll.h>
48 #include <common/compat/socket.h>
49 #include <common/compat/endian.h>
50 #include <common/compat/getenv.h>
51 #include <common/defaults.h>
52 #include <common/daemonize.h>
53 #include <common/futex.h>
54 #include <common/sessiond-comm/sessiond-comm.h>
55 #include <common/sessiond-comm/inet.h>
56 #include <common/sessiond-comm/relayd.h>
57 #include <common/uri.h>
58 #include <common/utils.h>
59 #include <common/align.h>
60 #include <common/config/session-config.h>
61 #include <common/dynamic-buffer.h>
62 #include <common/buffer-view.h>
63 #include <common/string-utils/format.h>
67 #include "ctf-trace.h"
70 #include "lttng-relayd.h"
72 #include "health-relayd.h"
73 #include "testpoint.h"
74 #include "viewer-stream.h"
77 #include "connection.h"
78 #include "tracefile-array.h"
79 #include "tcp_keep_alive.h"
80 #include "sessiond-trace-chunks.h"
82 static const char *help_msg
=
83 #ifdef LTTNG_EMBED_HELP
84 #include <lttng-relayd.8.h>
90 enum relay_connection_status
{
91 RELAY_CONNECTION_STATUS_OK
,
92 /* An error occurred while processing an event on the connection. */
93 RELAY_CONNECTION_STATUS_ERROR
,
94 /* Connection closed/shutdown cleanly. */
95 RELAY_CONNECTION_STATUS_CLOSED
,
98 /* command line options */
99 char *opt_output_path
;
100 static int opt_daemon
, opt_background
, opt_print_version
;
103 * We need to wait for listener and live listener threads, as well as
104 * health check thread, before being ready to signal readiness.
106 #define NR_LTTNG_RELAY_READY 3
107 static int lttng_relay_ready
= NR_LTTNG_RELAY_READY
;
109 /* Size of receive buffer. */
110 #define RECV_DATA_BUFFER_SIZE 65536
112 static int recv_child_signal
; /* Set to 1 when a SIGUSR1 signal is received. */
113 static pid_t child_ppid
; /* Internal parent PID use with daemonize. */
115 static struct lttng_uri
*control_uri
;
116 static struct lttng_uri
*data_uri
;
117 static struct lttng_uri
*live_uri
;
119 const char *progname
;
121 const char *tracing_group_name
= DEFAULT_TRACING_GROUP
;
122 static int tracing_group_name_override
;
124 const char * const config_section_name
= "relayd";
127 * Quit pipe for all threads. This permits a single cancellation point
128 * for all threads when receiving an event on the pipe.
130 int thread_quit_pipe
[2] = { -1, -1 };
133 * This pipe is used to inform the worker thread that a command is queued and
134 * ready to be processed.
136 static int relay_conn_pipe
[2] = { -1, -1 };
138 /* Shared between threads */
139 static int dispatch_thread_exit
;
141 static pthread_t listener_thread
;
142 static pthread_t dispatcher_thread
;
143 static pthread_t worker_thread
;
144 static pthread_t health_thread
;
147 * last_relay_stream_id_lock protects last_relay_stream_id increment
148 * atomicity on 32-bit architectures.
150 static pthread_mutex_t last_relay_stream_id_lock
= PTHREAD_MUTEX_INITIALIZER
;
151 static uint64_t last_relay_stream_id
;
154 * Relay command queue.
156 * The relay_thread_listener and relay_thread_dispatcher communicate with this
159 static struct relay_conn_queue relay_conn_queue
;
161 /* Global relay stream hash table. */
162 struct lttng_ht
*relay_streams_ht
;
164 /* Global relay viewer stream hash table. */
165 struct lttng_ht
*viewer_streams_ht
;
167 /* Global relay sessions hash table. */
168 struct lttng_ht
*sessions_ht
;
170 /* Relayd health monitoring */
171 struct health_app
*health_relayd
;
173 struct sessiond_trace_chunk_registry
*sessiond_trace_chunk_registry
;
175 static struct option long_options
[] = {
176 { "control-port", 1, 0, 'C', },
177 { "data-port", 1, 0, 'D', },
178 { "live-port", 1, 0, 'L', },
179 { "daemonize", 0, 0, 'd', },
180 { "background", 0, 0, 'b', },
181 { "group", 1, 0, 'g', },
182 { "help", 0, 0, 'h', },
183 { "output", 1, 0, 'o', },
184 { "verbose", 0, 0, 'v', },
185 { "config", 1, 0, 'f' },
186 { "version", 0, 0, 'V' },
190 static const char *config_ignore_options
[] = { "help", "config", "version" };
192 static void print_version(void) {
193 fprintf(stdout
, "%s\n", VERSION
);
196 static void relayd_config_log(void)
198 DBG("LTTng-relayd " VERSION
" - " VERSION_NAME
"%s%s",
199 GIT_VERSION
[0] == '\0' ? "" : " - " GIT_VERSION
,
200 EXTRA_VERSION_NAME
[0] == '\0' ? "" : " - " EXTRA_VERSION_NAME
);
201 if (EXTRA_VERSION_DESCRIPTION
[0] != '\0') {
202 DBG("LTTng-relayd extra version description:\n\t" EXTRA_VERSION_DESCRIPTION
"\n");
204 if (EXTRA_VERSION_PATCHES
[0] != '\0') {
205 DBG("LTTng-relayd extra patches:\n\t" EXTRA_VERSION_PATCHES
"\n");
210 * Take an option from the getopt output and set it in the right variable to be
213 * Return 0 on success else a negative value.
215 static int set_option(int opt
, const char *arg
, const char *optname
)
221 fprintf(stderr
, "option %s", optname
);
223 fprintf(stderr
, " with arg %s\n", arg
);
227 if (lttng_is_setuid_setgid()) {
228 WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
229 "-C, --control-port");
231 ret
= uri_parse(arg
, &control_uri
);
233 ERR("Invalid control URI specified");
236 if (control_uri
->port
== 0) {
237 control_uri
->port
= DEFAULT_NETWORK_CONTROL_PORT
;
242 if (lttng_is_setuid_setgid()) {
243 WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
246 ret
= uri_parse(arg
, &data_uri
);
248 ERR("Invalid data URI specified");
251 if (data_uri
->port
== 0) {
252 data_uri
->port
= DEFAULT_NETWORK_DATA_PORT
;
257 if (lttng_is_setuid_setgid()) {
258 WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
261 ret
= uri_parse(arg
, &live_uri
);
263 ERR("Invalid live URI specified");
266 if (live_uri
->port
== 0) {
267 live_uri
->port
= DEFAULT_NETWORK_VIEWER_PORT
;
278 if (lttng_is_setuid_setgid()) {
279 WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
282 tracing_group_name
= strdup(arg
);
283 if (tracing_group_name
== NULL
) {
288 tracing_group_name_override
= 1;
292 ret
= utils_show_help(8, "lttng-relayd", help_msg
);
294 ERR("Cannot show --help for `lttng-relayd`");
299 opt_print_version
= 1;
302 if (lttng_is_setuid_setgid()) {
303 WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
306 ret
= asprintf(&opt_output_path
, "%s", arg
);
309 PERROR("asprintf opt_output_path");
315 /* Verbose level can increase using multiple -v */
317 lttng_opt_verbose
= config_parse_value(arg
);
319 /* Only 3 level of verbosity (-vvv). */
320 if (lttng_opt_verbose
< 3) {
321 lttng_opt_verbose
+= 1;
326 /* Unknown option or other error.
327 * Error is printed by getopt, just return */
340 * config_entry_handler_cb used to handle options read from a config file.
341 * See config_entry_handler_cb comment in common/config/session-config.h for the
342 * return value conventions.
344 static int config_entry_handler(const struct config_entry
*entry
, void *unused
)
348 if (!entry
|| !entry
->name
|| !entry
->value
) {
353 /* Check if the option is to be ignored */
354 for (i
= 0; i
< sizeof(config_ignore_options
) / sizeof(char *); i
++) {
355 if (!strcmp(entry
->name
, config_ignore_options
[i
])) {
360 for (i
= 0; i
< (sizeof(long_options
) / sizeof(struct option
)) - 1; i
++) {
361 /* Ignore if entry name is not fully matched. */
362 if (strcmp(entry
->name
, long_options
[i
].name
)) {
367 * If the option takes no argument on the command line,
368 * we have to check if the value is "true". We support
369 * non-zero numeric values, true, on and yes.
371 if (!long_options
[i
].has_arg
) {
372 ret
= config_parse_value(entry
->value
);
375 WARN("Invalid configuration value \"%s\" for option %s",
376 entry
->value
, entry
->name
);
378 /* False, skip boolean config option. */
383 ret
= set_option(long_options
[i
].val
, entry
->value
, entry
->name
);
387 WARN("Unrecognized option \"%s\" in daemon configuration file.",
394 static int set_options(int argc
, char **argv
)
396 int c
, ret
= 0, option_index
= 0, retval
= 0;
397 int orig_optopt
= optopt
, orig_optind
= optind
;
398 char *default_address
, *optstring
;
399 const char *config_path
= NULL
;
401 optstring
= utils_generate_optstring(long_options
,
402 sizeof(long_options
) / sizeof(struct option
));
408 /* Check for the --config option */
410 while ((c
= getopt_long(argc
, argv
, optstring
, long_options
,
411 &option_index
)) != -1) {
415 } else if (c
!= 'f') {
419 if (lttng_is_setuid_setgid()) {
420 WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
423 config_path
= utils_expand_path(optarg
);
425 ERR("Failed to resolve path: %s", optarg
);
430 ret
= config_get_section_entries(config_path
, config_section_name
,
431 config_entry_handler
, NULL
);
434 ERR("Invalid configuration option at line %i", ret
);
440 /* Reset getopt's global state */
441 optopt
= orig_optopt
;
442 optind
= orig_optind
;
444 c
= getopt_long(argc
, argv
, optstring
, long_options
, &option_index
);
449 ret
= set_option(c
, optarg
, long_options
[option_index
].name
);
456 /* assign default values */
457 if (control_uri
== NULL
) {
458 ret
= asprintf(&default_address
,
459 "tcp://" DEFAULT_NETWORK_CONTROL_BIND_ADDRESS
":%d",
460 DEFAULT_NETWORK_CONTROL_PORT
);
462 PERROR("asprintf default data address");
467 ret
= uri_parse(default_address
, &control_uri
);
468 free(default_address
);
470 ERR("Invalid control URI specified");
475 if (data_uri
== NULL
) {
476 ret
= asprintf(&default_address
,
477 "tcp://" DEFAULT_NETWORK_DATA_BIND_ADDRESS
":%d",
478 DEFAULT_NETWORK_DATA_PORT
);
480 PERROR("asprintf default data address");
485 ret
= uri_parse(default_address
, &data_uri
);
486 free(default_address
);
488 ERR("Invalid data URI specified");
493 if (live_uri
== NULL
) {
494 ret
= asprintf(&default_address
,
495 "tcp://" DEFAULT_NETWORK_VIEWER_BIND_ADDRESS
":%d",
496 DEFAULT_NETWORK_VIEWER_PORT
);
498 PERROR("asprintf default viewer control address");
503 ret
= uri_parse(default_address
, &live_uri
);
504 free(default_address
);
506 ERR("Invalid viewer control URI specified");
517 static void print_global_objects(void)
519 rcu_register_thread();
521 print_viewer_streams();
522 print_relay_streams();
525 rcu_unregister_thread();
531 static void relayd_cleanup(void)
533 print_global_objects();
537 if (viewer_streams_ht
)
538 lttng_ht_destroy(viewer_streams_ht
);
539 if (relay_streams_ht
)
540 lttng_ht_destroy(relay_streams_ht
);
542 lttng_ht_destroy(sessions_ht
);
544 /* free the dynamically allocated opt_output_path */
545 free(opt_output_path
);
547 /* Close thread quit pipes */
548 utils_close_pipe(thread_quit_pipe
);
550 uri_free(control_uri
);
552 /* Live URI is freed in the live thread. */
554 if (tracing_group_name_override
) {
555 free((void *) tracing_group_name
);
560 * Write to writable pipe used to notify a thread.
562 static int notify_thread_pipe(int wpipe
)
566 ret
= lttng_write(wpipe
, "!", 1);
568 PERROR("write poll pipe");
576 static int notify_health_quit_pipe(int *pipe
)
580 ret
= lttng_write(pipe
[1], "4", 1);
582 PERROR("write relay health quit");
591 * Stop all relayd and relayd-live threads.
593 int lttng_relay_stop_threads(void)
597 /* Stopping all threads */
598 DBG("Terminating all threads");
599 if (notify_thread_pipe(thread_quit_pipe
[1])) {
600 ERR("write error on thread quit pipe");
604 if (notify_health_quit_pipe(health_quit_pipe
)) {
605 ERR("write error on health quit pipe");
608 /* Dispatch thread */
609 CMM_STORE_SHARED(dispatch_thread_exit
, 1);
610 futex_nto1_wake(&relay_conn_queue
.futex
);
612 if (relayd_live_stop()) {
613 ERR("Error stopping live threads");
620 * Signal handler for the daemon
622 * Simply stop all worker threads, leaving main() return gracefully after
623 * joining all threads and calling cleanup().
625 static void sighandler(int sig
)
629 DBG("SIGINT caught");
630 if (lttng_relay_stop_threads()) {
631 ERR("Error stopping threads");
635 DBG("SIGTERM caught");
636 if (lttng_relay_stop_threads()) {
637 ERR("Error stopping threads");
641 CMM_STORE_SHARED(recv_child_signal
, 1);
 * Set up the signal handlers:
 * SIGINT, SIGTERM and SIGUSR1 are routed to sighandler(); SIGPIPE is ignored.
652 static int set_signal_handler(void)
658 if ((ret
= sigemptyset(&sigset
)) < 0) {
659 PERROR("sigemptyset");
666 sa
.sa_handler
= sighandler
;
667 if ((ret
= sigaction(SIGTERM
, &sa
, NULL
)) < 0) {
672 if ((ret
= sigaction(SIGINT
, &sa
, NULL
)) < 0) {
677 if ((ret
= sigaction(SIGUSR1
, &sa
, NULL
)) < 0) {
682 sa
.sa_handler
= SIG_IGN
;
683 if ((ret
= sigaction(SIGPIPE
, &sa
, NULL
)) < 0) {
688 DBG("Signal handler set for SIGTERM, SIGUSR1, SIGPIPE and SIGINT");
693 void lttng_relay_notify_ready(void)
695 /* Notify the parent of the fork() process that we are ready. */
696 if (opt_daemon
|| opt_background
) {
697 if (uatomic_sub_return(<tng_relay_ready
, 1) == 0) {
698 kill(child_ppid
, SIGUSR1
);
704 * Init thread quit pipe.
706 * Return -1 on error or 0 if all pipes are created.
708 static int init_thread_quit_pipe(void)
712 ret
= utils_create_pipe_cloexec(thread_quit_pipe
);
718 * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set.
720 static int create_thread_poll_set(struct lttng_poll_event
*events
, int size
)
724 if (events
== NULL
|| size
== 0) {
729 ret
= lttng_poll_create(events
, size
, LTTNG_CLOEXEC
);
735 ret
= lttng_poll_add(events
, thread_quit_pipe
[0], LPOLLIN
| LPOLLERR
);
747 * Check if the thread quit pipe was triggered.
749 * Return 1 if it was triggered else 0;
751 static int check_thread_quit_pipe(int fd
, uint32_t events
)
753 if (fd
== thread_quit_pipe
[0] && (events
& LPOLLIN
)) {
761 * Create and init socket from uri.
763 static struct lttcomm_sock
*relay_socket_create(struct lttng_uri
*uri
)
766 struct lttcomm_sock
*sock
= NULL
;
768 sock
= lttcomm_alloc_sock_from_uri(uri
);
770 ERR("Allocating socket");
774 ret
= lttcomm_create_sock(sock
);
778 DBG("Listening on sock %d", sock
->fd
);
780 ret
= sock
->ops
->bind(sock
);
782 PERROR("Failed to bind socket");
786 ret
= sock
->ops
->listen(sock
, -1);
796 lttcomm_destroy_sock(sock
);
802 * This thread manages the listening for new connections on the network
804 static void *relay_thread_listener(void *data
)
806 int i
, ret
, pollfd
, err
= -1;
807 uint32_t revents
, nb_fd
;
808 struct lttng_poll_event events
;
809 struct lttcomm_sock
*control_sock
, *data_sock
;
811 DBG("[thread] Relay listener started");
813 health_register(health_relayd
, HEALTH_RELAYD_TYPE_LISTENER
);
815 health_code_update();
817 control_sock
= relay_socket_create(control_uri
);
819 goto error_sock_control
;
822 data_sock
= relay_socket_create(data_uri
);
824 goto error_sock_relay
;
828 * Pass 3 as size here for the thread quit pipe, control and
831 ret
= create_thread_poll_set(&events
, 3);
833 goto error_create_poll
;
836 /* Add the control socket */
837 ret
= lttng_poll_add(&events
, control_sock
->fd
, LPOLLIN
| LPOLLRDHUP
);
842 /* Add the data socket */
843 ret
= lttng_poll_add(&events
, data_sock
->fd
, LPOLLIN
| LPOLLRDHUP
);
848 lttng_relay_notify_ready();
850 if (testpoint(relayd_thread_listener
)) {
851 goto error_testpoint
;
855 health_code_update();
857 DBG("Listener accepting connections");
861 ret
= lttng_poll_wait(&events
, -1);
865 * Restart interrupted system call.
867 if (errno
== EINTR
) {
875 DBG("Relay new connection received");
876 for (i
= 0; i
< nb_fd
; i
++) {
877 health_code_update();
879 /* Fetch once the poll data */
880 revents
= LTTNG_POLL_GETEV(&events
, i
);
881 pollfd
= LTTNG_POLL_GETFD(&events
, i
);
883 /* Thread quit pipe has been closed. Killing thread. */
884 ret
= check_thread_quit_pipe(pollfd
, revents
);
890 if (revents
& LPOLLIN
) {
892 * A new connection is requested, therefore a
893 * sessiond/consumerd connection is allocated in
894 * this thread, enqueued to a global queue and
895 * dequeued (and freed) in the worker thread.
898 struct relay_connection
*new_conn
;
899 struct lttcomm_sock
*newsock
;
900 enum connection_type type
;
902 if (pollfd
== data_sock
->fd
) {
904 newsock
= data_sock
->ops
->accept(data_sock
);
905 DBG("Relay data connection accepted, socket %d",
908 assert(pollfd
== control_sock
->fd
);
909 type
= RELAY_CONTROL
;
910 newsock
= control_sock
->ops
->accept(control_sock
);
911 DBG("Relay control connection accepted, socket %d",
915 PERROR("accepting sock");
919 ret
= setsockopt(newsock
->fd
, SOL_SOCKET
, SO_REUSEADDR
, &val
,
922 PERROR("setsockopt inet");
923 lttcomm_destroy_sock(newsock
);
927 ret
= socket_apply_keep_alive_config(newsock
->fd
);
929 ERR("Failed to apply TCP keep-alive configuration on socket (%i)",
931 lttcomm_destroy_sock(newsock
);
935 new_conn
= connection_create(newsock
, type
);
937 lttcomm_destroy_sock(newsock
);
941 /* Enqueue request for the dispatcher thread. */
942 cds_wfcq_enqueue(&relay_conn_queue
.head
, &relay_conn_queue
.tail
,
946 * Wake the dispatch queue futex.
947 * Implicit memory barrier with the
948 * exchange in cds_wfcq_enqueue.
950 futex_nto1_wake(&relay_conn_queue
.futex
);
951 } else if (revents
& (LPOLLERR
| LPOLLHUP
| LPOLLRDHUP
)) {
952 ERR("socket poll error");
955 ERR("Unexpected poll events %u for sock %d", revents
, pollfd
);
965 lttng_poll_clean(&events
);
967 if (data_sock
->fd
>= 0) {
968 ret
= data_sock
->ops
->close(data_sock
);
973 lttcomm_destroy_sock(data_sock
);
975 if (control_sock
->fd
>= 0) {
976 ret
= control_sock
->ops
->close(control_sock
);
981 lttcomm_destroy_sock(control_sock
);
985 ERR("Health error occurred in %s", __func__
);
987 health_unregister(health_relayd
);
988 DBG("Relay listener thread cleanup complete");
989 lttng_relay_stop_threads();
994 * This thread manages the dispatching of the requests to worker threads
996 static void *relay_thread_dispatcher(void *data
)
1000 struct cds_wfcq_node
*node
;
1001 struct relay_connection
*new_conn
= NULL
;
1003 DBG("[thread] Relay dispatcher started");
1005 health_register(health_relayd
, HEALTH_RELAYD_TYPE_DISPATCHER
);
1007 if (testpoint(relayd_thread_dispatcher
)) {
1008 goto error_testpoint
;
1011 health_code_update();
1014 health_code_update();
1016 /* Atomically prepare the queue futex */
1017 futex_nto1_prepare(&relay_conn_queue
.futex
);
1019 if (CMM_LOAD_SHARED(dispatch_thread_exit
)) {
1024 health_code_update();
1026 /* Dequeue commands */
1027 node
= cds_wfcq_dequeue_blocking(&relay_conn_queue
.head
,
1028 &relay_conn_queue
.tail
);
1030 DBG("Woken up but nothing in the relay command queue");
1031 /* Continue thread execution */
1034 new_conn
= caa_container_of(node
, struct relay_connection
, qnode
);
1036 DBG("Dispatching request waiting on sock %d", new_conn
->sock
->fd
);
1039 * Inform worker thread of the new request. This
1040 * call is blocking so we can be assured that
1041 * the data will be read at some point in time
1042 * or wait to the end of the world :)
1044 ret
= lttng_write(relay_conn_pipe
[1], &new_conn
, sizeof(new_conn
));
1046 PERROR("write connection pipe");
1047 connection_put(new_conn
);
1050 } while (node
!= NULL
);
1052 /* Futex wait on queue. Blocking call on futex() */
1053 health_poll_entry();
1054 futex_nto1_wait(&relay_conn_queue
.futex
);
1058 /* Normal exit, no error */
1065 ERR("Health error occurred in %s", __func__
);
1067 health_unregister(health_relayd
);
1068 DBG("Dispatch thread dying");
1069 lttng_relay_stop_threads();
1073 static bool session_streams_have_index(const struct relay_session
*session
)
1075 return session
->minor
>= 4 && !session
->snapshot
;
1079 * Handle the RELAYD_CREATE_SESSION command.
1081 * On success, send back the session id or else return a negative value.
1083 static int relay_create_session(const struct lttcomm_relayd_hdr
*recv_hdr
,
1084 struct relay_connection
*conn
,
1085 const struct lttng_buffer_view
*payload
)
1089 struct relay_session
*session
= NULL
;
1090 struct lttcomm_relayd_create_session_reply_2_11 reply
= {};
1091 char session_name
[LTTNG_NAME_MAX
] = {};
1092 char hostname
[LTTNG_HOST_NAME_MAX
] = {};
1093 uint32_t live_timer
= 0;
1094 bool snapshot
= false;
1095 bool session_name_contains_creation_timestamp
= false;
1096 /* Left nil for peers < 2.11. */
1097 char base_path
[LTTNG_PATH_MAX
] = {};
1098 lttng_uuid sessiond_uuid
= {};
1099 LTTNG_OPTIONAL(uint64_t) id_sessiond
= {};
1100 LTTNG_OPTIONAL(uint64_t) current_chunk_id
= {};
1101 LTTNG_OPTIONAL(time_t) creation_time
= {};
1102 struct lttng_dynamic_buffer reply_payload
;
1104 lttng_dynamic_buffer_init(&reply_payload
);
1106 if (conn
->minor
< 4) {
1107 /* From 2.1 to 2.3 */
1109 } else if (conn
->minor
>= 4 && conn
->minor
< 11) {
1110 /* From 2.4 to 2.10 */
1111 ret
= cmd_create_session_2_4(payload
, session_name
,
1112 hostname
, &live_timer
, &snapshot
);
1114 bool has_current_chunk
;
1115 uint64_t current_chunk_id_value
;
1116 time_t creation_time_value
;
1117 uint64_t id_sessiond_value
;
1119 /* From 2.11 to ... */
1120 ret
= cmd_create_session_2_11(payload
, session_name
, hostname
,
1121 base_path
, &live_timer
, &snapshot
, &id_sessiond_value
,
1122 sessiond_uuid
, &has_current_chunk
,
1123 ¤t_chunk_id_value
, &creation_time_value
,
1124 &session_name_contains_creation_timestamp
);
1125 if (lttng_uuid_is_nil(sessiond_uuid
)) {
1126 /* The nil UUID is reserved for pre-2.11 clients. */
1127 ERR("Illegal nil UUID announced by peer in create session command");
1131 LTTNG_OPTIONAL_SET(&id_sessiond
, id_sessiond_value
);
1132 LTTNG_OPTIONAL_SET(&creation_time
, creation_time_value
);
1133 if (has_current_chunk
) {
1134 LTTNG_OPTIONAL_SET(¤t_chunk_id
,
1135 current_chunk_id_value
);
1143 session
= session_create(session_name
, hostname
, base_path
, live_timer
,
1144 snapshot
, sessiond_uuid
,
1145 id_sessiond
.is_set
? &id_sessiond
.value
: NULL
,
1146 current_chunk_id
.is_set
? ¤t_chunk_id
.value
: NULL
,
1147 creation_time
.is_set
? &creation_time
.value
: NULL
,
1148 conn
->major
, conn
->minor
,
1149 session_name_contains_creation_timestamp
);
1154 assert(!conn
->session
);
1155 conn
->session
= session
;
1156 DBG("Created session %" PRIu64
, session
->id
);
1158 reply
.generic
.session_id
= htobe64(session
->id
);
1162 reply
.generic
.ret_code
= htobe32(LTTNG_ERR_FATAL
);
1164 reply
.generic
.ret_code
= htobe32(LTTNG_OK
);
1167 if (conn
->minor
< 11) {
1168 /* From 2.1 to 2.10 */
1169 ret
= lttng_dynamic_buffer_append(&reply_payload
,
1170 &reply
.generic
, sizeof(reply
.generic
));
1172 ERR("Failed to append \"create session\" command reply header to payload buffer");
1177 const uint32_t output_path_length
=
1178 session
? strlen(session
->output_path
) + 1 : 0;
1180 reply
.output_path_length
= htobe32(output_path_length
);
1181 ret
= lttng_dynamic_buffer_append(
1182 &reply_payload
, &reply
, sizeof(reply
));
1184 ERR("Failed to append \"create session\" command reply header to payload buffer");
1188 if (output_path_length
) {
1189 ret
= lttng_dynamic_buffer_append(&reply_payload
,
1190 session
->output_path
,
1191 output_path_length
);
1193 ERR("Failed to append \"create session\" command reply path to payload buffer");
1199 send_ret
= conn
->sock
->ops
->sendmsg(conn
->sock
, reply_payload
.data
,
1200 reply_payload
.size
, 0);
1201 if (send_ret
< (ssize_t
) reply_payload
.size
) {
1202 ERR("Failed to send \"create session\" command reply of %zu bytes (ret = %zd)",
1203 reply_payload
.size
, send_ret
);
1207 if (ret
< 0 && session
) {
1208 session_put(session
);
1210 lttng_dynamic_buffer_reset(&reply_payload
);
1215 * When we have received all the streams and the metadata for a channel,
1216 * we make them visible to the viewer threads.
1218 static void publish_connection_local_streams(struct relay_connection
*conn
)
1220 struct relay_stream
*stream
;
1221 struct relay_session
*session
= conn
->session
;
1224 * We publish all streams belonging to a session atomically wrt
1227 pthread_mutex_lock(&session
->lock
);
1229 cds_list_for_each_entry_rcu(stream
, &session
->recv_list
,
1231 stream_publish(stream
);
1236 * Inform the viewer that there are new streams in the session.
1238 if (session
->viewer_attached
) {
1239 uatomic_set(&session
->new_streams
, 1);
1241 pthread_mutex_unlock(&session
->lock
);
/*
 * Validate and normalize, in place, a channel path received from a peer.
 *
 * channel_path: NUL-terminated, writable path string.
 *
 * A path containing the "../" sequence is refused since it could walk up
 * the path hierarchy. A leading '/' is stripped by shifting the string one
 * byte to the left.
 *
 * Returns 0 on success, -1 if the path is refused.
 */
static int conform_channel_path(char *channel_path)
{
	/*
	 * strstr() takes the searched string first and the pattern second;
	 * look for "../" inside channel_path, not the other way around.
	 */
	if (strstr(channel_path, "../")) {
		ERR("Refusing channel path as it walks up the path hierarchy: \"%s\"",
				channel_path);
		return -1;
	}

	if (*channel_path == '/') {
		const size_t len = strlen(channel_path);

		/*
		 * Channel paths from peers prior to 2.11 are expressed as an
		 * absolute path that is, in reality, relative to the relay
		 * daemon's output directory. Remove the leading slash so it
		 * is correctly interpreted as a relative path later on.
		 *
		 * len (and not len - 1) is used to copy the trailing NUL.
		 * The regions overlap, hence memmove().
		 */
		memmove(channel_path, channel_path + 1, len);
	}

	return 0;
}
1273 * relay_add_stream: allocate a new stream for a session
1275 static int relay_add_stream(const struct lttcomm_relayd_hdr
*recv_hdr
,
1276 struct relay_connection
*conn
,
1277 const struct lttng_buffer_view
*payload
)
1281 struct relay_session
*session
= conn
->session
;
1282 struct relay_stream
*stream
= NULL
;
1283 struct lttcomm_relayd_status_stream reply
;
1284 struct ctf_trace
*trace
= NULL
;
1285 uint64_t stream_handle
= -1ULL;
1286 char *path_name
= NULL
, *channel_name
= NULL
;
1287 uint64_t tracefile_size
= 0, tracefile_count
= 0;
1288 LTTNG_OPTIONAL(uint64_t) stream_chunk_id
= {};
1290 if (!session
|| !conn
->version_check_done
) {
1291 ERR("Trying to add a stream before version check");
1293 goto end_no_session
;
1296 if (session
->minor
== 1) {
1298 ret
= cmd_recv_stream_2_1(payload
, &path_name
,
1300 } else if (session
->minor
> 1 && session
->minor
< 11) {
1301 /* From 2.2 to 2.10 */
1302 ret
= cmd_recv_stream_2_2(payload
, &path_name
,
1303 &channel_name
, &tracefile_size
, &tracefile_count
);
1305 /* From 2.11 to ... */
1306 ret
= cmd_recv_stream_2_11(payload
, &path_name
,
1307 &channel_name
, &tracefile_size
, &tracefile_count
,
1308 &stream_chunk_id
.value
);
1309 stream_chunk_id
.is_set
= true;
1316 if (conform_channel_path(path_name
)) {
1320 trace
= ctf_trace_get_by_path_or_create(session
, path_name
);
1324 /* This stream here has one reference on the trace. */
1326 pthread_mutex_lock(&last_relay_stream_id_lock
);
1327 stream_handle
= ++last_relay_stream_id
;
1328 pthread_mutex_unlock(&last_relay_stream_id_lock
);
1330 /* We pass ownership of path_name and channel_name. */
1331 stream
= stream_create(trace
, stream_handle
, path_name
,
1332 channel_name
, tracefile_size
, tracefile_count
);
1334 channel_name
= NULL
;
1337 * Streams are the owners of their trace. Reference to trace is
1338 * kept within stream_create().
1340 ctf_trace_put(trace
);
1343 memset(&reply
, 0, sizeof(reply
));
1344 reply
.handle
= htobe64(stream_handle
);
1346 reply
.ret_code
= htobe32(LTTNG_ERR_UNK
);
1348 reply
.ret_code
= htobe32(LTTNG_OK
);
1351 send_ret
= conn
->sock
->ops
->sendmsg(conn
->sock
, &reply
,
1352 sizeof(struct lttcomm_relayd_status_stream
), 0);
1353 if (send_ret
< (ssize_t
) sizeof(reply
)) {
1354 ERR("Failed to send \"add stream\" command reply (ret = %zd)",
1366 * relay_close_stream: close a specific stream
1368 static int relay_close_stream(const struct lttcomm_relayd_hdr
*recv_hdr
,
1369 struct relay_connection
*conn
,
1370 const struct lttng_buffer_view
*payload
)
1374 struct relay_session
*session
= conn
->session
;
1375 struct lttcomm_relayd_close_stream stream_info
;
1376 struct lttcomm_relayd_generic_reply reply
;
1377 struct relay_stream
*stream
;
1379 DBG("Close stream received");
1381 if (!session
|| !conn
->version_check_done
) {
1382 ERR("Trying to close a stream before version check");
1384 goto end_no_session
;
1387 if (payload
->size
< sizeof(stream_info
)) {
1388 ERR("Unexpected payload size in \"relay_close_stream\": expected >= %zu bytes, got %zu bytes",
1389 sizeof(stream_info
), payload
->size
);
1391 goto end_no_session
;
1393 memcpy(&stream_info
, payload
->data
, sizeof(stream_info
));
1394 stream_info
.stream_id
= be64toh(stream_info
.stream_id
);
1395 stream_info
.last_net_seq_num
= be64toh(stream_info
.last_net_seq_num
);
1397 stream
= stream_get_by_id(stream_info
.stream_id
);
1404 * Set last_net_seq_num before the close flag. Required by data
1407 pthread_mutex_lock(&stream
->lock
);
1408 stream
->last_net_seq_num
= stream_info
.last_net_seq_num
;
1409 pthread_mutex_unlock(&stream
->lock
);
1412 * This is one of the conditions which may trigger a stream close
1413 * with the others being:
1414 * 1) A close command is received for a stream
1415 * 2) The control connection owning the stream is closed
1416 * 3) We have received all of the stream's data _after_ a close
1419 try_stream_close(stream
);
1420 if (stream
->is_metadata
) {
1421 struct relay_viewer_stream
*vstream
;
1423 vstream
= viewer_stream_get_by_id(stream
->stream_handle
);
1425 if (vstream
->metadata_sent
== stream
->metadata_received
) {
1427 * Since all the metadata has been sent to the
1428 * viewer and that we have a request to close
1429 * its stream, we can safely teardown the
1430 * corresponding metadata viewer stream.
1432 viewer_stream_put(vstream
);
1434 /* Put local reference. */
1435 viewer_stream_put(vstream
);
1442 memset(&reply
, 0, sizeof(reply
));
1444 reply
.ret_code
= htobe32(LTTNG_ERR_UNK
);
1446 reply
.ret_code
= htobe32(LTTNG_OK
);
1448 send_ret
= conn
->sock
->ops
->sendmsg(conn
->sock
, &reply
,
1449 sizeof(struct lttcomm_relayd_generic_reply
), 0);
1450 if (send_ret
< (ssize_t
) sizeof(reply
)) {
1451 ERR("Failed to send \"close stream\" command reply (ret = %zd)",
1461 * relay_reset_metadata: reset a metadata stream
1464 int relay_reset_metadata(const struct lttcomm_relayd_hdr
*recv_hdr
,
1465 struct relay_connection
*conn
,
1466 const struct lttng_buffer_view
*payload
)
1470 struct relay_session
*session
= conn
->session
;
1471 struct lttcomm_relayd_reset_metadata stream_info
;
1472 struct lttcomm_relayd_generic_reply reply
;
1473 struct relay_stream
*stream
;
1475 DBG("Reset metadata received");
1477 if (!session
|| !conn
->version_check_done
) {
1478 ERR("Trying to reset a metadata stream before version check");
1480 goto end_no_session
;
1483 if (payload
->size
< sizeof(stream_info
)) {
1484 ERR("Unexpected payload size in \"relay_reset_metadata\": expected >= %zu bytes, got %zu bytes",
1485 sizeof(stream_info
), payload
->size
);
1487 goto end_no_session
;
1489 memcpy(&stream_info
, payload
->data
, sizeof(stream_info
));
1490 stream_info
.stream_id
= be64toh(stream_info
.stream_id
);
1491 stream_info
.version
= be64toh(stream_info
.version
);
1493 DBG("Update metadata to version %" PRIu64
, stream_info
.version
);
1495 /* Unsupported for live sessions for now. */
1496 if (session
->live_timer
!= 0) {
1501 stream
= stream_get_by_id(stream_info
.stream_id
);
1506 pthread_mutex_lock(&stream
->lock
);
1507 if (!stream
->is_metadata
) {
1512 ret
= stream_reset_file(stream
);
1514 ERR("Failed to reset metadata stream %" PRIu64
1515 ": stream_path = %s, channel = %s",
1516 stream
->stream_handle
, stream
->path_name
,
1517 stream
->channel_name
);
1521 pthread_mutex_unlock(&stream
->lock
);
1525 memset(&reply
, 0, sizeof(reply
));
1527 reply
.ret_code
= htobe32(LTTNG_ERR_UNK
);
1529 reply
.ret_code
= htobe32(LTTNG_OK
);
1531 send_ret
= conn
->sock
->ops
->sendmsg(conn
->sock
, &reply
,
1532 sizeof(struct lttcomm_relayd_generic_reply
), 0);
1533 if (send_ret
< (ssize_t
) sizeof(reply
)) {
1534 ERR("Failed to send \"reset metadata\" command reply (ret = %zd)",
1544 * relay_unknown_command: send -1 if received unknown command
1546 static void relay_unknown_command(struct relay_connection
*conn
)
1548 struct lttcomm_relayd_generic_reply reply
;
1551 memset(&reply
, 0, sizeof(reply
));
1552 reply
.ret_code
= htobe32(LTTNG_ERR_UNK
);
1553 send_ret
= conn
->sock
->ops
->sendmsg(conn
->sock
, &reply
, sizeof(reply
), 0);
1554 if (send_ret
< sizeof(reply
)) {
1555 ERR("Failed to send \"unknown command\" command reply (ret = %zd)", send_ret
);
1560 * relay_start: send an acknowledgment to the client to tell if we are
1561 * ready to receive data. We are ready if a session is established.
1563 static int relay_start(const struct lttcomm_relayd_hdr
*recv_hdr
,
1564 struct relay_connection
*conn
,
1565 const struct lttng_buffer_view
*payload
)
1569 struct lttcomm_relayd_generic_reply reply
;
1570 struct relay_session
*session
= conn
->session
;
1573 DBG("Trying to start the streaming without a session established");
1574 ret
= htobe32(LTTNG_ERR_UNK
);
1577 memset(&reply
, 0, sizeof(reply
));
1578 reply
.ret_code
= htobe32(LTTNG_OK
);
1579 send_ret
= conn
->sock
->ops
->sendmsg(conn
->sock
, &reply
,
1581 if (send_ret
< (ssize_t
) sizeof(reply
)) {
1582 ERR("Failed to send \"relay_start\" command reply (ret = %zd)",
1591 * relay_recv_metadata: receive the metadata for the session.
1593 static int relay_recv_metadata(const struct lttcomm_relayd_hdr
*recv_hdr
,
1594 struct relay_connection
*conn
,
1595 const struct lttng_buffer_view
*payload
)
1598 struct relay_session
*session
= conn
->session
;
1599 struct lttcomm_relayd_metadata_payload metadata_payload_header
;
1600 struct relay_stream
*metadata_stream
;
1601 uint64_t metadata_payload_size
;
1602 struct lttng_buffer_view packet_view
;
1605 ERR("Metadata sent before version check");
1610 if (recv_hdr
->data_size
< sizeof(struct lttcomm_relayd_metadata_payload
)) {
1611 ERR("Incorrect data size");
1615 metadata_payload_size
= recv_hdr
->data_size
-
1616 sizeof(struct lttcomm_relayd_metadata_payload
);
1618 memcpy(&metadata_payload_header
, payload
->data
,
1619 sizeof(metadata_payload_header
));
1620 metadata_payload_header
.stream_id
= be64toh(
1621 metadata_payload_header
.stream_id
);
1622 metadata_payload_header
.padding_size
= be32toh(
1623 metadata_payload_header
.padding_size
);
1625 metadata_stream
= stream_get_by_id(metadata_payload_header
.stream_id
);
1626 if (!metadata_stream
) {
1631 packet_view
= lttng_buffer_view_from_view(payload
,
1632 sizeof(metadata_payload_header
), metadata_payload_size
);
1633 if (!packet_view
.data
) {
1634 ERR("Invalid metadata packet length announced by header");
1639 pthread_mutex_lock(&metadata_stream
->lock
);
1640 ret
= stream_write(metadata_stream
, &packet_view
,
1641 metadata_payload_header
.padding_size
);
1642 pthread_mutex_unlock(&metadata_stream
->lock
);
1648 stream_put(metadata_stream
);
1654 * relay_send_version: send relayd version number
1656 static int relay_send_version(const struct lttcomm_relayd_hdr
*recv_hdr
,
1657 struct relay_connection
*conn
,
1658 const struct lttng_buffer_view
*payload
)
1662 struct lttcomm_relayd_version reply
, msg
;
1663 bool compatible
= true;
1665 conn
->version_check_done
= true;
1667 /* Get version from the other side. */
1668 if (payload
->size
< sizeof(msg
)) {
1669 ERR("Unexpected payload size in \"relay_send_version\": expected >= %zu bytes, got %zu bytes",
1670 sizeof(msg
), payload
->size
);
1675 memcpy(&msg
, payload
->data
, sizeof(msg
));
1676 msg
.major
= be32toh(msg
.major
);
1677 msg
.minor
= be32toh(msg
.minor
);
1679 memset(&reply
, 0, sizeof(reply
));
1680 reply
.major
= RELAYD_VERSION_COMM_MAJOR
;
1681 reply
.minor
= RELAYD_VERSION_COMM_MINOR
;
1683 /* Major versions must be the same */
1684 if (reply
.major
!= msg
.major
) {
1685 DBG("Incompatible major versions (%u vs %u), deleting session",
1686 reply
.major
, msg
.major
);
1690 conn
->major
= reply
.major
;
1691 /* We adapt to the lowest compatible version */
1692 if (reply
.minor
<= msg
.minor
) {
1693 conn
->minor
= reply
.minor
;
1695 conn
->minor
= msg
.minor
;
1698 reply
.major
= htobe32(reply
.major
);
1699 reply
.minor
= htobe32(reply
.minor
);
1700 send_ret
= conn
->sock
->ops
->sendmsg(conn
->sock
, &reply
,
1702 if (send_ret
< (ssize_t
) sizeof(reply
)) {
1703 ERR("Failed to send \"send version\" command reply (ret = %zd)",
1716 DBG("Version check done using protocol %u.%u", conn
->major
,
1724 * Check for data pending for a given stream id from the session daemon.
1726 static int relay_data_pending(const struct lttcomm_relayd_hdr
*recv_hdr
,
1727 struct relay_connection
*conn
,
1728 const struct lttng_buffer_view
*payload
)
1730 struct relay_session
*session
= conn
->session
;
1731 struct lttcomm_relayd_data_pending msg
;
1732 struct lttcomm_relayd_generic_reply reply
;
1733 struct relay_stream
*stream
;
1736 uint64_t stream_seq
;
1738 DBG("Data pending command received");
1740 if (!session
|| !conn
->version_check_done
) {
1741 ERR("Trying to check for data before version check");
1743 goto end_no_session
;
1746 if (payload
->size
< sizeof(msg
)) {
1747 ERR("Unexpected payload size in \"relay_data_pending\": expected >= %zu bytes, got %zu bytes",
1748 sizeof(msg
), payload
->size
);
1750 goto end_no_session
;
1752 memcpy(&msg
, payload
->data
, sizeof(msg
));
1753 msg
.stream_id
= be64toh(msg
.stream_id
);
1754 msg
.last_net_seq_num
= be64toh(msg
.last_net_seq_num
);
1756 stream
= stream_get_by_id(msg
.stream_id
);
1757 if (stream
== NULL
) {
1762 pthread_mutex_lock(&stream
->lock
);
1764 if (session_streams_have_index(session
)) {
1766 * Ensure that both the index and stream data have been
1767 * flushed up to the requested point.
1769 stream_seq
= min(stream
->prev_data_seq
, stream
->prev_index_seq
);
1771 stream_seq
= stream
->prev_data_seq
;
1773 DBG("Data pending for stream id %" PRIu64
": prev_data_seq %" PRIu64
1774 ", prev_index_seq %" PRIu64
1775 ", and last_seq %" PRIu64
, msg
.stream_id
,
1776 stream
->prev_data_seq
, stream
->prev_index_seq
,
1777 msg
.last_net_seq_num
);
1779 /* Avoid wrapping issue */
1780 if (((int64_t) (stream_seq
- msg
.last_net_seq_num
)) >= 0) {
1781 /* Data has in fact been written and is NOT pending */
1784 /* Data still being streamed thus pending */
1788 stream
->data_pending_check_done
= true;
1789 pthread_mutex_unlock(&stream
->lock
);
1794 memset(&reply
, 0, sizeof(reply
));
1795 reply
.ret_code
= htobe32(ret
);
1796 send_ret
= conn
->sock
->ops
->sendmsg(conn
->sock
, &reply
, sizeof(reply
), 0);
1797 if (send_ret
< (ssize_t
) sizeof(reply
)) {
1798 ERR("Failed to send \"data pending\" command reply (ret = %zd)",
1808 * Wait for the control socket to reach a quiescent state.
1810 * Note that for now, when receiving this command from the session
1811 * daemon, this means that every subsequent commands or data received on
1812 * the control socket has been handled. So, this is why we simply return
1815 static int relay_quiescent_control(const struct lttcomm_relayd_hdr
*recv_hdr
,
1816 struct relay_connection
*conn
,
1817 const struct lttng_buffer_view
*payload
)
1821 struct relay_stream
*stream
;
1822 struct lttcomm_relayd_quiescent_control msg
;
1823 struct lttcomm_relayd_generic_reply reply
;
1825 DBG("Checking quiescent state on control socket");
1827 if (!conn
->session
|| !conn
->version_check_done
) {
1828 ERR("Trying to check for data before version check");
1830 goto end_no_session
;
1833 if (payload
->size
< sizeof(msg
)) {
1834 ERR("Unexpected payload size in \"relay_quiescent_control\": expected >= %zu bytes, got %zu bytes",
1835 sizeof(msg
), payload
->size
);
1837 goto end_no_session
;
1839 memcpy(&msg
, payload
->data
, sizeof(msg
));
1840 msg
.stream_id
= be64toh(msg
.stream_id
);
1842 stream
= stream_get_by_id(msg
.stream_id
);
1846 pthread_mutex_lock(&stream
->lock
);
1847 stream
->data_pending_check_done
= true;
1848 pthread_mutex_unlock(&stream
->lock
);
1850 DBG("Relay quiescent control pending flag set to %" PRIu64
, msg
.stream_id
);
1853 memset(&reply
, 0, sizeof(reply
));
1854 reply
.ret_code
= htobe32(LTTNG_OK
);
1855 send_ret
= conn
->sock
->ops
->sendmsg(conn
->sock
, &reply
, sizeof(reply
), 0);
1856 if (send_ret
< (ssize_t
) sizeof(reply
)) {
1857 ERR("Failed to send \"quiescent control\" command reply (ret = %zd)",
1869 * Initialize a data pending command. This means that a consumer is about
1870 * to ask for data pending for each stream it holds. Simply iterate over
1871 * all streams of a session and set the data_pending_check_done flag.
1873 * This command returns to the client a LTTNG_OK code.
1875 static int relay_begin_data_pending(const struct lttcomm_relayd_hdr
*recv_hdr
,
1876 struct relay_connection
*conn
,
1877 const struct lttng_buffer_view
*payload
)
1881 struct lttng_ht_iter iter
;
1882 struct lttcomm_relayd_begin_data_pending msg
;
1883 struct lttcomm_relayd_generic_reply reply
;
1884 struct relay_stream
*stream
;
1889 DBG("Init streams for data pending");
1891 if (!conn
->session
|| !conn
->version_check_done
) {
1892 ERR("Trying to check for data before version check");
1894 goto end_no_session
;
1897 if (payload
->size
< sizeof(msg
)) {
1898 ERR("Unexpected payload size in \"relay_begin_data_pending\": expected >= %zu bytes, got %zu bytes",
1899 sizeof(msg
), payload
->size
);
1901 goto end_no_session
;
1903 memcpy(&msg
, payload
->data
, sizeof(msg
));
1904 msg
.session_id
= be64toh(msg
.session_id
);
1907 * Iterate over all streams to set the begin data pending flag.
1908 * For now, the streams are indexed by stream handle so we have
1909 * to iterate over all streams to find the one associated with
1910 * the right session_id.
1913 cds_lfht_for_each_entry(relay_streams_ht
->ht
, &iter
.iter
, stream
,
1915 if (!stream_get(stream
)) {
1918 if (stream
->trace
->session
->id
== msg
.session_id
) {
1919 pthread_mutex_lock(&stream
->lock
);
1920 stream
->data_pending_check_done
= false;
1921 pthread_mutex_unlock(&stream
->lock
);
1922 DBG("Set begin data pending flag to stream %" PRIu64
,
1923 stream
->stream_handle
);
1929 memset(&reply
, 0, sizeof(reply
));
1930 /* All good, send back reply. */
1931 reply
.ret_code
= htobe32(LTTNG_OK
);
1933 send_ret
= conn
->sock
->ops
->sendmsg(conn
->sock
, &reply
, sizeof(reply
), 0);
1934 if (send_ret
< (ssize_t
) sizeof(reply
)) {
1935 ERR("Failed to send \"begin data pending\" command reply (ret = %zd)",
1947 * End data pending command. This will check, for a given session id, if
1948 * each stream associated with it has its data_pending_check_done flag
1949 * set. If not, this means that the client lost track of the stream but
1950 * the data is still being streamed on our side. In this case, we inform
1951 * the client that data is in flight.
1953 * Return to the client if there is data in flight or not with a ret_code.
1955 static int relay_end_data_pending(const struct lttcomm_relayd_hdr
*recv_hdr
,
1956 struct relay_connection
*conn
,
1957 const struct lttng_buffer_view
*payload
)
1961 struct lttng_ht_iter iter
;
1962 struct lttcomm_relayd_end_data_pending msg
;
1963 struct lttcomm_relayd_generic_reply reply
;
1964 struct relay_stream
*stream
;
1965 uint32_t is_data_inflight
= 0;
1967 DBG("End data pending command");
1969 if (!conn
->session
|| !conn
->version_check_done
) {
1970 ERR("Trying to check for data before version check");
1972 goto end_no_session
;
1975 if (payload
->size
< sizeof(msg
)) {
1976 ERR("Unexpected payload size in \"relay_end_data_pending\": expected >= %zu bytes, got %zu bytes",
1977 sizeof(msg
), payload
->size
);
1979 goto end_no_session
;
1981 memcpy(&msg
, payload
->data
, sizeof(msg
));
1982 msg
.session_id
= be64toh(msg
.session_id
);
1985 * Iterate over all streams to see if the begin data pending
1989 cds_lfht_for_each_entry(relay_streams_ht
->ht
, &iter
.iter
, stream
,
1991 if (!stream_get(stream
)) {
1994 if (stream
->trace
->session
->id
!= msg
.session_id
) {
1998 pthread_mutex_lock(&stream
->lock
);
1999 if (!stream
->data_pending_check_done
) {
2000 uint64_t stream_seq
;
2002 if (session_streams_have_index(conn
->session
)) {
2004 * Ensure that both the index and stream data have been
2005 * flushed up to the requested point.
2007 stream_seq
= min(stream
->prev_data_seq
, stream
->prev_index_seq
);
2009 stream_seq
= stream
->prev_data_seq
;
2011 if (!stream
->closed
|| !(((int64_t) (stream_seq
- stream
->last_net_seq_num
)) >= 0)) {
2012 is_data_inflight
= 1;
2013 DBG("Data is still in flight for stream %" PRIu64
,
2014 stream
->stream_handle
);
2015 pthread_mutex_unlock(&stream
->lock
);
2020 pthread_mutex_unlock(&stream
->lock
);
2025 memset(&reply
, 0, sizeof(reply
));
2026 /* All good, send back reply. */
2027 reply
.ret_code
= htobe32(is_data_inflight
);
2029 send_ret
= conn
->sock
->ops
->sendmsg(conn
->sock
, &reply
, sizeof(reply
), 0);
2030 if (send_ret
< (ssize_t
) sizeof(reply
)) {
2031 ERR("Failed to send \"end data pending\" command reply (ret = %zd)",
2043 * Receive an index for a specific stream.
2045 * Return 0 on success else a negative value.
2047 static int relay_recv_index(const struct lttcomm_relayd_hdr
*recv_hdr
,
2048 struct relay_connection
*conn
,
2049 const struct lttng_buffer_view
*payload
)
2053 struct relay_session
*session
= conn
->session
;
2054 struct lttcomm_relayd_index index_info
;
2055 struct lttcomm_relayd_generic_reply reply
;
2056 struct relay_stream
*stream
;
2061 DBG("Relay receiving index");
2063 if (!session
|| !conn
->version_check_done
) {
2064 ERR("Trying to close a stream before version check");
2066 goto end_no_session
;
2069 msg_len
= lttcomm_relayd_index_len(
2070 lttng_to_index_major(conn
->major
, conn
->minor
),
2071 lttng_to_index_minor(conn
->major
, conn
->minor
));
2072 if (payload
->size
< msg_len
) {
2073 ERR("Unexpected payload size in \"relay_recv_index\": expected >= %zu bytes, got %zu bytes",
2074 msg_len
, payload
->size
);
2076 goto end_no_session
;
2078 memcpy(&index_info
, payload
->data
, msg_len
);
2079 index_info
.relay_stream_id
= be64toh(index_info
.relay_stream_id
);
2080 index_info
.net_seq_num
= be64toh(index_info
.net_seq_num
);
2081 index_info
.packet_size
= be64toh(index_info
.packet_size
);
2082 index_info
.content_size
= be64toh(index_info
.content_size
);
2083 index_info
.timestamp_begin
= be64toh(index_info
.timestamp_begin
);
2084 index_info
.timestamp_end
= be64toh(index_info
.timestamp_end
);
2085 index_info
.events_discarded
= be64toh(index_info
.events_discarded
);
2086 index_info
.stream_id
= be64toh(index_info
.stream_id
);
2088 if (conn
->minor
>= 8) {
2089 index_info
.stream_instance_id
=
2090 be64toh(index_info
.stream_instance_id
);
2091 index_info
.packet_seq_num
= be64toh(index_info
.packet_seq_num
);
2094 stream
= stream_get_by_id(index_info
.relay_stream_id
);
2096 ERR("stream_get_by_id not found");
2101 pthread_mutex_lock(&stream
->lock
);
2102 ret
= stream_add_index(stream
, &index_info
);
2103 pthread_mutex_unlock(&stream
->lock
);
2105 goto end_stream_put
;
2111 memset(&reply
, 0, sizeof(reply
));
2113 reply
.ret_code
= htobe32(LTTNG_ERR_UNK
);
2115 reply
.ret_code
= htobe32(LTTNG_OK
);
2117 send_ret
= conn
->sock
->ops
->sendmsg(conn
->sock
, &reply
, sizeof(reply
), 0);
2118 if (send_ret
< (ssize_t
) sizeof(reply
)) {
2119 ERR("Failed to send \"recv index\" command reply (ret = %zd)", send_ret
);
2128 * Receive the streams_sent message.
2130 * Return 0 on success else a negative value.
2132 static int relay_streams_sent(const struct lttcomm_relayd_hdr
*recv_hdr
,
2133 struct relay_connection
*conn
,
2134 const struct lttng_buffer_view
*payload
)
2138 struct lttcomm_relayd_generic_reply reply
;
2142 DBG("Relay receiving streams_sent");
2144 if (!conn
->session
|| !conn
->version_check_done
) {
2145 ERR("Trying to close a stream before version check");
2147 goto end_no_session
;
2151 * Publish every pending stream in the connection recv list which are
2152 * now ready to be used by the viewer.
2154 publish_connection_local_streams(conn
);
2156 memset(&reply
, 0, sizeof(reply
));
2157 reply
.ret_code
= htobe32(LTTNG_OK
);
2158 send_ret
= conn
->sock
->ops
->sendmsg(conn
->sock
, &reply
, sizeof(reply
), 0);
2159 if (send_ret
< (ssize_t
) sizeof(reply
)) {
2160 ERR("Failed to send \"streams sent\" command reply (ret = %zd)",
2173 * relay_rotate_session_stream: rotate a stream to a new tracefile for the
2174 * session rotation feature (not the tracefile rotation feature).
2176 static int relay_rotate_session_streams(
2177 const struct lttcomm_relayd_hdr
*recv_hdr
,
2178 struct relay_connection
*conn
,
2179 const struct lttng_buffer_view
*payload
)
2184 enum lttng_error_code reply_code
= LTTNG_ERR_UNK
;
2185 struct relay_session
*session
= conn
->session
;
2186 struct lttcomm_relayd_rotate_streams rotate_streams
;
2187 struct lttcomm_relayd_generic_reply reply
= {};
2188 struct relay_stream
*stream
= NULL
;
2189 const size_t header_len
= sizeof(struct lttcomm_relayd_rotate_streams
);
2190 struct lttng_trace_chunk
*next_trace_chunk
= NULL
;
2191 struct lttng_buffer_view stream_positions
;
2192 char chunk_id_buf
[MAX_INT_DEC_LEN(uint64_t)];
2193 const char *chunk_id_str
= "none";
2195 if (!session
|| !conn
->version_check_done
) {
2196 ERR("Trying to rotate a stream before version check");
2201 if (session
->major
== 2 && session
->minor
< 11) {
2202 ERR("Unsupported feature before 2.11");
2207 if (payload
->size
< header_len
) {
2208 ERR("Unexpected payload size in \"relay_rotate_session_stream\": expected >= %zu bytes, got %zu bytes",
2209 header_len
, payload
->size
);
2214 memcpy(&rotate_streams
, payload
->data
, header_len
);
2216 /* Convert header to host endianness. */
2217 rotate_streams
= (typeof(rotate_streams
)) {
2218 .stream_count
= be32toh(rotate_streams
.stream_count
),
2219 .new_chunk_id
= (typeof(rotate_streams
.new_chunk_id
)) {
2220 .is_set
= !!rotate_streams
.new_chunk_id
.is_set
,
2221 .value
= be64toh(rotate_streams
.new_chunk_id
.value
),
2225 if (rotate_streams
.new_chunk_id
.is_set
) {
2227 * Retrieve the trace chunk the stream must transition to. As
2228 * per the protocol, this chunk should have been created
2229 * before this command is received.
2231 next_trace_chunk
= sessiond_trace_chunk_registry_get_chunk(
2232 sessiond_trace_chunk_registry
,
2233 session
->sessiond_uuid
, session
->id
,
2234 rotate_streams
.new_chunk_id
.value
);
2235 if (!next_trace_chunk
) {
2236 char uuid_str
[UUID_STR_LEN
];
2238 lttng_uuid_to_str(session
->sessiond_uuid
, uuid_str
);
2239 ERR("Unknown next trace chunk in ROTATE_STREAMS command: sessiond_uuid = {%s}, session_id = %" PRIu64
2240 ", trace_chunk_id = %" PRIu64
,
2241 uuid_str
, session
->id
,
2242 rotate_streams
.new_chunk_id
.value
);
2243 reply_code
= LTTNG_ERR_INVALID_PROTOCOL
;
2248 ret
= snprintf(chunk_id_buf
, sizeof(chunk_id_buf
), "%" PRIu64
,
2249 rotate_streams
.new_chunk_id
.value
);
2250 if (ret
< 0 || ret
>= sizeof(chunk_id_buf
)) {
2251 chunk_id_str
= "formatting error";
2253 chunk_id_str
= chunk_id_buf
;
2255 session
->has_rotated
= true;
2258 DBG("Rotate %" PRIu32
" streams of session \"%s\" to chunk \"%s\"",
2259 rotate_streams
.stream_count
, session
->session_name
,
2262 stream_positions
= lttng_buffer_view_from_view(payload
,
2263 sizeof(rotate_streams
), -1);
2264 if (!stream_positions
.data
||
2265 stream_positions
.size
<
2266 (rotate_streams
.stream_count
*
2267 sizeof(struct lttcomm_relayd_stream_rotation_position
))) {
2268 reply_code
= LTTNG_ERR_INVALID_PROTOCOL
;
2273 for (i
= 0; i
< rotate_streams
.stream_count
; i
++) {
2274 struct lttcomm_relayd_stream_rotation_position
*position_comm
=
2275 &((typeof(position_comm
)) stream_positions
.data
)[i
];
2276 const struct lttcomm_relayd_stream_rotation_position pos
= {
2277 .stream_id
= be64toh(position_comm
->stream_id
),
2278 .rotate_at_seq_num
= be64toh(
2279 position_comm
->rotate_at_seq_num
),
2282 stream
= stream_get_by_id(pos
.stream_id
);
2284 reply_code
= LTTNG_ERR_INVALID
;
2289 pthread_mutex_lock(&stream
->lock
);
2290 ret
= stream_set_pending_rotation(stream
, next_trace_chunk
,
2291 pos
.rotate_at_seq_num
);
2292 pthread_mutex_unlock(&stream
->lock
);
2294 reply_code
= LTTNG_ERR_FILE_CREATION_ERROR
;
2302 reply_code
= LTTNG_OK
;
2309 reply
.ret_code
= htobe32((uint32_t) reply_code
);
2310 send_ret
= conn
->sock
->ops
->sendmsg(conn
->sock
, &reply
,
2311 sizeof(struct lttcomm_relayd_generic_reply
), 0);
2312 if (send_ret
< (ssize_t
) sizeof(reply
)) {
2313 ERR("Failed to send \"rotate session stream\" command reply (ret = %zd)",
2318 lttng_trace_chunk_put(next_trace_chunk
);
2325 * relay_create_trace_chunk: create a new trace chunk
2327 static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr
*recv_hdr
,
2328 struct relay_connection
*conn
,
2329 const struct lttng_buffer_view
*payload
)
2333 struct relay_session
*session
= conn
->session
;
2334 struct lttcomm_relayd_create_trace_chunk
*msg
;
2335 struct lttcomm_relayd_generic_reply reply
= {};
2336 struct lttng_buffer_view header_view
;
2337 struct lttng_buffer_view chunk_name_view
;
2338 struct lttng_trace_chunk
*chunk
= NULL
, *published_chunk
= NULL
;
2339 enum lttng_error_code reply_code
= LTTNG_OK
;
2340 enum lttng_trace_chunk_status chunk_status
;
2341 struct lttng_directory_handle session_output
;
2343 if (!session
|| !conn
->version_check_done
) {
2344 ERR("Trying to create a trace chunk before version check");
2349 if (session
->major
== 2 && session
->minor
< 11) {
2350 ERR("Chunk creation command is unsupported before 2.11");
2355 header_view
= lttng_buffer_view_from_view(payload
, 0, sizeof(*msg
));
2356 if (!header_view
.data
) {
2357 ERR("Failed to receive payload of chunk creation command");
2362 /* Convert to host endianness. */
2363 msg
= (typeof(msg
)) header_view
.data
;
2364 msg
->chunk_id
= be64toh(msg
->chunk_id
);
2365 msg
->creation_timestamp
= be64toh(msg
->creation_timestamp
);
2366 msg
->override_name_length
= be32toh(msg
->override_name_length
);
2368 chunk
= lttng_trace_chunk_create(
2369 msg
->chunk_id
, msg
->creation_timestamp
);
2371 ERR("Failed to create trace chunk in trace chunk creation command");
2373 reply_code
= LTTNG_ERR_NOMEM
;
2377 if (msg
->override_name_length
) {
2380 chunk_name_view
= lttng_buffer_view_from_view(payload
,
2382 msg
->override_name_length
);
2383 name
= chunk_name_view
.data
;
2384 if (!name
|| name
[msg
->override_name_length
- 1]) {
2385 ERR("Failed to receive payload of chunk creation command");
2387 reply_code
= LTTNG_ERR_INVALID
;
2391 chunk_status
= lttng_trace_chunk_override_name(
2392 chunk
, chunk_name_view
.data
);
2393 switch (chunk_status
) {
2394 case LTTNG_TRACE_CHUNK_STATUS_OK
:
2396 case LTTNG_TRACE_CHUNK_STATUS_INVALID_ARGUMENT
:
2397 ERR("Failed to set the name of new trace chunk in trace chunk creation command (invalid name)");
2398 reply_code
= LTTNG_ERR_INVALID
;
2402 ERR("Failed to set the name of new trace chunk in trace chunk creation command (unknown error)");
2403 reply_code
= LTTNG_ERR_UNK
;
2409 chunk_status
= lttng_trace_chunk_set_credentials_current_user(chunk
);
2410 if (chunk_status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
2411 reply_code
= LTTNG_ERR_UNK
;
2416 ret
= session_init_output_directory_handle(
2417 conn
->session
, &session_output
);
2419 reply_code
= LTTNG_ERR_CREATE_DIR_FAIL
;
2422 chunk_status
= lttng_trace_chunk_set_as_owner(chunk
, &session_output
);
2423 lttng_directory_handle_fini(&session_output
);
2424 if (chunk_status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
2425 reply_code
= LTTNG_ERR_UNK
;
2430 published_chunk
= sessiond_trace_chunk_registry_publish_chunk(
2431 sessiond_trace_chunk_registry
,
2432 conn
->session
->sessiond_uuid
,
2435 if (!published_chunk
) {
2436 char uuid_str
[UUID_STR_LEN
];
2438 lttng_uuid_to_str(conn
->session
->sessiond_uuid
, uuid_str
);
2439 ERR("Failed to publish chunk: sessiond_uuid = %s, session_id = %" PRIu64
", chunk_id = %" PRIu64
,
2444 reply_code
= LTTNG_ERR_NOMEM
;
2448 pthread_mutex_lock(&conn
->session
->lock
);
2449 if (conn
->session
->pending_closure_trace_chunk
) {
2451 * Invalid; this means a second create_trace_chunk command was
2452 * received before a close_trace_chunk.
2454 ERR("Invalid trace chunk close command received; a trace chunk is already waiting for a trace chunk close command");
2455 reply_code
= LTTNG_ERR_INVALID_PROTOCOL
;
2457 goto end_unlock_session
;
2459 conn
->session
->pending_closure_trace_chunk
=
2460 conn
->session
->current_trace_chunk
;
2461 conn
->session
->current_trace_chunk
= published_chunk
;
2462 published_chunk
= NULL
;
2464 pthread_mutex_unlock(&conn
->session
->lock
);
2466 reply
.ret_code
= htobe32((uint32_t) reply_code
);
2467 send_ret
= conn
->sock
->ops
->sendmsg(conn
->sock
,
2469 sizeof(struct lttcomm_relayd_generic_reply
),
2471 if (send_ret
< (ssize_t
) sizeof(reply
)) {
2472 ERR("Failed to send \"create trace chunk\" command reply (ret = %zd)",
2477 lttng_trace_chunk_put(chunk
);
2478 lttng_trace_chunk_put(published_chunk
);
2483 * relay_close_trace_chunk: close a trace chunk
2485 static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr
*recv_hdr
,
2486 struct relay_connection
*conn
,
2487 const struct lttng_buffer_view
*payload
)
2489 int ret
= 0, buf_ret
;
2491 struct relay_session
*session
= conn
->session
;
2492 struct lttcomm_relayd_close_trace_chunk
*msg
;
2493 struct lttcomm_relayd_close_trace_chunk_reply reply
= {};
2494 struct lttng_buffer_view header_view
;
2495 struct lttng_trace_chunk
*chunk
= NULL
;
2496 enum lttng_error_code reply_code
= LTTNG_OK
;
2497 enum lttng_trace_chunk_status chunk_status
;
2499 LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type
) close_command
= {};
2500 time_t close_timestamp
;
2501 char closed_trace_chunk_path
[LTTNG_PATH_MAX
];
2502 size_t path_length
= 0;
2503 const char *chunk_name
= NULL
;
2504 struct lttng_dynamic_buffer reply_payload
;
2506 lttng_dynamic_buffer_init(&reply_payload
);
2508 if (!session
|| !conn
->version_check_done
) {
2509 ERR("Trying to close a trace chunk before version check");
2514 if (session
->major
== 2 && session
->minor
< 11) {
2515 ERR("Chunk close command is unsupported before 2.11");
2520 header_view
= lttng_buffer_view_from_view(payload
, 0, sizeof(*msg
));
2521 if (!header_view
.data
) {
2522 ERR("Failed to receive payload of chunk close command");
2527 /* Convert to host endianness. */
2528 msg
= (typeof(msg
)) header_view
.data
;
2529 chunk_id
= be64toh(msg
->chunk_id
);
2530 close_timestamp
= (time_t) be64toh(msg
->close_timestamp
);
2531 close_command
= (typeof(close_command
)){
2532 .value
= be32toh(msg
->close_command
.value
),
2533 .is_set
= msg
->close_command
.is_set
,
2536 chunk
= sessiond_trace_chunk_registry_get_chunk(
2537 sessiond_trace_chunk_registry
,
2538 conn
->session
->sessiond_uuid
,
2542 char uuid_str
[UUID_STR_LEN
];
2544 lttng_uuid_to_str(conn
->session
->sessiond_uuid
, uuid_str
);
2545 ERR("Failed to find chunk to close: sessiond_uuid = %s, session_id = %" PRIu64
", chunk_id = %" PRIu64
,
2550 reply_code
= LTTNG_ERR_NOMEM
;
2554 pthread_mutex_lock(&session
->lock
);
2555 if (session
->pending_closure_trace_chunk
&&
2556 session
->pending_closure_trace_chunk
!= chunk
) {
2557 ERR("Trace chunk close command for session \"%s\" does not target the trace chunk pending closure",
2558 session
->session_name
);
2559 reply_code
= LTTNG_ERR_INVALID_PROTOCOL
;
2561 goto end_unlock_session
;
2564 chunk_status
= lttng_trace_chunk_set_close_timestamp(
2565 chunk
, close_timestamp
);
2566 if (chunk_status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
2567 ERR("Failed to set trace chunk close timestamp");
2569 reply_code
= LTTNG_ERR_UNK
;
2570 goto end_unlock_session
;
2573 if (close_command
.is_set
) {
2574 chunk_status
= lttng_trace_chunk_set_close_command(
2575 chunk
, close_command
.value
);
2576 if (chunk_status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
2578 reply_code
= LTTNG_ERR_INVALID
;
2579 goto end_unlock_session
;
2582 chunk_status
= lttng_trace_chunk_get_name(chunk
, &chunk_name
, NULL
);
2583 if (chunk_status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
2584 ERR("Failed to get chunk name");
2586 reply_code
= LTTNG_ERR_UNK
;
2587 goto end_unlock_session
;
2589 if (!session
->has_rotated
&& !session
->snapshot
) {
2590 ret
= lttng_strncpy(closed_trace_chunk_path
,
2591 session
->output_path
,
2592 sizeof(closed_trace_chunk_path
));
2594 ERR("Failed to send trace chunk path: path length of %zu bytes exceeds the maximal allowed length of %zu bytes",
2595 strlen(session
->output_path
),
2596 sizeof(closed_trace_chunk_path
));
2597 reply_code
= LTTNG_ERR_NOMEM
;
2599 goto end_unlock_session
;
2602 if (session
->snapshot
) {
2603 ret
= snprintf(closed_trace_chunk_path
,
2604 sizeof(closed_trace_chunk_path
),
2605 "%s/%s", session
->output_path
,
2608 ret
= snprintf(closed_trace_chunk_path
,
2609 sizeof(closed_trace_chunk_path
),
2610 "%s/" DEFAULT_ARCHIVED_TRACE_CHUNKS_DIRECTORY
2612 session
->output_path
, chunk_name
);
2614 if (ret
< 0 || ret
== sizeof(closed_trace_chunk_path
)) {
2615 ERR("Failed to format closed trace chunk resulting path");
2616 reply_code
= ret
< 0 ? LTTNG_ERR_UNK
: LTTNG_ERR_NOMEM
;
2618 goto end_unlock_session
;
2621 DBG("Reply chunk path on close: %s", closed_trace_chunk_path
);
2622 path_length
= strlen(closed_trace_chunk_path
) + 1;
2623 if (path_length
> UINT32_MAX
) {
2624 ERR("Closed trace chunk path exceeds the maximal length allowed by the protocol");
2626 reply_code
= LTTNG_ERR_INVALID_PROTOCOL
;
2627 goto end_unlock_session
;
2630 if (session
->current_trace_chunk
== chunk
) {
2632 * After a trace chunk close command, no new streams
2633 * referencing the chunk may be created. Hence, on the
2634 * event that no new trace chunk have been created for
2635 * the session, the reference to the current trace chunk
2636 * is released in order to allow it to be reclaimed when
2637 * the last stream releases its reference to it.
2639 lttng_trace_chunk_put(session
->current_trace_chunk
);
2640 session
->current_trace_chunk
= NULL
;
2642 lttng_trace_chunk_put(session
->pending_closure_trace_chunk
);
2643 session
->pending_closure_trace_chunk
= NULL
;
2645 pthread_mutex_unlock(&session
->lock
);
2648 reply
.generic
.ret_code
= htobe32((uint32_t) reply_code
);
2649 reply
.path_length
= htobe32((uint32_t) path_length
);
2650 buf_ret
= lttng_dynamic_buffer_append(
2651 &reply_payload
, &reply
, sizeof(reply
));
2653 ERR("Failed to append \"close trace chunk\" command reply header to payload buffer");
2657 if (reply_code
== LTTNG_OK
) {
2658 buf_ret
= lttng_dynamic_buffer_append(&reply_payload
,
2659 closed_trace_chunk_path
, path_length
);
2661 ERR("Failed to append \"close trace chunk\" command reply path to payload buffer");
2666 send_ret
= conn
->sock
->ops
->sendmsg(conn
->sock
,
2670 if (send_ret
< reply_payload
.size
) {
2671 ERR("Failed to send \"close trace chunk\" command reply of %zu bytes (ret = %zd)",
2672 reply_payload
.size
, send_ret
);
2677 lttng_trace_chunk_put(chunk
);
2678 lttng_dynamic_buffer_reset(&reply_payload
);
2683 * relay_trace_chunk_exists: check if a trace chunk exists
2685 static int relay_trace_chunk_exists(const struct lttcomm_relayd_hdr
*recv_hdr
,
2686 struct relay_connection
*conn
,
2687 const struct lttng_buffer_view
*payload
)
2691 struct relay_session
*session
= conn
->session
;
2692 struct lttcomm_relayd_trace_chunk_exists
*msg
;
2693 struct lttcomm_relayd_trace_chunk_exists_reply reply
= {};
2694 struct lttng_buffer_view header_view
;
2698 if (!session
|| !conn
->version_check_done
) {
2699 ERR("Trying to close a trace chunk before version check");
2704 if (session
->major
== 2 && session
->minor
< 11) {
2705 ERR("Chunk close command is unsupported before 2.11");
2710 header_view
= lttng_buffer_view_from_view(payload
, 0, sizeof(*msg
));
2711 if (!header_view
.data
) {
2712 ERR("Failed to receive payload of chunk close command");
2717 /* Convert to host endianness. */
2718 msg
= (typeof(msg
)) header_view
.data
;
2719 chunk_id
= be64toh(msg
->chunk_id
);
2721 ret
= sessiond_trace_chunk_registry_chunk_exists(
2722 sessiond_trace_chunk_registry
,
2723 conn
->session
->sessiond_uuid
,
2725 chunk_id
, &chunk_exists
);
2727 * If ret is not 0, send the reply and report the error to the caller.
2728 * It is a protocol (or internal) error and the session/connection
2729 * should be torn down.
2731 reply
= (typeof(reply
)){
2732 .generic
.ret_code
= htobe32((uint32_t)
2733 (ret
== 0 ? LTTNG_OK
: LTTNG_ERR_INVALID_PROTOCOL
)),
2734 .trace_chunk_exists
= ret
== 0 ? chunk_exists
: 0,
2736 send_ret
= conn
->sock
->ops
->sendmsg(
2737 conn
->sock
, &reply
, sizeof(reply
), 0);
2738 if (send_ret
< (ssize_t
) sizeof(reply
)) {
2739 ERR("Failed to send \"create trace chunk\" command reply (ret = %zd)",
/*
 * Log, through DBG3(), the name of the control command being processed and
 * the file descriptor of the connection's socket.
 *
 * The expansion is wrapped in do/while (0) and carries no trailing semicolon
 * so that `DBG_CMD(...);` is exactly one statement, including when it is the
 * body of an un-braced if/else. Arguments are parenthesized so that any
 * expression can be passed as `conn`.
 */
#define DBG_CMD(cmd_name, conn)						\
	do {								\
		DBG3("Processing \"%s\" command for socket %i",		\
				(cmd_name), (conn)->sock->fd);		\
	} while (0)
2750 static int relay_process_control_command(struct relay_connection
*conn
,
2751 const struct lttcomm_relayd_hdr
*header
,
2752 const struct lttng_buffer_view
*payload
)
2756 switch (header
->cmd
) {
2757 case RELAYD_CREATE_SESSION
:
2758 DBG_CMD("RELAYD_CREATE_SESSION", conn
);
2759 ret
= relay_create_session(header
, conn
, payload
);
2761 case RELAYD_ADD_STREAM
:
2762 DBG_CMD("RELAYD_ADD_STREAM", conn
);
2763 ret
= relay_add_stream(header
, conn
, payload
);
2765 case RELAYD_START_DATA
:
2766 DBG_CMD("RELAYD_START_DATA", conn
);
2767 ret
= relay_start(header
, conn
, payload
);
2769 case RELAYD_SEND_METADATA
:
2770 DBG_CMD("RELAYD_SEND_METADATA", conn
);
2771 ret
= relay_recv_metadata(header
, conn
, payload
);
2773 case RELAYD_VERSION
:
2774 DBG_CMD("RELAYD_VERSION", conn
);
2775 ret
= relay_send_version(header
, conn
, payload
);
2777 case RELAYD_CLOSE_STREAM
:
2778 DBG_CMD("RELAYD_CLOSE_STREAM", conn
);
2779 ret
= relay_close_stream(header
, conn
, payload
);
2781 case RELAYD_DATA_PENDING
:
2782 DBG_CMD("RELAYD_DATA_PENDING", conn
);
2783 ret
= relay_data_pending(header
, conn
, payload
);
2785 case RELAYD_QUIESCENT_CONTROL
:
2786 DBG_CMD("RELAYD_QUIESCENT_CONTROL", conn
);
2787 ret
= relay_quiescent_control(header
, conn
, payload
);
2789 case RELAYD_BEGIN_DATA_PENDING
:
2790 DBG_CMD("RELAYD_BEGIN_DATA_PENDING", conn
);
2791 ret
= relay_begin_data_pending(header
, conn
, payload
);
2793 case RELAYD_END_DATA_PENDING
:
2794 DBG_CMD("RELAYD_END_DATA_PENDING", conn
);
2795 ret
= relay_end_data_pending(header
, conn
, payload
);
2797 case RELAYD_SEND_INDEX
:
2798 DBG_CMD("RELAYD_SEND_INDEX", conn
);
2799 ret
= relay_recv_index(header
, conn
, payload
);
2801 case RELAYD_STREAMS_SENT
:
2802 DBG_CMD("RELAYD_STREAMS_SENT", conn
);
2803 ret
= relay_streams_sent(header
, conn
, payload
);
2805 case RELAYD_RESET_METADATA
:
2806 DBG_CMD("RELAYD_RESET_METADATA", conn
);
2807 ret
= relay_reset_metadata(header
, conn
, payload
);
2809 case RELAYD_ROTATE_STREAMS
:
2810 DBG_CMD("RELAYD_ROTATE_STREAMS", conn
);
2811 ret
= relay_rotate_session_streams(header
, conn
, payload
);
2813 case RELAYD_CREATE_TRACE_CHUNK
:
2814 DBG_CMD("RELAYD_CREATE_TRACE_CHUNK", conn
);
2815 ret
= relay_create_trace_chunk(header
, conn
, payload
);
2817 case RELAYD_CLOSE_TRACE_CHUNK
:
2818 DBG_CMD("RELAYD_CLOSE_TRACE_CHUNK", conn
);
2819 ret
= relay_close_trace_chunk(header
, conn
, payload
);
2821 case RELAYD_TRACE_CHUNK_EXISTS
:
2822 DBG_CMD("RELAYD_TRACE_CHUNK_EXISTS", conn
);
2823 ret
= relay_trace_chunk_exists(header
, conn
, payload
);
2825 case RELAYD_UPDATE_SYNC_INFO
:
2827 ERR("Received unknown command (%u)", header
->cmd
);
2828 relay_unknown_command(conn
);
2837 static enum relay_connection_status
relay_process_control_receive_payload(
2838 struct relay_connection
*conn
)
2841 enum relay_connection_status status
= RELAY_CONNECTION_STATUS_OK
;
2842 struct lttng_dynamic_buffer
*reception_buffer
=
2843 &conn
->protocol
.ctrl
.reception_buffer
;
2844 struct ctrl_connection_state_receive_payload
*state
=
2845 &conn
->protocol
.ctrl
.state
.receive_payload
;
2846 struct lttng_buffer_view payload_view
;
2848 if (state
->left_to_receive
== 0) {
2849 /* Short-circuit for payload-less commands. */
2850 goto reception_complete
;
2853 ret
= conn
->sock
->ops
->recvmsg(conn
->sock
,
2854 reception_buffer
->data
+ state
->received
,
2855 state
->left_to_receive
, MSG_DONTWAIT
);
2857 if (errno
!= EAGAIN
&& errno
!= EWOULDBLOCK
) {
2858 PERROR("Unable to receive command payload on sock %d",
2860 status
= RELAY_CONNECTION_STATUS_ERROR
;
2863 } else if (ret
== 0) {
2864 DBG("Socket %d performed an orderly shutdown (received EOF)", conn
->sock
->fd
);
2865 status
= RELAY_CONNECTION_STATUS_CLOSED
;
2870 assert(ret
<= state
->left_to_receive
);
2872 state
->left_to_receive
-= ret
;
2873 state
->received
+= ret
;
2875 if (state
->left_to_receive
> 0) {
2877 * Can't transition to the protocol's next state, wait to
2878 * receive the rest of the header.
2880 DBG3("Partial reception of control connection protocol payload (received %" PRIu64
" bytes, %" PRIu64
" bytes left to receive, fd = %i)",
2881 state
->received
, state
->left_to_receive
,
2887 DBG("Done receiving control command payload: fd = %i, payload size = %" PRIu64
" bytes",
2888 conn
->sock
->fd
, state
->received
);
2890 * The payload required to process the command has been received.
2891 * A view to the reception buffer is forwarded to the various
2892 * commands and the state of the control is reset on success.
2894 * Commands are responsible for sending their reply to the peer.
2896 payload_view
= lttng_buffer_view_from_dynamic_buffer(reception_buffer
,
2898 ret
= relay_process_control_command(conn
,
2899 &state
->header
, &payload_view
);
2901 status
= RELAY_CONNECTION_STATUS_ERROR
;
2905 ret
= connection_reset_protocol_state(conn
);
2907 status
= RELAY_CONNECTION_STATUS_ERROR
;
2913 static enum relay_connection_status
relay_process_control_receive_header(
2914 struct relay_connection
*conn
)
2917 enum relay_connection_status status
= RELAY_CONNECTION_STATUS_OK
;
2918 struct lttcomm_relayd_hdr header
;
2919 struct lttng_dynamic_buffer
*reception_buffer
=
2920 &conn
->protocol
.ctrl
.reception_buffer
;
2921 struct ctrl_connection_state_receive_header
*state
=
2922 &conn
->protocol
.ctrl
.state
.receive_header
;
2924 assert(state
->left_to_receive
!= 0);
2926 ret
= conn
->sock
->ops
->recvmsg(conn
->sock
,
2927 reception_buffer
->data
+ state
->received
,
2928 state
->left_to_receive
, MSG_DONTWAIT
);
2930 if (errno
!= EAGAIN
&& errno
!= EWOULDBLOCK
) {
2931 PERROR("Unable to receive control command header on sock %d",
2933 status
= RELAY_CONNECTION_STATUS_ERROR
;
2936 } else if (ret
== 0) {
2937 DBG("Socket %d performed an orderly shutdown (received EOF)", conn
->sock
->fd
);
2938 status
= RELAY_CONNECTION_STATUS_CLOSED
;
2943 assert(ret
<= state
->left_to_receive
);
2945 state
->left_to_receive
-= ret
;
2946 state
->received
+= ret
;
2948 if (state
->left_to_receive
> 0) {
2950 * Can't transition to the protocol's next state, wait to
2951 * receive the rest of the header.
2953 DBG3("Partial reception of control connection protocol header (received %" PRIu64
" bytes, %" PRIu64
" bytes left to receive, fd = %i)",
2954 state
->received
, state
->left_to_receive
,
2959 /* Transition to next state: receiving the command's payload. */
2960 conn
->protocol
.ctrl
.state_id
=
2961 CTRL_CONNECTION_STATE_RECEIVE_PAYLOAD
;
2962 memcpy(&header
, reception_buffer
->data
, sizeof(header
));
2963 header
.circuit_id
= be64toh(header
.circuit_id
);
2964 header
.data_size
= be64toh(header
.data_size
);
2965 header
.cmd
= be32toh(header
.cmd
);
2966 header
.cmd_version
= be32toh(header
.cmd_version
);
2967 memcpy(&conn
->protocol
.ctrl
.state
.receive_payload
.header
,
2968 &header
, sizeof(header
));
2970 DBG("Done receiving control command header: fd = %i, cmd = %" PRIu32
", cmd_version = %" PRIu32
", payload size = %" PRIu64
" bytes",
2971 conn
->sock
->fd
, header
.cmd
, header
.cmd_version
,
2974 if (header
.data_size
> DEFAULT_NETWORK_RELAYD_CTRL_MAX_PAYLOAD_SIZE
) {
2975 ERR("Command header indicates a payload (%" PRIu64
" bytes) that exceeds the maximal payload size allowed on a control connection.",
2977 status
= RELAY_CONNECTION_STATUS_ERROR
;
2981 conn
->protocol
.ctrl
.state
.receive_payload
.left_to_receive
=
2983 conn
->protocol
.ctrl
.state
.receive_payload
.received
= 0;
2984 ret
= lttng_dynamic_buffer_set_size(reception_buffer
,
2987 status
= RELAY_CONNECTION_STATUS_ERROR
;
2991 if (header
.data_size
== 0) {
2993 * Manually invoke the next state as the poll loop
2994 * will not wake-up to allow us to proceed further.
2996 status
= relay_process_control_receive_payload(conn
);
3003 * Process the commands received on the control socket
3005 static enum relay_connection_status
relay_process_control(
3006 struct relay_connection
*conn
)
3008 enum relay_connection_status status
;
3010 switch (conn
->protocol
.ctrl
.state_id
) {
3011 case CTRL_CONNECTION_STATE_RECEIVE_HEADER
:
3012 status
= relay_process_control_receive_header(conn
);
3014 case CTRL_CONNECTION_STATE_RECEIVE_PAYLOAD
:
3015 status
= relay_process_control_receive_payload(conn
);
3018 ERR("Unknown control connection protocol state encountered.");
3025 static enum relay_connection_status
relay_process_data_receive_header(
3026 struct relay_connection
*conn
)
3029 enum relay_connection_status status
= RELAY_CONNECTION_STATUS_OK
;
3030 struct data_connection_state_receive_header
*state
=
3031 &conn
->protocol
.data
.state
.receive_header
;
3032 struct lttcomm_relayd_data_hdr header
;
3033 struct relay_stream
*stream
;
3035 assert(state
->left_to_receive
!= 0);
3037 ret
= conn
->sock
->ops
->recvmsg(conn
->sock
,
3038 state
->header_reception_buffer
+ state
->received
,
3039 state
->left_to_receive
, MSG_DONTWAIT
);
3041 if (errno
!= EAGAIN
&& errno
!= EWOULDBLOCK
) {
3042 PERROR("Unable to receive data header on sock %d", conn
->sock
->fd
);
3043 status
= RELAY_CONNECTION_STATUS_ERROR
;
3046 } else if (ret
== 0) {
3047 /* Orderly shutdown. Not necessary to print an error. */
3048 DBG("Socket %d performed an orderly shutdown (received EOF)", conn
->sock
->fd
);
3049 status
= RELAY_CONNECTION_STATUS_CLOSED
;
3054 assert(ret
<= state
->left_to_receive
);
3056 state
->left_to_receive
-= ret
;
3057 state
->received
+= ret
;
3059 if (state
->left_to_receive
> 0) {
3061 * Can't transition to the protocol's next state, wait to
3062 * receive the rest of the header.
3064 DBG3("Partial reception of data connection header (received %" PRIu64
" bytes, %" PRIu64
" bytes left to receive, fd = %i)",
3065 state
->received
, state
->left_to_receive
,
3070 /* Transition to next state: receiving the payload. */
3071 conn
->protocol
.data
.state_id
= DATA_CONNECTION_STATE_RECEIVE_PAYLOAD
;
3073 memcpy(&header
, state
->header_reception_buffer
, sizeof(header
));
3074 header
.circuit_id
= be64toh(header
.circuit_id
);
3075 header
.stream_id
= be64toh(header
.stream_id
);
3076 header
.data_size
= be32toh(header
.data_size
);
3077 header
.net_seq_num
= be64toh(header
.net_seq_num
);
3078 header
.padding_size
= be32toh(header
.padding_size
);
3079 memcpy(&conn
->protocol
.data
.state
.receive_payload
.header
, &header
, sizeof(header
));
3081 conn
->protocol
.data
.state
.receive_payload
.left_to_receive
=
3083 conn
->protocol
.data
.state
.receive_payload
.received
= 0;
3084 conn
->protocol
.data
.state
.receive_payload
.rotate_index
= false;
3086 DBG("Received data connection header on fd %i: circuit_id = %" PRIu64
", stream_id = %" PRIu64
", data_size = %" PRIu32
", net_seq_num = %" PRIu64
", padding_size = %" PRIu32
,
3087 conn
->sock
->fd
, header
.circuit_id
,
3088 header
.stream_id
, header
.data_size
,
3089 header
.net_seq_num
, header
.padding_size
);
3091 stream
= stream_get_by_id(header
.stream_id
);
3093 DBG("relay_process_data_receive_payload: Cannot find stream %" PRIu64
,
3095 /* Protocol error. */
3096 status
= RELAY_CONNECTION_STATUS_ERROR
;
3100 pthread_mutex_lock(&stream
->lock
);
3101 /* Prepare stream for the reception of a new packet. */
3102 ret
= stream_init_packet(stream
, header
.data_size
,
3103 &conn
->protocol
.data
.state
.receive_payload
.rotate_index
);
3104 pthread_mutex_unlock(&stream
->lock
);
3106 ERR("Failed to rotate stream output file");
3107 status
= RELAY_CONNECTION_STATUS_ERROR
;
3108 goto end_stream_unlock
;
3117 static enum relay_connection_status
relay_process_data_receive_payload(
3118 struct relay_connection
*conn
)
3121 enum relay_connection_status status
= RELAY_CONNECTION_STATUS_OK
;
3122 struct relay_stream
*stream
;
3123 struct data_connection_state_receive_payload
*state
=
3124 &conn
->protocol
.data
.state
.receive_payload
;
3125 const size_t chunk_size
= RECV_DATA_BUFFER_SIZE
;
3126 char data_buffer
[chunk_size
];
3127 bool partial_recv
= false;
3128 bool new_stream
= false, close_requested
= false, index_flushed
= false;
3129 uint64_t left_to_receive
= state
->left_to_receive
;
3130 struct relay_session
*session
;
3132 DBG3("Receiving data for stream id %" PRIu64
" seqnum %" PRIu64
", %" PRIu64
" bytes received, %" PRIu64
" bytes left to receive",
3133 state
->header
.stream_id
, state
->header
.net_seq_num
,
3134 state
->received
, left_to_receive
);
3136 stream
= stream_get_by_id(state
->header
.stream_id
);
3138 /* Protocol error. */
3139 ERR("relay_process_data_receive_payload: cannot find stream %" PRIu64
,
3140 state
->header
.stream_id
);
3141 status
= RELAY_CONNECTION_STATUS_ERROR
;
3145 pthread_mutex_lock(&stream
->lock
);
3146 session
= stream
->trace
->session
;
3147 if (!conn
->session
) {
3148 ret
= connection_set_session(conn
, session
);
3150 status
= RELAY_CONNECTION_STATUS_ERROR
;
3151 goto end_stream_unlock
;
3156 * The size of the "chunk" received on any iteration is bounded by:
3157 * - the data left to receive,
3158 * - the data immediately available on the socket,
3159 * - the on-stack data buffer
3161 while (left_to_receive
> 0 && !partial_recv
) {