X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=25a32a70b7b8488435474461e7a92ea9b161bf73;hp=2ffa75078779b7d0b9986696f5643267ca5ea7aa;hb=7ceefac4c8474606312e25ab822e510c41a3d644;hpb=8d382dd4d4e95ea6ff88d6bd9f8a8fc85970ee3b diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 2ffa75078..25a32a70b 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -34,6 +34,7 @@ #include #include #include +#include #include #include #include @@ -41,6 +42,7 @@ #include #include #include +#include #include #include @@ -61,22 +63,26 @@ #include #include #include +#include +#include +#include "backward-compatibility-group-by.h" #include "cmd.h" +#include "connection.h" #include "ctf-trace.h" +#include "health-relayd.h" #include "index.h" -#include "utils.h" -#include "lttng-relayd.h" #include "live.h" -#include "health-relayd.h" -#include "testpoint.h" -#include "viewer-stream.h" +#include "lttng-relayd.h" #include "session.h" +#include "sessiond-trace-chunks.h" #include "stream.h" -#include "connection.h" -#include "tracefile-array.h" #include "tcp_keep_alive.h" -#include "sessiond-trace-chunks.h" +#include "testpoint.h" +#include "tracefile-array.h" +#include "utils.h" +#include "version.h" +#include "viewer-stream.h" static const char *help_msg = #ifdef LTTNG_EMBED_HELP @@ -95,8 +101,9 @@ enum relay_connection_status { }; /* command line options */ -char *opt_output_path; -static int opt_daemon, opt_background; +char *opt_output_path, *opt_working_directory; +static int opt_daemon, opt_background, opt_print_version, opt_allow_clear = 1; +enum relay_group_output_by opt_group_output_by = RELAYD_GROUP_OUTPUT_BY_UNKNOWN; /* * We need to wait for listener and live listener threads, as well as @@ -157,6 +164,9 @@ static uint64_t last_relay_stream_id; */ static struct relay_conn_queue relay_conn_queue; +/* Cap of file desriptors to be in simultaneous use by the relay daemon. */ +static unsigned int lttng_opt_fd_pool_size = -1; + /* Global relay stream hash table. */ struct lttng_ht *relay_streams_ht; @@ -171,6 +181,9 @@ struct health_app *health_relayd; struct sessiond_trace_chunk_registry *sessiond_trace_chunk_registry; +/* Global fd tracker. */ +struct fd_tracker *the_fd_tracker; + static struct option long_options[] = { { "control-port", 1, 0, 'C', }, { "data-port", 1, 0, 'D', }, @@ -178,16 +191,38 @@ static struct option long_options[] = { { "daemonize", 0, 0, 'd', }, { "background", 0, 0, 'b', }, { "group", 1, 0, 'g', }, + { "fd-pool-size", 1, 0, '\0', }, { "help", 0, 0, 'h', }, { "output", 1, 0, 'o', }, { "verbose", 0, 0, 'v', }, { "config", 1, 0, 'f' }, { "version", 0, 0, 'V' }, + { "working-directory", 1, 0, 'w', }, + { "group-output-by-session", 0, 0, 's', }, + { "group-output-by-host", 0, 0, 'p', }, + { "disallow-clear", 0, 0, 'x' }, { NULL, 0, 0, 0, }, }; static const char *config_ignore_options[] = { "help", "config", "version" }; +static void print_version(void) { + fprintf(stdout, "%s\n", VERSION); +} + +static void relayd_config_log(void) +{ + DBG("LTTng-relayd " VERSION " - " VERSION_NAME "%s%s", + GIT_VERSION[0] == '\0' ? "" : " - " GIT_VERSION, + EXTRA_VERSION_NAME[0] == '\0' ? "" : " - " EXTRA_VERSION_NAME); + if (EXTRA_VERSION_DESCRIPTION[0] != '\0') { + DBG("LTTng-relayd extra version description:\n\t" EXTRA_VERSION_DESCRIPTION "\n"); + } + if (EXTRA_VERSION_PATCHES[0] != '\0') { + DBG("LTTng-relayd extra patches:\n\t" EXTRA_VERSION_PATCHES "\n"); + } +} + /* * Take an option from the getopt output and set it in the right variable to be * used later. @@ -200,9 +235,27 @@ static int set_option(int opt, const char *arg, const char *optname) switch (opt) { case 0: - fprintf(stderr, "option %s", optname); - if (arg) { - fprintf(stderr, " with arg %s\n", arg); + if (!strcmp(optname, "fd-pool-size")) { + unsigned long v; + + errno = 0; + v = strtoul(arg, NULL, 0); + if (errno != 0 || !isdigit(arg[0])) { + ERR("Wrong value in --fd-pool-size parameter: %s", arg); + ret = -1; + goto end; + } + if (v >= UINT_MAX) { + ERR("File descriptor cap overflow in --fd-pool-size parameter: %s", arg); + ret = -1; + goto end; + } + lttng_opt_fd_pool_size = (unsigned int) v; + } else { + fprintf(stderr, "unknown option %s", optname); + if (arg) { + fprintf(stderr, " with arg %s\n", arg); + } } break; case 'C': @@ -278,8 +331,8 @@ static int set_option(int opt, const char *arg, const char *optname) } exit(EXIT_FAILURE); case 'V': - fprintf(stdout, "%s\n", VERSION); - exit(EXIT_SUCCESS); + opt_print_version = 1; + break; case 'o': if (lttng_is_setuid_setgid()) { WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.", @@ -293,6 +346,20 @@ static int set_option(int opt, const char *arg, const char *optname) } } break; + case 'w': + if (lttng_is_setuid_setgid()) { + WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.", + "-w, --working-directory"); + } else { + ret = asprintf(&opt_working_directory, "%s", arg); + if (ret < 0) { + ret = -errno; + PERROR("asprintf opt_working_directory"); + goto end; + } + } + break; + case 'v': /* Verbose level can increase using multiple -v */ if (arg) { @@ -304,6 +371,24 @@ static int set_option(int opt, const char *arg, const char *optname) } } break; + case 's': + if (opt_group_output_by != RELAYD_GROUP_OUTPUT_BY_UNKNOWN) { + ERR("Cannot set --group-output-by-session, another --group-output-by argument is present"); + exit(EXIT_FAILURE); + } + opt_group_output_by = RELAYD_GROUP_OUTPUT_BY_SESSION; + break; + case 'p': + if (opt_group_output_by != RELAYD_GROUP_OUTPUT_BY_UNKNOWN) { + ERR("Cannot set --group-output-by-host, another --group-output-by argument is present"); + exit(EXIT_FAILURE); + } + opt_group_output_by = RELAYD_GROUP_OUTPUT_BY_HOST; + break; + case 'x': + /* Disallow clear */ + opt_allow_clear = 0; + break; default: /* Unknown option or other error. * Error is printed by getopt, just return */ @@ -373,6 +458,73 @@ end: return ret; } +static int parse_env_options(void) +{ + int ret = 0; + char *value = NULL; + + value = lttng_secure_getenv(DEFAULT_LTTNG_RELAYD_WORKING_DIRECTORY_ENV); + if (value) { + opt_working_directory = strdup(value); + if (!opt_working_directory) { + ERR("Failed to allocate working directory string (\"%s\")", + value); + ret = -1; + } + } + return ret; +} + +static int set_fd_pool_size(void) +{ + int ret = 0; + struct rlimit rlimit; + + ret = getrlimit(RLIMIT_NOFILE, &rlimit); + if (ret) { + PERROR("Failed to get file descriptor limit"); + ret = -1; + goto end; + } + + DBG("File descriptor count limits are %" PRIu64 " (soft) and %" PRIu64 " (hard)", + (uint64_t) rlimit.rlim_cur, + (uint64_t) rlimit.rlim_max); + if (lttng_opt_fd_pool_size == -1) { + /* Use default value (soft limit - reserve). */ + if (rlimit.rlim_cur < DEFAULT_RELAYD_MIN_FD_POOL_SIZE) { + ERR("The process' file number limit is too low (%" PRIu64 "). The process' file number limit must be set to at least %i.", + (uint64_t) rlimit.rlim_cur, DEFAULT_RELAYD_MIN_FD_POOL_SIZE); + ret = -1; + goto end; + } + lttng_opt_fd_pool_size = rlimit.rlim_cur - + DEFAULT_RELAYD_FD_POOL_SIZE_RESERVE; + goto end; + } + + if (lttng_opt_fd_pool_size < DEFAULT_RELAYD_MIN_FD_POOL_SIZE) { + ERR("File descriptor pool size must be set to at least %d", + DEFAULT_RELAYD_MIN_FD_POOL_SIZE); + ret = -1; + goto end; + } + + if (lttng_opt_fd_pool_size > rlimit.rlim_cur) { + ERR("File descriptor pool size argument (%u) exceeds the process' soft limit (%" PRIu64 ").", + lttng_opt_fd_pool_size, (uint64_t) rlimit.rlim_cur); + ret = -1; + goto end; + } + + DBG("File descriptor pool size argument (%u) adjusted to %u to accomodate transient fd uses", + lttng_opt_fd_pool_size, + lttng_opt_fd_pool_size - DEFAULT_RELAYD_FD_POOL_SIZE_RESERVE); + lttng_opt_fd_pool_size -= DEFAULT_RELAYD_FD_POOL_SIZE_RESERVE; +end: + return ret; +} + static int set_options(int argc, char **argv) { int c, ret = 0, option_index = 0, retval = 0; @@ -490,6 +642,28 @@ static int set_options(int argc, char **argv) goto exit; } } + ret = set_fd_pool_size(); + if (ret) { + retval = -1; + goto exit; + } + + if (opt_group_output_by == RELAYD_GROUP_OUTPUT_BY_UNKNOWN) { + opt_group_output_by = RELAYD_GROUP_OUTPUT_BY_HOST; + } + if (opt_allow_clear) { + /* Check if env variable exists. */ + const char *value = lttng_secure_getenv(DEFAULT_LTTNG_RELAYD_DISALLOW_CLEAR_ENV); + if (value) { + ret = config_parse_value(value); + if (ret < 0) { + ERR("Invalid value for %s specified", DEFAULT_LTTNG_RELAYD_DISALLOW_CLEAR_ENV); + retval = -1; + goto exit; + } + opt_allow_clear = !ret; + } + } exit: free(optstring); @@ -498,13 +672,26 @@ exit: static void print_global_objects(void) { - rcu_register_thread(); - print_viewer_streams(); print_relay_streams(); print_sessions(); +} - rcu_unregister_thread(); +static int noop_close(void *data, int *fds) +{ + return 0; +} + +static void untrack_stdio(void) +{ + int fds[] = { fileno(stdout), fileno(stderr) }; + + /* + * noop_close is used since we don't really want to close + * the stdio output fds; we merely want to stop tracking them. + */ + (void) fd_tracker_close_unsuspendable_fd(the_fd_tracker, + fds, 2, noop_close, NULL); } /* @@ -523,11 +710,33 @@ static void relayd_cleanup(void) if (sessions_ht) lttng_ht_destroy(sessions_ht); - /* free the dynamically allocated opt_output_path */ free(opt_output_path); + free(opt_working_directory); + if (health_relayd) { + health_app_destroy(health_relayd); + } /* Close thread quit pipes */ - utils_close_pipe(thread_quit_pipe); + if (health_quit_pipe[0] != -1) { + (void) fd_tracker_util_pipe_close( + the_fd_tracker, health_quit_pipe); + } + if (thread_quit_pipe[0] != -1) { + (void) fd_tracker_util_pipe_close( + the_fd_tracker, thread_quit_pipe); + } + if (sessiond_trace_chunk_registry) { + sessiond_trace_chunk_registry_destroy( + sessiond_trace_chunk_registry); + } + if (the_fd_tracker) { + untrack_stdio(); + /* + * fd_tracker_destroy() will log the contents of the fd-tracker + * if a leak is detected. + */ + fd_tracker_destroy(the_fd_tracker); + } uri_free(control_uri); uri_free(data_uri); @@ -689,17 +898,26 @@ void lttng_relay_notify_ready(void) */ static int init_thread_quit_pipe(void) { - int ret; - - ret = utils_create_pipe_cloexec(thread_quit_pipe); + return fd_tracker_util_pipe_open_cloexec( + the_fd_tracker, "Quit pipe", thread_quit_pipe); +} - return ret; +/* + * Init health quit pipe. + * + * Return -1 on error or 0 if all pipes are created. + */ +static int init_health_quit_pipe(void) +{ + return fd_tracker_util_pipe_open_cloexec(the_fd_tracker, + "Health quit pipe", health_quit_pipe); } /* * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set. */ -static int create_thread_poll_set(struct lttng_poll_event *events, int size) +static int create_named_thread_poll_set(struct lttng_poll_event *events, + int size, const char *name) { int ret; @@ -708,10 +926,8 @@ static int create_thread_poll_set(struct lttng_poll_event *events, int size) goto error; } - ret = lttng_poll_create(events, size, LTTNG_CLOEXEC); - if (ret < 0) { - goto error; - } + ret = fd_tracker_util_poll_create(the_fd_tracker, + name, events, 1, LTTNG_CLOEXEC); /* Add quit pipe */ ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR); @@ -739,13 +955,38 @@ static int check_thread_quit_pipe(int fd, uint32_t events) return 0; } +static int create_sock(void *data, int *out_fd) +{ + int ret; + struct lttcomm_sock *sock = data; + + ret = lttcomm_create_sock(sock); + if (ret < 0) { + goto end; + } + + *out_fd = sock->fd; +end: + return ret; +} + +static int close_sock(void *data, int *in_fd) +{ + struct lttcomm_sock *sock = data; + + return sock->ops->close(sock); +} + /* * Create and init socket from uri. */ -static struct lttcomm_sock *relay_socket_create(struct lttng_uri *uri) +static struct lttcomm_sock *relay_socket_create(struct lttng_uri *uri, + const char *name) { - int ret; + int ret, sock_fd; struct lttcomm_sock *sock = NULL; + char uri_str[PATH_MAX]; + char *formated_name = NULL; sock = lttcomm_alloc_sock_from_uri(uri); if (sock == NULL) { @@ -753,11 +994,25 @@ static struct lttcomm_sock *relay_socket_create(struct lttng_uri *uri) goto error; } - ret = lttcomm_create_sock(sock); - if (ret < 0) { - goto error; + /* + * Don't fail to create the socket if the name can't be built as it is + * only used for debugging purposes. + */ + ret = uri_to_str_url(uri, uri_str, sizeof(uri_str)); + uri_str[sizeof(uri_str) - 1] = '\0'; + if (ret >= 0) { + ret = asprintf(&formated_name, "%s socket @ %s", name, + uri_str); + if (ret < 0) { + formated_name = NULL; + } } - DBG("Listening on sock %d", sock->fd); + + ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, &sock_fd, + (const char **) (formated_name ? &formated_name : NULL), + 1, create_sock, sock); + free(formated_name); + DBG("Listening on %s socket %d", name, sock->fd); ret = sock->ops->bind(sock); if (ret < 0) { @@ -796,12 +1051,12 @@ static void *relay_thread_listener(void *data) health_code_update(); - control_sock = relay_socket_create(control_uri); + control_sock = relay_socket_create(control_uri, "Control listener"); if (!control_sock) { goto error_sock_control; } - data_sock = relay_socket_create(data_uri); + data_sock = relay_socket_create(data_uri, "Data listener"); if (!data_sock) { goto error_sock_relay; } @@ -810,7 +1065,7 @@ static void *relay_thread_listener(void *data) * Pass 3 as size here for the thread quit pipe, control and * data socket. */ - ret = create_thread_poll_set(&events, 3); + ret = create_named_thread_poll_set(&events, 3, "Listener thread epoll"); if (ret < 0) { goto error_create_poll; } @@ -944,21 +1199,31 @@ exit: error: error_poll_add: error_testpoint: - lttng_poll_clean(&events); + (void) fd_tracker_util_poll_clean(the_fd_tracker, &events); error_create_poll: if (data_sock->fd >= 0) { - ret = data_sock->ops->close(data_sock); + int data_sock_fd = data_sock->fd; + + ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker, + &data_sock_fd, 1, close_sock, + data_sock); if (ret) { - PERROR("close"); + PERROR("Failed to close the data listener socket file descriptor"); } + data_sock->fd = -1; } lttcomm_destroy_sock(data_sock); error_sock_relay: if (control_sock->fd >= 0) { - ret = control_sock->ops->close(control_sock); + int control_sock_fd = control_sock->fd; + + ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker, + &control_sock_fd, 1, close_sock, + control_sock); if (ret) { - PERROR("close"); + PERROR("Failed to close the control listener socket file descriptor"); } + control_sock->fd = -1; } lttcomm_destroy_sock(control_sock); error_sock_control: @@ -1299,12 +1564,57 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr, goto send_reply; } + /* + * Backward compatibility for --group-output-by-session. + * Prior to lttng 2.11, the complete path is passed by the stream. + * Starting at 2.11, lttng-relayd uses chunk. When dealing with producer + * >=2.11 the chunk is responsible for the output path. When dealing + * with producer < 2.11 the chunk output_path is the root output path + * and the stream carries the complete path (path_name). + * To support --group-output-by-session with older producer (<2.11), we + * need to craft the path based on the stream path. + */ + if (opt_group_output_by == RELAYD_GROUP_OUTPUT_BY_SESSION) { + if (conn->minor < 4) { + /* + * From 2.1 to 2.3, the session_name is not passed on + * the RELAYD_CREATE_SESSION command. The session name + * is necessary to detect the presence of a base_path + * inside the stream path. Without it we cannot perform + * a valid group-output-by-session transformation. + */ + WARN("Unable to perform a --group-by-session transformation for session %" PRIu64 + " for stream with path \"%s\" as it is produced by a peer using a protocol older than v2.4", + session->id, path_name); + } else if (conn->minor >= 4 && conn->minor < 11) { + char *group_by_session_path_name; + + assert(session->session_name[0] != '\0'); + + group_by_session_path_name = + backward_compat_group_by_session( + path_name, + session->session_name); + if (!group_by_session_path_name) { + ERR("Failed to apply group by session to stream of session %" PRIu64, + session->id); + goto send_reply; + } + + DBG("Transformed session path from \"%s\" to \"%s\" to honor per-session name grouping", + path_name, group_by_session_path_name); + + free(path_name); + path_name = group_by_session_path_name; + } + } + trace = ctf_trace_get_by_path_or_create(session, path_name); if (!trace) { goto send_reply; } - /* This stream here has one reference on the trace. */ + /* This stream here has one reference on the trace. */ pthread_mutex_lock(&last_relay_stream_id_lock); stream_handle = ++last_relay_stream_id; pthread_mutex_unlock(&last_relay_stream_id_lock); @@ -1404,7 +1714,7 @@ static int relay_close_stream(const struct lttcomm_relayd_hdr *recv_hdr, vstream = viewer_stream_get_by_id(stream->stream_handle); if (vstream) { - if (vstream->metadata_sent == stream->metadata_received) { + if (stream->no_new_metadata_notified) { /* * Since all the metadata has been sent to the * viewer and that we have a request to close @@ -2071,6 +2381,9 @@ static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr, index_info.stream_instance_id = be64toh(index_info.stream_instance_id); index_info.packet_seq_num = be64toh(index_info.packet_seq_num); + } else { + index_info.stream_instance_id = -1ULL; + index_info.packet_seq_num = -1ULL; } stream = stream_get_by_id(index_info.relay_stream_id); @@ -2215,7 +2528,7 @@ static int relay_rotate_session_streams( session->sessiond_uuid, session->id, rotate_streams.new_chunk_id.value); if (!next_trace_chunk) { - char uuid_str[UUID_STR_LEN]; + char uuid_str[LTTNG_UUID_STR_LEN]; lttng_uuid_to_str(session->sessiond_uuid, uuid_str); ERR("Unknown next trace chunk in ROTATE_STREAMS command: sessiond_uuid = {%s}, session_id = %" PRIu64 @@ -2234,7 +2547,6 @@ static int relay_rotate_session_streams( } else { chunk_id_str = chunk_id_buf; } - session->has_rotated = true; } DBG("Rotate %" PRIu32 " streams of session \"%s\" to chunk \"%s\"", @@ -2282,6 +2594,7 @@ static int relay_rotate_session_streams( } reply_code = LTTNG_OK; + ret = 0; end: if (stream) { stream_put(stream); @@ -2295,8 +2608,6 @@ end: send_ret); ret = -1; } - - ret = 0; end_no_reply: lttng_trace_chunk_put(next_trace_chunk); return ret; @@ -2321,7 +2632,7 @@ static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, struct lttng_trace_chunk *chunk = NULL, *published_chunk = NULL; enum lttng_error_code reply_code = LTTNG_OK; enum lttng_trace_chunk_status chunk_status; - struct lttng_directory_handle session_output; + const char *new_path; if (!session || !conn->version_check_done) { ERR("Trying to create a trace chunk before version check"); @@ -2348,8 +2659,29 @@ static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, msg->creation_timestamp = be64toh(msg->creation_timestamp); msg->override_name_length = be32toh(msg->override_name_length); + if (session->current_trace_chunk && + !lttng_trace_chunk_get_name_overridden(session->current_trace_chunk)) { + chunk_status = lttng_trace_chunk_rename_path(session->current_trace_chunk, + DEFAULT_CHUNK_TMP_OLD_DIRECTORY); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ERR("Failed to rename old chunk"); + ret = -1; + reply_code = LTTNG_ERR_UNK; + goto end; + } + } + session->ongoing_rotation = true; + if (!session->current_trace_chunk) { + if (!session->has_rotated) { + new_path = ""; + } else { + new_path = NULL; + } + } else { + new_path = DEFAULT_CHUNK_TMP_NEW_DIRECTORY; + } chunk = lttng_trace_chunk_create( - msg->chunk_id, msg->creation_timestamp); + msg->chunk_id, msg->creation_timestamp, new_path); if (!chunk) { ERR("Failed to create trace chunk in trace chunk creation command"); ret = -1; @@ -2396,14 +2728,9 @@ static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, goto end; } - ret = session_init_output_directory_handle( - conn->session, &session_output); - if (ret) { - reply_code = LTTNG_ERR_CREATE_DIR_FAIL; - goto end; - } - chunk_status = lttng_trace_chunk_set_as_owner(chunk, &session_output); - lttng_directory_handle_fini(&session_output); + assert(conn->session->output_directory); + chunk_status = lttng_trace_chunk_set_as_owner(chunk, + conn->session->output_directory); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { reply_code = LTTNG_ERR_UNK; ret = -1; @@ -2416,7 +2743,7 @@ static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, conn->session->id, chunk); if (!published_chunk) { - char uuid_str[UUID_STR_LEN]; + char uuid_str[LTTNG_UUID_STR_LEN]; lttng_uuid_to_str(conn->session->sessiond_uuid, uuid_str); ERR("Failed to publish chunk: sessiond_uuid = %s, session_id = %" PRIu64 ", chunk_id = %" PRIu64, @@ -2443,6 +2770,9 @@ static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, conn->session->current_trace_chunk; conn->session->current_trace_chunk = published_chunk; published_chunk = NULL; + if (!conn->session->pending_closure_trace_chunk) { + session->ongoing_rotation = false; + } end_unlock_session: pthread_mutex_unlock(&conn->session->lock); end: @@ -2469,7 +2799,7 @@ static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, struct relay_connection *conn, const struct lttng_buffer_view *payload) { - int ret = 0; + int ret = 0, buf_ret; ssize_t send_ret; struct relay_session *session = conn->session; struct lttcomm_relayd_close_trace_chunk *msg; @@ -2485,6 +2815,7 @@ static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, size_t path_length = 0; const char *chunk_name = NULL; struct lttng_dynamic_buffer reply_payload; + const char *new_path; lttng_dynamic_buffer_init(&reply_payload); @@ -2522,7 +2853,7 @@ static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, conn->session->id, chunk_id); if (!chunk) { - char uuid_str[UUID_STR_LEN]; + char uuid_str[LTTNG_UUID_STR_LEN]; lttng_uuid_to_str(conn->session->sessiond_uuid, uuid_str); ERR("Failed to find chunk to close: sessiond_uuid = %s, session_id = %" PRIu64 ", chunk_id = %" PRIu64, @@ -2535,6 +2866,20 @@ static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, } pthread_mutex_lock(&session->lock); + if (close_command.is_set && + close_command.value == LTTNG_TRACE_CHUNK_COMMAND_TYPE_DELETE) { + /* + * Clear command. It is a protocol error to ask for a + * clear on a relay which does not allow it. Querying + * the configuration allows figuring out whether + * clearing is allowed before doing the clear. + */ + if (!opt_allow_clear) { + ret = -1; + reply_code = LTTNG_ERR_INVALID_PROTOCOL; + goto end_unlock_session; + } + } if (session->pending_closure_trace_chunk && session->pending_closure_trace_chunk != chunk) { ERR("Trace chunk close command for session \"%s\" does not target the trace chunk pending closure", @@ -2544,6 +2889,43 @@ static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, goto end_unlock_session; } + if (session->current_trace_chunk && session->current_trace_chunk != chunk && + !lttng_trace_chunk_get_name_overridden(session->current_trace_chunk)) { + if (close_command.is_set && + close_command.value == LTTNG_TRACE_CHUNK_COMMAND_TYPE_DELETE && + !session->has_rotated) { + /* New chunk stays in session output directory. */ + new_path = ""; + } else { + /* Use chunk name for new chunk. */ + new_path = NULL; + } + /* Rename new chunk path. */ + chunk_status = lttng_trace_chunk_rename_path(session->current_trace_chunk, + new_path); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret = -1; + goto end; + } + session->ongoing_rotation = false; + } + if ((!close_command.is_set || + close_command.value == LTTNG_TRACE_CHUNK_COMMAND_TYPE_NO_OPERATION) && + !lttng_trace_chunk_get_name_overridden(chunk)) { + const char *old_path; + + if (!session->has_rotated) { + old_path = ""; + } else { + old_path = NULL; + } + /* We need to move back the .tmp_old_chunk to its rightful place. */ + chunk_status = lttng_trace_chunk_rename_path(chunk, old_path); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret = -1; + goto end; + } + } chunk_status = lttng_trace_chunk_set_close_timestamp( chunk, close_timestamp); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { @@ -2601,6 +2983,10 @@ static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, goto end_unlock_session; } } + if (close_command.is_set && + close_command.value == LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED) { + session->has_rotated = true; + } DBG("Reply chunk path on close: %s", closed_trace_chunk_path); path_length = strlen(closed_trace_chunk_path) + 1; if (path_length > UINT32_MAX) { @@ -2630,17 +3016,17 @@ end_unlock_session: end: reply.generic.ret_code = htobe32((uint32_t) reply_code); reply.path_length = htobe32((uint32_t) path_length); - ret = lttng_dynamic_buffer_append( + buf_ret = lttng_dynamic_buffer_append( &reply_payload, &reply, sizeof(reply)); - if (ret) { + if (buf_ret) { ERR("Failed to append \"close trace chunk\" command reply header to payload buffer"); goto end_no_reply; } if (reply_code == LTTNG_OK) { - ret = lttng_dynamic_buffer_append(&reply_payload, + buf_ret = lttng_dynamic_buffer_append(&reply_payload, closed_trace_chunk_path, path_length); - if (ret) { + if (buf_ret) { ERR("Failed to append \"close trace chunk\" command reply path to payload buffer"); goto end_no_reply; } @@ -2675,8 +3061,8 @@ static int relay_trace_chunk_exists(const struct lttcomm_relayd_hdr *recv_hdr, struct lttcomm_relayd_trace_chunk_exists *msg; struct lttcomm_relayd_trace_chunk_exists_reply reply = {}; struct lttng_buffer_view header_view; - struct lttng_trace_chunk *chunk = NULL; uint64_t chunk_id; + bool chunk_exists; if (!session || !conn->version_check_done) { ERR("Trying to close a trace chunk before version check"); @@ -2701,25 +3087,80 @@ static int relay_trace_chunk_exists(const struct lttcomm_relayd_hdr *recv_hdr, msg = (typeof(msg)) header_view.data; chunk_id = be64toh(msg->chunk_id); - chunk = sessiond_trace_chunk_registry_get_chunk( + ret = sessiond_trace_chunk_registry_chunk_exists( sessiond_trace_chunk_registry, conn->session->sessiond_uuid, conn->session->id, - chunk_id); - - reply = (typeof(reply)) { - .generic.ret_code = htobe32((uint32_t) LTTNG_OK), - .trace_chunk_exists = !!chunk, + chunk_id, &chunk_exists); + /* + * If ret is not 0, send the reply and report the error to the caller. + * It is a protocol (or internal) error and the session/connection + * should be torn down. + */ + reply = (typeof(reply)){ + .generic.ret_code = htobe32((uint32_t) + (ret == 0 ? LTTNG_OK : LTTNG_ERR_INVALID_PROTOCOL)), + .trace_chunk_exists = ret == 0 ? chunk_exists : 0, }; - send_ret = conn->sock->ops->sendmsg(conn->sock, - &reply, sizeof(reply), 0); + send_ret = conn->sock->ops->sendmsg( + conn->sock, &reply, sizeof(reply), 0); if (send_ret < (ssize_t) sizeof(reply)) { ERR("Failed to send \"create trace chunk\" command reply (ret = %zd)", send_ret); ret = -1; } end_no_reply: - lttng_trace_chunk_put(chunk); + return ret; +} + +/* + * relay_get_configuration: query whether feature is available + */ +static int relay_get_configuration(const struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn, + const struct lttng_buffer_view *payload) +{ + int ret = 0; + ssize_t send_ret; + struct lttcomm_relayd_get_configuration *msg; + struct lttcomm_relayd_get_configuration_reply reply = {}; + struct lttng_buffer_view header_view; + uint64_t query_flags = 0; + uint64_t result_flags = 0; + + header_view = lttng_buffer_view_from_view(payload, 0, sizeof(*msg)); + if (!header_view.data) { + ERR("Failed to receive payload of chunk close command"); + ret = -1; + goto end_no_reply; + } + + /* Convert to host endianness. */ + msg = (typeof(msg)) header_view.data; + query_flags = be64toh(msg->query_flags); + + if (query_flags) { + ret = LTTNG_ERR_INVALID_PROTOCOL; + goto reply; + } + if (opt_allow_clear) { + result_flags |= LTTCOMM_RELAYD_CONFIGURATION_FLAG_CLEAR_ALLOWED; + } + ret = 0; +reply: + reply = (typeof(reply)){ + .generic.ret_code = htobe32((uint32_t) + (ret == 0 ? LTTNG_OK : LTTNG_ERR_INVALID_PROTOCOL)), + .relayd_configuration_flags = htobe64(result_flags), + }; + send_ret = conn->sock->ops->sendmsg( + conn->sock, &reply, sizeof(reply), 0); + if (send_ret < (ssize_t) sizeof(reply)) { + ERR("Failed to send \"get configuration\" command reply (ret = %zd)", + send_ret); + ret = -1; + } +end_no_reply: return ret; } @@ -2801,6 +3242,10 @@ static int relay_process_control_command(struct relay_connection *conn, DBG_CMD("RELAYD_TRACE_CHUNK_EXISTS", conn); ret = relay_trace_chunk_exists(header, conn, payload); break; + case RELAYD_GET_CONFIGURATION: + DBG_CMD("RELAYD_GET_CONFIGURATION", conn); + ret = relay_get_configuration(header, conn, payload); + break; case RELAYD_UPDATE_SYNC_INFO: default: ERR("Received unknown command (%u)", header->cmd); @@ -3339,7 +3784,7 @@ static void *relay_thread_worker(void *data) goto relay_connections_ht_error; } - ret = create_thread_poll_set(&events, 2); + ret = create_named_thread_poll_set(&events, 2, "Worker thread epoll"); if (ret < 0) { goto error_poll_create; } @@ -3400,8 +3845,13 @@ restart: if (ret < 0) { goto error; } - lttng_poll_add(&events, conn->sock->fd, + ret = lttng_poll_add(&events, + conn->sock->fd, LPOLLIN | LPOLLRDHUP); + if (ret) { + ERR("Failed to add new connection file descriptor to poll set"); + goto error; + } connection_ht_add(relay_connections_ht, conn); DBG("Connection socket %d added", conn->sock->fd); } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { @@ -3603,12 +4053,13 @@ error: } rcu_read_unlock(); - lttng_poll_clean(&events); + (void) fd_tracker_util_poll_clean(the_fd_tracker, &events); error_poll_create: lttng_ht_destroy(relay_connections_ht); relay_connections_ht_error: /* Close relay conn pipes */ - utils_close_pipe(relay_conn_pipe); + (void) fd_tracker_util_pipe_close(the_fd_tracker, + relay_conn_pipe); if (err) { DBG("Thread exited with error"); } @@ -3630,11 +4081,24 @@ error_testpoint: */ static int create_relay_conn_pipe(void) { - int ret; + return fd_tracker_util_pipe_open_cloexec(the_fd_tracker, + "Relayd connection pipe", relay_conn_pipe); +} + +static int stdio_open(void *data, int *fds) +{ + fds[0] = fileno(stdout); + fds[1] = fileno(stderr); + return 0; +} - ret = utils_create_pipe_cloexec(relay_conn_pipe); +static int track_stdio(void) +{ + int fds[2]; + const char *names[] = { "stdout", "stderr" }; - return ret; + return fd_tracker_open_unsuspendable_fd(the_fd_tracker, fds, + names, 2, stdio_open, NULL); } /* @@ -3642,10 +4106,21 @@ static int create_relay_conn_pipe(void) */ int main(int argc, char **argv) { + bool thread_is_rcu_registered = false; int ret = 0, retval = 0; void *status; - /* Parse arguments */ + /* Parse environment variables */ + ret = parse_env_options(); + if (ret) { + retval = -1; + goto exit_options; + } + + /* + * Parse arguments. + * Command line arguments overwrite environment. + */ progname = argv[0]; if (set_options(argc, argv)) { retval = -1; @@ -3657,6 +4132,22 @@ int main(int argc, char **argv) goto exit_options; } + relayd_config_log(); + + if (opt_print_version) { + print_version(); + retval = 0; + goto exit_options; + } + + ret = fclose(stdin); + if (ret) { + PERROR("Failed to close stdin"); + goto exit_options; + } + + DBG("Clear command %s", opt_allow_clear ? "allowed" : "disallowed"); + /* Try to create directory if -o, --output is specified. */ if (opt_output_path) { if (*opt_output_path != '/') { @@ -3676,22 +4167,19 @@ int main(int argc, char **argv) /* Daemonize */ if (opt_daemon || opt_background) { - int i; - ret = lttng_daemonize(&child_ppid, &recv_child_signal, !opt_background); if (ret < 0) { retval = -1; goto exit_options; } + } - /* - * We are in the child. Make sure all other file - * descriptors are closed, in case we are called with - * more opened file descriptors than the standard ones. - */ - for (i = 3; i < sysconf(_SC_OPEN_MAX); i++) { - (void) close(i); + if (opt_working_directory) { + ret = utils_change_working_directory(opt_working_directory); + if (ret) { + /* All errors are already logged. */ + goto exit_options; } } @@ -3699,7 +4187,28 @@ int main(int argc, char **argv) if (!sessiond_trace_chunk_registry) { ERR("Failed to initialize session daemon trace chunk registry"); retval = -1; - goto exit_sessiond_trace_chunk_registry; + goto exit_options; + } + + /* + * The RCU thread registration (and use, through the fd-tracker's + * creation) is done after the daemonization to allow us to not + * deal with liburcu's fork() management as the call RCU needs to + * be restored. + */ + rcu_register_thread(); + thread_is_rcu_registered = true; + + the_fd_tracker = fd_tracker_create(lttng_opt_fd_pool_size); + if (!the_fd_tracker) { + retval = -1; + goto exit_options; + } + + ret = track_stdio(); + if (ret) { + retval = -1; + goto exit_options; } /* Initialize thread health monitoring */ @@ -3707,19 +4216,19 @@ int main(int argc, char **argv) if (!health_relayd) { PERROR("health_app_create error"); retval = -1; - goto exit_health_app_create; + goto exit_options; } /* Create thread quit pipe */ if (init_thread_quit_pipe()) { retval = -1; - goto exit_init_data; + goto exit_options; } /* Setup the thread apps communication pipe. */ if (create_relay_conn_pipe()) { retval = -1; - goto exit_init_data; + goto exit_options; } /* Init relay command queue. */ @@ -3733,27 +4242,27 @@ int main(int argc, char **argv) sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (!sessions_ht) { retval = -1; - goto exit_init_data; + goto exit_options; } /* tables of streams indexed by stream ID */ relay_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (!relay_streams_ht) { retval = -1; - goto exit_init_data; + goto exit_options; } /* tables of streams indexed by stream ID */ viewer_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (!viewer_streams_ht) { retval = -1; - goto exit_init_data; + goto exit_options; } - ret = utils_create_pipe(health_quit_pipe); + ret = init_health_quit_pipe(); if (ret) { retval = -1; - goto exit_health_quit_pipe; + goto exit_options; } /* Create thread to manage the client socket */ @@ -3763,7 +4272,7 @@ int main(int argc, char **argv) errno = ret; PERROR("pthread_create health"); retval = -1; - goto exit_health_thread; + goto exit_options; } /* Setup the dispatcher thread */ @@ -3844,16 +4353,6 @@ exit_dispatcher_thread: PERROR("pthread_join health_thread"); retval = -1; } -exit_health_thread: - - utils_close_pipe(health_quit_pipe); -exit_health_quit_pipe: - -exit_init_data: - health_app_destroy(health_relayd); - sessiond_trace_chunk_registry_destroy(sessiond_trace_chunk_registry); -exit_health_app_create: -exit_sessiond_trace_chunk_registry: exit_options: /* * Wait for all pending call_rcu work to complete before tearing @@ -3866,6 +4365,10 @@ exit_options: /* Ensure all prior call_rcu are done. */ rcu_barrier(); + if (thread_is_rcu_registered) { + rcu_unregister_thread(); + } + if (!retval) { exit(EXIT_SUCCESS); } else {