X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=cd4f058bf0243df63dd78e573e97fca202b751d3;hb=d40f035921a2d3be652d31bdcc250db521f96209;hp=007f9513158d20aed0d89941f34e6f598e0606b9;hpb=22dad56815ce0201c5ae7d5ef5d79cc0c6a42c5e;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 007f95131..cd4f058bf 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -70,6 +70,15 @@ #include "stream.h" #include "connection.h" #include "tracefile-array.h" +#include "tcp_keep_alive.h" + +static const char *help_msg = +#ifdef LTTNG_EMBED_HELP +#include +#else +NULL +#endif +; /* command line options */ char *opt_output_path; @@ -161,10 +170,11 @@ static struct option long_options[] = { { "output", 1, 0, 'o', }, { "verbose", 0, 0, 'v', }, { "config", 1, 0, 'f' }, + { "version", 0, 0, 'V' }, { NULL, 0, 0, 0, }, }; -static const char *config_ignore_options[] = { "help", "config" }; +static const char *config_ignore_options[] = { "help", "config", "version" }; /* * Take an option from the getopt output and set it in the right variable to be @@ -249,12 +259,15 @@ static int set_option(int opt, const char *arg, const char *optname) } break; case 'h': - ret = utils_show_man_page(8, "lttng-relayd"); + ret = utils_show_help(8, "lttng-relayd", help_msg); if (ret) { - ERR("Cannot view man page lttng-relayd(8)"); + ERR("Cannot show --help for `lttng-relayd`"); perror("exec"); } exit(EXIT_FAILURE); + case 'V': + fprintf(stdout, "%s\n", VERSION); + exit(EXIT_SUCCESS); case 'o': if (lttng_is_setuid_setgid()) { WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.", @@ -582,9 +595,6 @@ int lttng_relay_stop_threads(void) static void sighandler(int sig) { switch (sig) { - case SIGPIPE: - DBG("SIGPIPE caught"); - return; case SIGINT: DBG("SIGINT caught"); if (lttng_relay_stop_threads()) { @@ -620,9 +630,10 @@ static int set_signal_handler(void) return ret; } - sa.sa_handler = sighandler; sa.sa_mask = sigset; sa.sa_flags = 0; + + sa.sa_handler = sighandler; if ((ret = sigaction(SIGTERM, &sa, NULL)) < 0) { PERROR("sigaction"); return ret; @@ -633,12 +644,13 @@ static int set_signal_handler(void) return ret; } - if ((ret = sigaction(SIGPIPE, &sa, NULL)) < 0) { + if ((ret = sigaction(SIGUSR1, &sa, NULL)) < 0) { PERROR("sigaction"); return ret; } - if ((ret = sigaction(SIGUSR1, &sa, NULL)) < 0) { + sa.sa_handler = SIG_IGN; + if ((ret = sigaction(SIGPIPE, &sa, NULL)) < 0) { PERROR("sigaction"); return ret; } @@ -888,6 +900,15 @@ restart: lttcomm_destroy_sock(newsock); goto error; } + + ret = socket_apply_keep_alive_config(newsock->fd); + if (ret < 0) { + ERR("Failed to apply TCP keep-alive configuration on socket (%i)", + newsock->fd); + lttcomm_destroy_sock(newsock); + goto error; + } + new_conn = connection_create(newsock, type); if (!new_conn) { lttcomm_destroy_sock(newsock); @@ -966,12 +987,16 @@ static void *relay_thread_dispatcher(void *data) health_code_update(); - while (!CMM_LOAD_SHARED(dispatch_thread_exit)) { + for (;;) { health_code_update(); /* Atomically prepare the queue futex */ futex_nto1_prepare(&relay_conn_queue.futex); + if (CMM_LOAD_SHARED(dispatch_thread_exit)) { + break; + } + do { health_code_update(); @@ -1576,6 +1601,7 @@ static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, { int ret; struct lttcomm_relayd_version reply, msg; + bool compatible = true; conn->version_check_done = 1; @@ -1600,9 +1626,7 @@ static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, if (reply.major != be32toh(msg.major)) { DBG("Incompatible major versions (%u vs %u), deleting session", reply.major, be32toh(msg.major)); - connection_put(conn); - ret = 0; - goto end; + compatible = false; } conn->major = reply.major; @@ -1621,6 +1645,11 @@ static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, ERR("Relay sending version"); } + if (!compatible) { + ret = -1; + goto end; + } + DBG("Version check done using protocol %u.%u", conn->major, conn->minor); @@ -1943,6 +1972,7 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, struct lttcomm_relayd_generic_reply reply; struct relay_stream *stream; uint64_t net_seq_num; + size_t msg_len; assert(conn); @@ -1954,9 +1984,12 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, goto end_no_session; } + msg_len = lttcomm_relayd_index_len( + lttng_to_index_major(conn->major, conn->minor), + lttng_to_index_minor(conn->major, conn->minor)); ret = conn->sock->ops->recvmsg(conn->sock, &index_info, - sizeof(index_info), 0); - if (ret < sizeof(index_info)) { + msg_len, 0); + if (ret < msg_len) { if (ret == 0) { /* Orderly shutdown. Not necessary to print an error. */ DBG("Socket %d did an orderly shutdown", conn->sock->fd); @@ -2180,41 +2213,36 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, goto end; } - if (rotate_index || !stream->index_fd) { - int fd; + if (rotate_index || !stream->index_file) { + uint32_t major, minor; - /* Put ref on previous index_fd. */ - if (stream->index_fd) { - stream_fd_put(stream->index_fd); - stream->index_fd = NULL; + /* Put ref on previous index_file. */ + if (stream->index_file) { + lttng_index_file_put(stream->index_file); + stream->index_file = NULL; } - - fd = index_create_file(stream->path_name, stream->channel_name, + major = stream->trace->session->major; + minor = stream->trace->session->minor; + stream->index_file = lttng_index_file_create(stream->path_name, + stream->channel_name, -1, -1, stream->tracefile_size, - tracefile_array_get_file_index_head(stream->tfa)); - if (fd < 0) { + tracefile_array_get_file_index_head(stream->tfa), + lttng_to_index_major(major, minor), + lttng_to_index_minor(major, minor)); + if (!stream->index_file) { ret = -1; /* Put self-ref for this index due to error. */ relay_index_put(index); - goto end; - } - stream->index_fd = stream_fd_create(fd); - if (!stream->index_fd) { - ret = -1; - if (close(fd)) { - PERROR("Error closing FD %d", fd); - } - /* Put self-ref for this index due to error. */ - relay_index_put(index); - /* Will put the local ref. */ + index = NULL; goto end; } } - if (relay_index_set_fd(index, stream->index_fd, data_offset)) { + if (relay_index_set_file(index, stream->index_file, data_offset)) { ret = -1; /* Put self-ref for this index due to error. */ relay_index_put(index); + index = NULL; goto end; } @@ -2228,6 +2256,7 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, } else { /* Put self-ref for this index due to error. */ relay_index_put(index); + index = NULL; ret = -1; } end: @@ -2677,6 +2706,11 @@ error: destroy_conn, sock_n.node) { health_code_update(); + + if (session_abort(destroy_conn->session)) { + assert(0); + } + /* * No need to grab another ref, because we own * destroy_conn. @@ -2778,7 +2812,6 @@ int main(int argc, char **argv) } } - /* Initialize thread health monitoring */ health_relayd = health_app_create(NR_HEALTH_RELAYD_TYPES); if (!health_relayd) { @@ -2834,7 +2867,7 @@ int main(int argc, char **argv) } /* Create thread to manage the client socket */ - ret = pthread_create(&health_thread, NULL, + ret = pthread_create(&health_thread, default_pthread_attr(), thread_manage_health, (void *) NULL); if (ret) { errno = ret; @@ -2844,7 +2877,7 @@ int main(int argc, char **argv) } /* Setup the dispatcher thread */ - ret = pthread_create(&dispatcher_thread, NULL, + ret = pthread_create(&dispatcher_thread, default_pthread_attr(), relay_thread_dispatcher, (void *) NULL); if (ret) { errno = ret; @@ -2854,7 +2887,7 @@ int main(int argc, char **argv) } /* Setup the worker thread */ - ret = pthread_create(&worker_thread, NULL, + ret = pthread_create(&worker_thread, default_pthread_attr(), relay_thread_worker, NULL); if (ret) { errno = ret; @@ -2864,7 +2897,7 @@ int main(int argc, char **argv) } /* Setup the listener thread */ - ret = pthread_create(&listener_thread, NULL, + ret = pthread_create(&listener_thread, default_pthread_attr(), relay_thread_listener, (void *) NULL); if (ret) { errno = ret; @@ -2930,6 +2963,12 @@ exit_init_data: health_app_destroy(health_relayd); exit_health_app_create: exit_options: + /* + * Wait for all pending call_rcu work to complete before tearing + * down data structures. call_rcu worker may be trying to + * perform lookups in those structures. + */ + rcu_barrier(); relayd_cleanup(); /* Ensure all prior call_rcu are done. */