X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=4c99b56f67dcb23ee726b361e96d81d6ba0f0967;hb=62c43103c60bd704cd8ed7acaaa22465802f5673;hp=dcaaaa8dc0ec2c284eb211ebde21ab7234745b31;hpb=03e431550191df8609f921c7b4054c57ee4644d8;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index dcaaaa8dc..4c99b56f6 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -18,7 +18,6 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define _GNU_SOURCE #define _LGPL_SOURCE #include #include @@ -40,7 +39,6 @@ #include #include #include -#include #include #include @@ -56,7 +54,7 @@ #include #include #include -#include +#include #include #include "cmd.h" @@ -72,6 +70,15 @@ #include "stream.h" #include "connection.h" #include "tracefile-array.h" +#include "tcp_keep_alive.h" + +static const char *help_msg = +#ifdef LTTNG_EMBED_HELP +#include +#else +NULL +#endif +; /* command line options */ char *opt_output_path; @@ -83,6 +90,10 @@ static int opt_daemon, opt_background; */ #define NR_LTTNG_RELAY_READY 3 static int lttng_relay_ready = NR_LTTNG_RELAY_READY; + +/* Size of receive buffer. */ +#define RECV_DATA_BUFFER_SIZE 65536 + static int recv_child_signal; /* Set to 1 when a SIGUSR1 signal is received. */ static pid_t child_ppid; /* Internal parent PID use with daemonize. */ @@ -159,28 +170,11 @@ static struct option long_options[] = { { "output", 1, 0, 'o', }, { "verbose", 0, 0, 'v', }, { "config", 1, 0, 'f' }, + { "version", 0, 0, 'V' }, { NULL, 0, 0, 0, }, }; -static const char *config_ignore_options[] = { "help", "config" }; - -/* - * usage function on stderr - */ -static void usage(void) -{ - fprintf(stderr, "Usage: %s OPTIONS\n\nOptions:\n", progname); - fprintf(stderr, " -h, --help Display this usage.\n"); - fprintf(stderr, " -d, --daemonize Start as a daemon.\n"); - fprintf(stderr, " -b, --background Start as a daemon, keeping console open.\n"); - fprintf(stderr, " -C, --control-port URL Control port listening.\n"); - fprintf(stderr, " -D, --data-port URL Data port listening.\n"); - fprintf(stderr, " -L, --live-port URL Live view port listening.\n"); - fprintf(stderr, " -o, --output PATH Output path for traces. Must use an absolute path.\n"); - fprintf(stderr, " -v, --verbose Verbose mode. Activate DBG() macro.\n"); - fprintf(stderr, " -g, --group NAME Specify the tracing group name. (default: tracing)\n"); - fprintf(stderr, " -f --config Load daemon configuration file\n"); -} +static const char *config_ignore_options[] = { "help", "config", "version" }; /* * Take an option from the getopt output and set it in the right variable to be @@ -265,8 +259,15 @@ static int set_option(int opt, const char *arg, const char *optname) } break; case 'h': - usage(); + ret = utils_show_help(8, "lttng-relayd", help_msg); + if (ret) { + ERR("Cannot show --help for `lttng-relayd`"); + perror("exec"); + } exit(EXIT_FAILURE); + case 'V': + fprintf(stdout, "%s\n", VERSION); + exit(EXIT_SUCCESS); case 'o': if (lttng_is_setuid_setgid()) { WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.", @@ -307,7 +308,7 @@ end: /* * config_entry_handler_cb used to handle options read from a config file. - * See config_entry_handler_cb comment in common/config/config.h for the + * See config_entry_handler_cb comment in common/config/session-config.h for the * return value conventions. */ static int config_entry_handler(const struct config_entry *entry, void *unused) @@ -594,9 +595,6 @@ int lttng_relay_stop_threads(void) static void sighandler(int sig) { switch (sig) { - case SIGPIPE: - DBG("SIGPIPE caught"); - return; case SIGINT: DBG("SIGINT caught"); if (lttng_relay_stop_threads()) { @@ -632,9 +630,10 @@ static int set_signal_handler(void) return ret; } - sa.sa_handler = sighandler; sa.sa_mask = sigset; sa.sa_flags = 0; + + sa.sa_handler = sighandler; if ((ret = sigaction(SIGTERM, &sa, NULL)) < 0) { PERROR("sigaction"); return ret; @@ -645,12 +644,13 @@ static int set_signal_handler(void) return ret; } - if ((ret = sigaction(SIGPIPE, &sa, NULL)) < 0) { + if ((ret = sigaction(SIGUSR1, &sa, NULL)) < 0) { PERROR("sigaction"); return ret; } - if ((ret = sigaction(SIGUSR1, &sa, NULL)) < 0) { + sa.sa_handler = SIG_IGN; + if ((ret = sigaction(SIGPIPE, &sa, NULL)) < 0) { PERROR("sigaction"); return ret; } @@ -900,6 +900,15 @@ restart: lttcomm_destroy_sock(newsock); goto error; } + + ret = socket_apply_keep_alive_config(newsock->fd); + if (ret < 0) { + ERR("Failed to apply TCP keep-alive configuration on socket (%i)", + newsock->fd); + lttcomm_destroy_sock(newsock); + goto error; + } + new_conn = connection_create(newsock, type); if (!new_conn) { lttcomm_destroy_sock(newsock); @@ -978,12 +987,16 @@ static void *relay_thread_dispatcher(void *data) health_code_update(); - while (!CMM_LOAD_SHARED(dispatch_thread_exit)) { + for (;;) { health_code_update(); /* Atomically prepare the queue futex */ futex_nto1_prepare(&relay_conn_queue.futex); + if (CMM_LOAD_SHARED(dispatch_thread_exit)) { + break; + } + do { health_code_update(); @@ -1038,7 +1051,8 @@ error_testpoint: * Set index data from the control port to a given index object. */ static int set_index_control_data(struct relay_index *index, - struct lttcomm_relayd_index *data) + struct lttcomm_relayd_index *data, + struct relay_connection *conn) { struct ctf_packet_index index_data; @@ -1054,6 +1068,12 @@ static int set_index_control_data(struct relay_index *index, index_data.timestamp_end = data->timestamp_end; index_data.events_discarded = data->events_discarded; index_data.stream_id = data->stream_id; + + if (conn->minor >= 8) { + index->index_data.stream_instance_id = data->stream_instance_id; + index->index_data.packet_seq_num = data->packet_seq_num; + } + return relay_index_set_data(index, &index_data); } @@ -1068,13 +1088,13 @@ static int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr, int ret = 0, send_ret; struct relay_session *session; struct lttcomm_relayd_status_session reply; - char session_name[NAME_MAX]; - char hostname[HOST_NAME_MAX]; + char session_name[LTTNG_NAME_MAX]; + char hostname[LTTNG_HOST_NAME_MAX]; uint32_t live_timer = 0; bool snapshot = false; - memset(session_name, 0, NAME_MAX); - memset(hostname, 0, HOST_NAME_MAX); + memset(session_name, 0, LTTNG_NAME_MAX); + memset(hostname, 0, LTTNG_HOST_NAME_MAX); memset(&reply, 0, sizeof(reply)); @@ -1325,6 +1345,90 @@ end_no_session: return ret; } +/* + * relay_reset_metadata: reset a metadata stream + */ +static +int relay_reset_metadata(struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn) +{ + int ret, send_ret; + struct relay_session *session = conn->session; + struct lttcomm_relayd_reset_metadata stream_info; + struct lttcomm_relayd_generic_reply reply; + struct relay_stream *stream; + + DBG("Reset metadata received"); + + if (!session || conn->version_check_done == 0) { + ERR("Trying to reset a metadata stream before version check"); + ret = -1; + goto end_no_session; + } + + ret = conn->sock->ops->recvmsg(conn->sock, &stream_info, + sizeof(struct lttcomm_relayd_reset_metadata), 0); + if (ret < sizeof(struct lttcomm_relayd_reset_metadata)) { + if (ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", conn->sock->fd); + } else { + ERR("Relay didn't receive valid reset_metadata struct " + "size : %d", ret); + } + ret = -1; + goto end_no_session; + } + DBG("Update metadata to version %" PRIu64, be64toh(stream_info.version)); + + /* Unsupported for live sessions for now. */ + if (session->live_timer != 0) { + ret = -1; + goto end; + } + + stream = stream_get_by_id(be64toh(stream_info.stream_id)); + if (!stream) { + ret = -1; + goto end; + } + pthread_mutex_lock(&stream->lock); + if (!stream->is_metadata) { + ret = -1; + goto end_unlock; + } + + ret = utils_rotate_stream_file(stream->path_name, stream->channel_name, + 0, 0, -1, -1, stream->stream_fd->fd, NULL, + &stream->stream_fd->fd); + if (ret < 0) { + ERR("Failed to rotate metadata file %s of channel %s", + stream->path_name, stream->channel_name); + goto end_unlock; + } + +end_unlock: + pthread_mutex_unlock(&stream->lock); + stream_put(stream); + +end: + memset(&reply, 0, sizeof(reply)); + if (ret < 0) { + reply.ret_code = htobe32(LTTNG_ERR_UNK); + } else { + reply.ret_code = htobe32(LTTNG_OK); + } + send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, + sizeof(struct lttcomm_relayd_generic_reply), 0); + if (send_ret < 0) { + ERR("Relay sending reset metadata reply"); + ret = send_ret; + } + +end_no_session: + return ret; +} + /* * relay_unknown_command: send -1 if received unknown command */ @@ -1497,6 +1601,7 @@ static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, { int ret; struct lttcomm_relayd_version reply, msg; + bool compatible = true; conn->version_check_done = 1; @@ -1521,9 +1626,7 @@ static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, if (reply.major != be32toh(msg.major)) { DBG("Incompatible major versions (%u vs %u), deleting session", reply.major, be32toh(msg.major)); - connection_put(conn); - ret = 0; - goto end; + compatible = false; } conn->major = reply.major; @@ -1542,6 +1645,11 @@ static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, ERR("Relay sending version"); } + if (!compatible) { + ret = -1; + goto end; + } + DBG("Version check done using protocol %u.%u", conn->major, conn->minor); @@ -1864,6 +1972,7 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, struct lttcomm_relayd_generic_reply reply; struct relay_stream *stream; uint64_t net_seq_num; + size_t msg_len; assert(conn); @@ -1875,9 +1984,12 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, goto end_no_session; } + msg_len = lttcomm_relayd_index_len( + lttng_to_index_major(conn->major, conn->minor), + lttng_to_index_minor(conn->major, conn->minor)); ret = conn->sock->ops->recvmsg(conn->sock, &index_info, - sizeof(index_info), 0); - if (ret < sizeof(index_info)) { + msg_len, 0); + if (ret < msg_len) { if (ret == 0) { /* Orderly shutdown. Not necessary to print an error. */ DBG("Socket %d did an orderly shutdown", conn->sock->fd); @@ -1927,7 +2039,7 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, ERR("relay_index_get_by_id_or_create index NULL"); goto end_stream_put; } - if (set_index_control_data(index, &index_info)) { + if (set_index_control_data(index, &index_info, conn)) { ERR("set_index_control_data error"); relay_index_put(index); ret = -1; @@ -2010,6 +2122,285 @@ end_no_session: return ret; } +/* + * relay_mkdir: Create a folder on the disk. + */ +static int relay_mkdir(struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn) +{ + int ret; + ssize_t network_ret; + struct relay_session *session = conn->session; + struct lttcomm_relayd_mkdir path_info_header; + struct lttcomm_relayd_mkdir *path_info = NULL; + struct lttcomm_relayd_generic_reply reply; + char *path = NULL; + + if (!session || !conn->version_check_done) { + ERR("Trying to create a directory before version check"); + ret = -1; + goto end_no_session; + } + + if (session->major == 2 && session->minor < 11) { + /* + * This client is not supposed to use this command since + * it predates its introduction. + */ + ERR("relay_mkdir command is unsupported before LTTng 2.11"); + ret = -1; + goto end_no_session; + } + + network_ret = conn->sock->ops->recvmsg(conn->sock, &path_info_header, + sizeof(path_info_header), 0); + if (network_ret < (ssize_t) sizeof(path_info_header)) { + if (network_ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", conn->sock->fd); + } else { + ERR("Reception of mkdir command argument length failed with ret = %zi, expected %zu", + network_ret, sizeof(path_info_header)); + } + ret = -1; + goto end_no_session; + } + + path_info_header.length = be32toh(path_info_header.length); + + /* Ensure that it fits in local path length. */ + if (path_info_header.length >= LTTNG_PATH_MAX) { + ret = -ENAMETOOLONG; + ERR("Path name argument of mkdir command (%" PRIu32 " bytes) exceeds the maximal length allowed (%d bytes)", + path_info_header.length, LTTNG_PATH_MAX); + goto end; + } + + path_info = zmalloc(sizeof(path_info_header) + path_info_header.length); + if (!path_info) { + PERROR("zmalloc of mkdir command path"); + ret = -1; + goto end; + } + + network_ret = conn->sock->ops->recvmsg(conn->sock, path_info->path, + path_info_header.length, 0); + if (network_ret < (ssize_t) path_info_header.length) { + if (network_ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", conn->sock->fd); + } else { + ERR("Reception of mkdir path argument failed with ret = %zi, expected %" PRIu32, + network_ret, path_info_header.length); + } + ret = -1; + goto end_no_session; + } + + path = create_output_path(path_info->path); + if (!path) { + ERR("Failed to create output path"); + ret = -1; + goto end; + } + + ret = utils_mkdir_recursive(path, S_IRWXU | S_IRWXG, -1, -1); + if (ret < 0) { + ERR("relay creating output directory"); + goto end; + } + + ret = 0; + +end: + memset(&reply, 0, sizeof(reply)); + if (ret < 0) { + reply.ret_code = htobe32(LTTNG_ERR_UNK); + } else { + reply.ret_code = htobe32(LTTNG_OK); + } + network_ret = conn->sock->ops->sendmsg(conn->sock, &reply, + sizeof(struct lttcomm_relayd_generic_reply), 0); + if (network_ret < (ssize_t) sizeof(struct lttcomm_relayd_generic_reply)) { + ERR("Failed to send mkdir command status code with ret = %zi, expected %zu", + network_ret, + sizeof(struct lttcomm_relayd_generic_reply)); + ret = -1; + } + +end_no_session: + free(path); + free(path_info); + return ret; +} + +static int validate_rotate_rename_path_length(const char *path_type, + uint32_t path_length) +{ + int ret = 0; + + if (path_length > LTTNG_PATH_MAX) { + ret = -ENAMETOOLONG; + ERR("rotate rename \"%s\" path name length (%" PRIu32 " bytes) exceeds the allowed size of %i bytes", + path_type, path_length, LTTNG_PATH_MAX); + } else if (path_length == 0) { + ret = -EINVAL; + ERR("rotate rename \"%s\" path name has an illegal length of 0", path_type); + } + return ret; +} + +/* + * relay_rotate_rename: rename the trace folder after a rotation is + * completed. We are not closing any fd here, just moving the folder, so it + * works even if data is still in-flight. + */ +static int relay_rotate_rename(struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn) +{ + int ret; + ssize_t network_ret; + struct relay_session *session = conn->session; + struct lttcomm_relayd_generic_reply reply; + struct lttcomm_relayd_rotate_rename header; + char *received_paths = NULL; + size_t received_paths_size; + const char *received_old_path, *received_new_path; + char *complete_old_path = NULL, *complete_new_path = NULL; + + if (!session || !conn->version_check_done) { + ERR("Trying to rename a trace folder before version check"); + ret = -1; + goto end_no_reply; + } + + if (session->major == 2 && session->minor < 11) { + ERR("relay_rotate_rename command is unsupported before LTTng 2.11"); + ret = -1; + goto end_no_reply; + } + + network_ret = conn->sock->ops->recvmsg(conn->sock, &header, + sizeof(header), 0); + if (network_ret < (ssize_t) sizeof(header)) { + if (network_ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", + conn->sock->fd); + } else { + ERR("Relay didn't receive a valid rotate_rename command header: expected %zu bytes, recvmsg() returned %zi", + sizeof(header), network_ret); + } + ret = -1; + goto end_no_reply; + } + + header.old_path_length = be32toh(header.old_path_length); + header.new_path_length = be32toh(header.new_path_length); + received_paths_size = header.old_path_length + header.new_path_length; + + /* Ensure the paths don't exceed their allowed size. */ + ret = validate_rotate_rename_path_length("old", header.old_path_length); + if (ret) { + goto end; + } + ret = validate_rotate_rename_path_length("new", header.new_path_length); + if (ret) { + goto end; + } + + received_paths = zmalloc(received_paths_size); + if (!received_paths) { + PERROR("Could not allocate rotate commands paths reception buffer"); + ret = -1; + goto end; + } + + network_ret = conn->sock->ops->recvmsg(conn->sock, received_paths, + received_paths_size, 0); + if (network_ret < (ssize_t) received_paths_size) { + if (network_ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", + conn->sock->fd); + } else { + ERR("Relay failed to received rename command paths (%zu bytes): recvmsg() returned %zi", + received_paths_size, network_ret); + } + ret = -1; + goto end_no_reply; + } + + /* Validate that both paths received are NULL terminated. */ + if (received_paths[header.old_path_length - 1] != '\0') { + ERR("relay_rotate_rename command's \"old\" path is invalid (not NULL terminated)"); + ret = -1; + goto end; + } + if (received_paths[received_paths_size - 1] != '\0') { + ERR("relay_rotate_rename command's \"new\" path is invalid (not NULL terminated)"); + ret = -1; + goto end; + } + + received_old_path = received_paths; + received_new_path = received_paths + header.old_path_length; + + complete_old_path = create_output_path(received_old_path); + if (!complete_old_path) { + ERR("Failed to build old output path in rotate_rename command"); + ret = -1; + goto end; + } + + complete_new_path = create_output_path(received_new_path); + if (!complete_new_path) { + ERR("Failed to build new output path in rotate_rename command"); + ret = -1; + goto end; + } + + ret = utils_mkdir_recursive(complete_new_path, S_IRWXU | S_IRWXG, + -1, -1); + if (ret < 0) { + ERR("Failed to mkdir() rotate_rename's \"new\" output directory at \"%s\"", + complete_new_path); + goto end; + } + + /* + * If a domain has not yet created its channel, the domain-specific + * folder might not exist, but this is not an error. + */ + ret = rename(complete_old_path, complete_new_path); + if (ret < 0 && errno != ENOENT) { + PERROR("Renaming chunk in rotate_rename command from \"%s\" to \"%s\"", + complete_old_path, complete_new_path); + goto end; + } + ret = 0; + +end: + memset(&reply, 0, sizeof(reply)); + if (ret < 0) { + reply.ret_code = htobe32(LTTNG_ERR_UNK); + } else { + reply.ret_code = htobe32(LTTNG_OK); + } + network_ret = conn->sock->ops->sendmsg(conn->sock, &reply, + sizeof(struct lttcomm_relayd_generic_reply), 0); + if (network_ret < sizeof(struct lttcomm_relayd_generic_reply)) { + ERR("Relay sending stream id"); + ret = -1; + } + +end_no_reply: + free(received_paths); + free(complete_old_path); + free(complete_new_path); + return ret; +} + /* * Process the commands received on the control socket */ @@ -2055,6 +2446,15 @@ static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, case RELAYD_STREAMS_SENT: ret = relay_streams_sent(recv_hdr, conn); break; + case RELAYD_RESET_METADATA: + ret = relay_reset_metadata(recv_hdr, conn); + break; + case RELAYD_ROTATE_RENAME: + ret = relay_rotate_rename(recv_hdr, conn); + break; + case RELAYD_MKDIR: + ret = relay_mkdir(recv_hdr, conn); + break; case RELAYD_UPDATE_SYNC_INFO: default: ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd)); @@ -2084,8 +2484,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, /* Get data offset because we are about to update the index. */ data_offset = htobe64(stream->tracefile_size_current); - DBG("handle_index_data: stream %" PRIu64 " data offset %" PRIu64, - stream->stream_handle, stream->tracefile_size_current); + DBG("handle_index_data: stream %" PRIu64 " net_seq_num %" PRIu64 " data offset %" PRIu64, + stream->stream_handle, net_seq_num, stream->tracefile_size_current); /* * Lookup for an existing index for that stream id/sequence @@ -2098,41 +2498,36 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, goto end; } - if (rotate_index || !stream->index_fd) { - int fd; + if (rotate_index || !stream->index_file) { + uint32_t major, minor; - /* Put ref on previous index_fd. */ - if (stream->index_fd) { - stream_fd_put(stream->index_fd); - stream->index_fd = NULL; + /* Put ref on previous index_file. */ + if (stream->index_file) { + lttng_index_file_put(stream->index_file); + stream->index_file = NULL; } - - fd = index_create_file(stream->path_name, stream->channel_name, + major = stream->trace->session->major; + minor = stream->trace->session->minor; + stream->index_file = lttng_index_file_create(stream->path_name, + stream->channel_name, -1, -1, stream->tracefile_size, - tracefile_array_get_file_index_head(stream->tfa)); - if (fd < 0) { - ret = -1; - /* Put self-ref for this index due to error. */ - relay_index_put(index); - goto end; - } - stream->index_fd = stream_fd_create(fd); - if (!stream->index_fd) { + tracefile_array_get_file_index_head(stream->tfa), + lttng_to_index_major(major, minor), + lttng_to_index_minor(major, minor)); + if (!stream->index_file) { ret = -1; - if (close(fd)) { - PERROR("Error closing FD %d", fd); - } /* Put self-ref for this index due to error. */ relay_index_put(index); - /* Will put the local ref. */ + index = NULL; goto end; } } - if (relay_index_set_fd(index, stream->index_fd, data_offset)) { + if (relay_index_set_file(index, stream->index_file, data_offset)) { ret = -1; /* Put self-ref for this index due to error. */ relay_index_put(index); + index = NULL; goto end; } @@ -2146,6 +2541,7 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, } else { /* Put self-ref for this index due to error. */ relay_index_put(index); + index = NULL; ret = -1; } end: @@ -2166,6 +2562,9 @@ static int relay_process_data(struct relay_connection *conn) uint32_t data_size; struct relay_session *session; bool new_stream = false, close_requested = false; + size_t chunk_size = RECV_DATA_BUFFER_SIZE; + size_t recv_off = 0; + char data_buffer[chunk_size]; ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr, sizeof(struct lttcomm_relayd_data_hdr), 0); @@ -2183,39 +2582,17 @@ static int relay_process_data(struct relay_connection *conn) stream_id = be64toh(data_hdr.stream_id); stream = stream_get_by_id(stream_id); if (!stream) { + ERR("relay_process_data: Cannot find stream %" PRIu64, stream_id); ret = -1; goto end; } session = stream->trace->session; data_size = be32toh(data_hdr.data_size); - if (data_buffer_size < data_size) { - char *tmp_data_ptr; - - tmp_data_ptr = realloc(data_buffer, data_size); - if (!tmp_data_ptr) { - ERR("Allocating data buffer"); - free(data_buffer); - ret = -1; - goto end_stream_put; - } - data_buffer = tmp_data_ptr; - data_buffer_size = data_size; - } - memset(data_buffer, 0, data_size); net_seq_num = be64toh(data_hdr.net_seq_num); DBG3("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64, data_size, stream_id, net_seq_num); - ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0); - if (ret <= 0) { - if (ret == 0) { - /* Orderly shutdown. Not necessary to print an error. */ - DBG("Socket %d did an orderly shutdown", conn->sock->fd); - } - ret = -1; - goto end_stream_put; - } pthread_mutex_lock(&stream->lock); @@ -2255,24 +2632,45 @@ static int relay_process_data(struct relay_connection *conn) if (session->minor >= 4 && !session->snapshot) { ret = handle_index_data(stream, net_seq_num, rotate_index); if (ret < 0) { + ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d", + stream->stream_handle, net_seq_num, ret); goto end_stream_unlock; } } - /* Write data to stream output fd. */ - size_ret = lttng_write(stream->stream_fd->fd, data_buffer, data_size); - if (size_ret < data_size) { - ERR("Relay error writing data to file"); - ret = -1; - goto end_stream_unlock; - } + for (recv_off = 0; recv_off < data_size; recv_off += chunk_size) { + size_t recv_size = min(data_size - recv_off, chunk_size); - DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64, - size_ret, stream->stream_handle); + ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, recv_size, 0); + if (ret <= 0) { + if (ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", conn->sock->fd); + } else { + ERR("Socket %d error %d", conn->sock->fd, ret); + } + ret = -1; + goto end_stream_unlock; + } + + /* Write data to stream output fd. */ + size_ret = lttng_write(stream->stream_fd->fd, data_buffer, + recv_size); + if (size_ret < recv_size) { + ERR("Relay error writing data to file"); + ret = -1; + goto end_stream_unlock; + } + + DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64, + size_ret, stream->stream_handle); + } ret = write_padding_to_file(stream->stream_fd->fd, be32toh(data_hdr.padding_size)); if (ret < 0) { + ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d", + stream->stream_handle, net_seq_num, ret); goto end_stream_unlock; } stream->tracefile_size_current += @@ -2295,7 +2693,6 @@ end_stream_unlock: uatomic_set(&session->new_streams, 1); pthread_mutex_unlock(&session->lock); } -end_stream_put: stream_put(stream); end: return ret; @@ -2594,6 +2991,11 @@ error: destroy_conn, sock_n.node) { health_code_update(); + + if (session_abort(destroy_conn->session)) { + assert(0); + } + /* * No need to grab another ref, because we own * destroy_conn. @@ -2613,7 +3015,6 @@ relay_connections_ht_error: DBG("Thread exited with error"); } DBG("Worker thread cleanup complete"); - free(data_buffer); error_testpoint: if (err) { health_error(); @@ -2696,7 +3097,6 @@ int main(int argc, char **argv) } } - /* Initialize thread health monitoring */ health_relayd = health_app_create(NR_HEALTH_RELAYD_TYPES); if (!health_relayd) { @@ -2711,15 +3111,6 @@ int main(int argc, char **argv) goto exit_init_data; } - /* Check if daemon is UID = 0 */ - if (!getuid()) { - if (control_uri->port < 1024 || data_uri->port < 1024 || live_uri->port < 1024) { - ERR("Need to be root to use ports < 1024"); - retval = -1; - goto exit_init_data; - } - } - /* Setup the thread apps communication pipe. */ if (create_relay_conn_pipe()) { retval = -1; @@ -2729,12 +3120,6 @@ int main(int argc, char **argv) /* Init relay command queue. */ cds_wfcq_init(&relay_conn_queue.head, &relay_conn_queue.tail); - /* Set up max poll set size */ - if (lttng_poll_set_max_size()) { - retval = -1; - goto exit_init_data; - } - /* Initialize communication library */ lttcomm_init(); lttcomm_inet_init(); @@ -2767,7 +3152,7 @@ int main(int argc, char **argv) } /* Create thread to manage the client socket */ - ret = pthread_create(&health_thread, NULL, + ret = pthread_create(&health_thread, default_pthread_attr(), thread_manage_health, (void *) NULL); if (ret) { errno = ret; @@ -2777,7 +3162,7 @@ int main(int argc, char **argv) } /* Setup the dispatcher thread */ - ret = pthread_create(&dispatcher_thread, NULL, + ret = pthread_create(&dispatcher_thread, default_pthread_attr(), relay_thread_dispatcher, (void *) NULL); if (ret) { errno = ret; @@ -2787,7 +3172,7 @@ int main(int argc, char **argv) } /* Setup the worker thread */ - ret = pthread_create(&worker_thread, NULL, + ret = pthread_create(&worker_thread, default_pthread_attr(), relay_thread_worker, NULL); if (ret) { errno = ret; @@ -2797,7 +3182,7 @@ int main(int argc, char **argv) } /* Setup the listener thread */ - ret = pthread_create(&listener_thread, NULL, + ret = pthread_create(&listener_thread, default_pthread_attr(), relay_thread_listener, (void *) NULL); if (ret) { errno = ret; @@ -2863,6 +3248,12 @@ exit_init_data: health_app_destroy(health_relayd); exit_health_app_create: exit_options: + /* + * Wait for all pending call_rcu work to complete before tearing + * down data structures. call_rcu worker may be trying to + * perform lookups in those structures. + */ + rcu_barrier(); relayd_cleanup(); /* Ensure all prior call_rcu are done. */