X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=4c99b56f67dcb23ee726b361e96d81d6ba0f0967;hb=62c43103c60bd704cd8ed7acaaa22465802f5673;hp=ea46ec72276117359b32fa499b584357b5aeb401;hpb=0072e5e28fb5f18daffb930be7efd55d2474e6ad;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index ea46ec722..4c99b56f6 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -70,6 +70,15 @@ #include "stream.h" #include "connection.h" #include "tracefile-array.h" +#include "tcp_keep_alive.h" + +static const char *help_msg = +#ifdef LTTNG_EMBED_HELP +#include +#else +NULL +#endif +; /* command line options */ char *opt_output_path; @@ -161,10 +170,11 @@ static struct option long_options[] = { { "output", 1, 0, 'o', }, { "verbose", 0, 0, 'v', }, { "config", 1, 0, 'f' }, + { "version", 0, 0, 'V' }, { NULL, 0, 0, 0, }, }; -static const char *config_ignore_options[] = { "help", "config" }; +static const char *config_ignore_options[] = { "help", "config", "version" }; /* * Take an option from the getopt output and set it in the right variable to be @@ -249,12 +259,15 @@ static int set_option(int opt, const char *arg, const char *optname) } break; case 'h': - ret = utils_show_man_page(8, "lttng-relayd"); + ret = utils_show_help(8, "lttng-relayd", help_msg); if (ret) { - ERR("Cannot view man page lttng-relayd(8)"); + ERR("Cannot show --help for `lttng-relayd`"); perror("exec"); } exit(EXIT_FAILURE); + case 'V': + fprintf(stdout, "%s\n", VERSION); + exit(EXIT_SUCCESS); case 'o': if (lttng_is_setuid_setgid()) { WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.", @@ -887,6 +900,15 @@ restart: lttcomm_destroy_sock(newsock); goto error; } + + ret = socket_apply_keep_alive_config(newsock->fd); + if (ret < 0) { + ERR("Failed to apply TCP keep-alive configuration on socket (%i)", + newsock->fd); + lttcomm_destroy_sock(newsock); + goto error; + } + new_conn = connection_create(newsock, type); if (!new_conn) { lttcomm_destroy_sock(newsock); @@ -965,12 +987,16 @@ static void *relay_thread_dispatcher(void *data) health_code_update(); - while (!CMM_LOAD_SHARED(dispatch_thread_exit)) { + for (;;) { health_code_update(); /* Atomically prepare the queue futex */ futex_nto1_prepare(&relay_conn_queue.futex); + if (CMM_LOAD_SHARED(dispatch_thread_exit)) { + break; + } + do { health_code_update(); @@ -1575,6 +1601,7 @@ static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, { int ret; struct lttcomm_relayd_version reply, msg; + bool compatible = true; conn->version_check_done = 1; @@ -1599,9 +1626,7 @@ static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, if (reply.major != be32toh(msg.major)) { DBG("Incompatible major versions (%u vs %u), deleting session", reply.major, be32toh(msg.major)); - connection_put(conn); - ret = 0; - goto end; + compatible = false; } conn->major = reply.major; @@ -1620,6 +1645,11 @@ static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, ERR("Relay sending version"); } + if (!compatible) { + ret = -1; + goto end; + } + DBG("Version check done using protocol %u.%u", conn->major, conn->minor); @@ -1942,6 +1972,7 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, struct lttcomm_relayd_generic_reply reply; struct relay_stream *stream; uint64_t net_seq_num; + size_t msg_len; assert(conn); @@ -1953,9 +1984,12 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, goto end_no_session; } + msg_len = lttcomm_relayd_index_len( + lttng_to_index_major(conn->major, conn->minor), + lttng_to_index_minor(conn->major, conn->minor)); ret = conn->sock->ops->recvmsg(conn->sock, &index_info, - sizeof(index_info), 0); - if (ret < sizeof(index_info)) { + msg_len, 0); + if (ret < msg_len) { if (ret == 0) { /* Orderly shutdown. Not necessary to print an error. */ DBG("Socket %d did an orderly shutdown", conn->sock->fd); @@ -2088,6 +2122,285 @@ end_no_session: return ret; } +/* + * relay_mkdir: Create a folder on the disk. + */ +static int relay_mkdir(struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn) +{ + int ret; + ssize_t network_ret; + struct relay_session *session = conn->session; + struct lttcomm_relayd_mkdir path_info_header; + struct lttcomm_relayd_mkdir *path_info = NULL; + struct lttcomm_relayd_generic_reply reply; + char *path = NULL; + + if (!session || !conn->version_check_done) { + ERR("Trying to create a directory before version check"); + ret = -1; + goto end_no_session; + } + + if (session->major == 2 && session->minor < 11) { + /* + * This client is not supposed to use this command since + * it predates its introduction. + */ + ERR("relay_mkdir command is unsupported before LTTng 2.11"); + ret = -1; + goto end_no_session; + } + + network_ret = conn->sock->ops->recvmsg(conn->sock, &path_info_header, + sizeof(path_info_header), 0); + if (network_ret < (ssize_t) sizeof(path_info_header)) { + if (network_ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", conn->sock->fd); + } else { + ERR("Reception of mkdir command argument length failed with ret = %zi, expected %zu", + network_ret, sizeof(path_info_header)); + } + ret = -1; + goto end_no_session; + } + + path_info_header.length = be32toh(path_info_header.length); + + /* Ensure that it fits in local path length. */ + if (path_info_header.length >= LTTNG_PATH_MAX) { + ret = -ENAMETOOLONG; + ERR("Path name argument of mkdir command (%" PRIu32 " bytes) exceeds the maximal length allowed (%d bytes)", + path_info_header.length, LTTNG_PATH_MAX); + goto end; + } + + path_info = zmalloc(sizeof(path_info_header) + path_info_header.length); + if (!path_info) { + PERROR("zmalloc of mkdir command path"); + ret = -1; + goto end; + } + + network_ret = conn->sock->ops->recvmsg(conn->sock, path_info->path, + path_info_header.length, 0); + if (network_ret < (ssize_t) path_info_header.length) { + if (network_ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", conn->sock->fd); + } else { + ERR("Reception of mkdir path argument failed with ret = %zi, expected %" PRIu32, + network_ret, path_info_header.length); + } + ret = -1; + goto end_no_session; + } + + path = create_output_path(path_info->path); + if (!path) { + ERR("Failed to create output path"); + ret = -1; + goto end; + } + + ret = utils_mkdir_recursive(path, S_IRWXU | S_IRWXG, -1, -1); + if (ret < 0) { + ERR("relay creating output directory"); + goto end; + } + + ret = 0; + +end: + memset(&reply, 0, sizeof(reply)); + if (ret < 0) { + reply.ret_code = htobe32(LTTNG_ERR_UNK); + } else { + reply.ret_code = htobe32(LTTNG_OK); + } + network_ret = conn->sock->ops->sendmsg(conn->sock, &reply, + sizeof(struct lttcomm_relayd_generic_reply), 0); + if (network_ret < (ssize_t) sizeof(struct lttcomm_relayd_generic_reply)) { + ERR("Failed to send mkdir command status code with ret = %zi, expected %zu", + network_ret, + sizeof(struct lttcomm_relayd_generic_reply)); + ret = -1; + } + +end_no_session: + free(path); + free(path_info); + return ret; +} + +static int validate_rotate_rename_path_length(const char *path_type, + uint32_t path_length) +{ + int ret = 0; + + if (path_length > LTTNG_PATH_MAX) { + ret = -ENAMETOOLONG; + ERR("rotate rename \"%s\" path name length (%" PRIu32 " bytes) exceeds the allowed size of %i bytes", + path_type, path_length, LTTNG_PATH_MAX); + } else if (path_length == 0) { + ret = -EINVAL; + ERR("rotate rename \"%s\" path name has an illegal length of 0", path_type); + } + return ret; +} + +/* + * relay_rotate_rename: rename the trace folder after a rotation is + * completed. We are not closing any fd here, just moving the folder, so it + * works even if data is still in-flight. + */ +static int relay_rotate_rename(struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn) +{ + int ret; + ssize_t network_ret; + struct relay_session *session = conn->session; + struct lttcomm_relayd_generic_reply reply; + struct lttcomm_relayd_rotate_rename header; + char *received_paths = NULL; + size_t received_paths_size; + const char *received_old_path, *received_new_path; + char *complete_old_path = NULL, *complete_new_path = NULL; + + if (!session || !conn->version_check_done) { + ERR("Trying to rename a trace folder before version check"); + ret = -1; + goto end_no_reply; + } + + if (session->major == 2 && session->minor < 11) { + ERR("relay_rotate_rename command is unsupported before LTTng 2.11"); + ret = -1; + goto end_no_reply; + } + + network_ret = conn->sock->ops->recvmsg(conn->sock, &header, + sizeof(header), 0); + if (network_ret < (ssize_t) sizeof(header)) { + if (network_ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", + conn->sock->fd); + } else { + ERR("Relay didn't receive a valid rotate_rename command header: expected %zu bytes, recvmsg() returned %zi", + sizeof(header), network_ret); + } + ret = -1; + goto end_no_reply; + } + + header.old_path_length = be32toh(header.old_path_length); + header.new_path_length = be32toh(header.new_path_length); + received_paths_size = header.old_path_length + header.new_path_length; + + /* Ensure the paths don't exceed their allowed size. */ + ret = validate_rotate_rename_path_length("old", header.old_path_length); + if (ret) { + goto end; + } + ret = validate_rotate_rename_path_length("new", header.new_path_length); + if (ret) { + goto end; + } + + received_paths = zmalloc(received_paths_size); + if (!received_paths) { + PERROR("Could not allocate rotate commands paths reception buffer"); + ret = -1; + goto end; + } + + network_ret = conn->sock->ops->recvmsg(conn->sock, received_paths, + received_paths_size, 0); + if (network_ret < (ssize_t) received_paths_size) { + if (network_ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", + conn->sock->fd); + } else { + ERR("Relay failed to received rename command paths (%zu bytes): recvmsg() returned %zi", + received_paths_size, network_ret); + } + ret = -1; + goto end_no_reply; + } + + /* Validate that both paths received are NULL terminated. */ + if (received_paths[header.old_path_length - 1] != '\0') { + ERR("relay_rotate_rename command's \"old\" path is invalid (not NULL terminated)"); + ret = -1; + goto end; + } + if (received_paths[received_paths_size - 1] != '\0') { + ERR("relay_rotate_rename command's \"new\" path is invalid (not NULL terminated)"); + ret = -1; + goto end; + } + + received_old_path = received_paths; + received_new_path = received_paths + header.old_path_length; + + complete_old_path = create_output_path(received_old_path); + if (!complete_old_path) { + ERR("Failed to build old output path in rotate_rename command"); + ret = -1; + goto end; + } + + complete_new_path = create_output_path(received_new_path); + if (!complete_new_path) { + ERR("Failed to build new output path in rotate_rename command"); + ret = -1; + goto end; + } + + ret = utils_mkdir_recursive(complete_new_path, S_IRWXU | S_IRWXG, + -1, -1); + if (ret < 0) { + ERR("Failed to mkdir() rotate_rename's \"new\" output directory at \"%s\"", + complete_new_path); + goto end; + } + + /* + * If a domain has not yet created its channel, the domain-specific + * folder might not exist, but this is not an error. + */ + ret = rename(complete_old_path, complete_new_path); + if (ret < 0 && errno != ENOENT) { + PERROR("Renaming chunk in rotate_rename command from \"%s\" to \"%s\"", + complete_old_path, complete_new_path); + goto end; + } + ret = 0; + +end: + memset(&reply, 0, sizeof(reply)); + if (ret < 0) { + reply.ret_code = htobe32(LTTNG_ERR_UNK); + } else { + reply.ret_code = htobe32(LTTNG_OK); + } + network_ret = conn->sock->ops->sendmsg(conn->sock, &reply, + sizeof(struct lttcomm_relayd_generic_reply), 0); + if (network_ret < sizeof(struct lttcomm_relayd_generic_reply)) { + ERR("Relay sending stream id"); + ret = -1; + } + +end_no_reply: + free(received_paths); + free(complete_old_path); + free(complete_new_path); + return ret; +} + /* * Process the commands received on the control socket */ @@ -2136,6 +2449,12 @@ static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, case RELAYD_RESET_METADATA: ret = relay_reset_metadata(recv_hdr, conn); break; + case RELAYD_ROTATE_RENAME: + ret = relay_rotate_rename(recv_hdr, conn); + break; + case RELAYD_MKDIR: + ret = relay_mkdir(recv_hdr, conn); + break; case RELAYD_UPDATE_SYNC_INFO: default: ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd)); @@ -2179,41 +2498,36 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, goto end; } - if (rotate_index || !stream->index_fd) { - int fd; + if (rotate_index || !stream->index_file) { + uint32_t major, minor; - /* Put ref on previous index_fd. */ - if (stream->index_fd) { - stream_fd_put(stream->index_fd); - stream->index_fd = NULL; + /* Put ref on previous index_file. */ + if (stream->index_file) { + lttng_index_file_put(stream->index_file); + stream->index_file = NULL; } - - fd = index_create_file(stream->path_name, stream->channel_name, + major = stream->trace->session->major; + minor = stream->trace->session->minor; + stream->index_file = lttng_index_file_create(stream->path_name, + stream->channel_name, -1, -1, stream->tracefile_size, - tracefile_array_get_file_index_head(stream->tfa)); - if (fd < 0) { + tracefile_array_get_file_index_head(stream->tfa), + lttng_to_index_major(major, minor), + lttng_to_index_minor(major, minor)); + if (!stream->index_file) { ret = -1; /* Put self-ref for this index due to error. */ relay_index_put(index); - goto end; - } - stream->index_fd = stream_fd_create(fd); - if (!stream->index_fd) { - ret = -1; - if (close(fd)) { - PERROR("Error closing FD %d", fd); - } - /* Put self-ref for this index due to error. */ - relay_index_put(index); - /* Will put the local ref. */ + index = NULL; goto end; } } - if (relay_index_set_fd(index, stream->index_fd, data_offset)) { + if (relay_index_set_file(index, stream->index_file, data_offset)) { ret = -1; /* Put self-ref for this index due to error. */ relay_index_put(index); + index = NULL; goto end; } @@ -2227,6 +2541,7 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, } else { /* Put self-ref for this index due to error. */ relay_index_put(index); + index = NULL; ret = -1; } end: @@ -2676,6 +2991,11 @@ error: destroy_conn, sock_n.node) { health_code_update(); + + if (session_abort(destroy_conn->session)) { + assert(0); + } + /* * No need to grab another ref, because we own * destroy_conn. @@ -2928,6 +3248,12 @@ exit_init_data: health_app_destroy(health_relayd); exit_health_app_create: exit_options: + /* + * Wait for all pending call_rcu work to complete before tearing + * down data structures. call_rcu worker may be trying to + * perform lookups in those structures. + */ + rcu_barrier(); relayd_cleanup(); /* Ensure all prior call_rcu are done. */