X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=45b7de8c657f9b43a426057fc07f7a299689dc75;hp=9c0e2b1e37158d728ace4bae91a21a70d9d80a93;hb=4f5fb4c3d8752aae822ed0066784cc77e6f0f508;hpb=3a90409872c4a2f631b1efa1b37551b713a961ad diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 9c0e2b1e3..45b7de8c6 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -54,6 +54,7 @@ #include #include #include +#include #include #include @@ -70,6 +71,15 @@ #include "stream.h" #include "connection.h" #include "tracefile-array.h" +#include "tcp_keep_alive.h" + +static const char *help_msg = +#ifdef LTTNG_EMBED_HELP +#include +#else +NULL +#endif +; /* command line options */ char *opt_output_path; @@ -84,6 +94,7 @@ static int lttng_relay_ready = NR_LTTNG_RELAY_READY; /* Size of receive buffer. */ #define RECV_DATA_BUFFER_SIZE 65536 +#define FILE_COPY_BUFFER_SIZE 65536 static int recv_child_signal; /* Set to 1 when a SIGUSR1 signal is received. */ static pid_t child_ppid; /* Internal parent PID use with daemonize. 
*/ @@ -250,9 +261,9 @@ static int set_option(int opt, const char *arg, const char *optname) } break; case 'h': - ret = utils_show_man_page(8, "lttng-relayd"); + ret = utils_show_help(8, "lttng-relayd", help_msg); if (ret) { - ERR("Cannot view man page lttng-relayd(8)"); + ERR("Cannot show --help for `lttng-relayd`"); perror("exec"); } exit(EXIT_FAILURE); @@ -891,6 +902,15 @@ restart: lttcomm_destroy_sock(newsock); goto error; } + + ret = socket_apply_keep_alive_config(newsock->fd); + if (ret < 0) { + ERR("Failed to apply TCP keep-alive configuration on socket (%i)", + newsock->fd); + lttcomm_destroy_sock(newsock); + goto error; + } + new_conn = connection_create(newsock, type); if (!new_conn) { lttcomm_destroy_sock(newsock); @@ -969,12 +989,16 @@ static void *relay_thread_dispatcher(void *data) health_code_update(); - while (!CMM_LOAD_SHARED(dispatch_thread_exit)) { + for (;;) { health_code_update(); /* Atomically prepare the queue futex */ futex_nto1_prepare(&relay_conn_queue.futex); + if (CMM_LOAD_SHARED(dispatch_thread_exit)) { + break; + } + do { health_code_update(); @@ -1481,6 +1505,247 @@ end: return ret; }
+/*
+ * Close the current index file if it is open, and create a new one.
+ *
+ * The reference held on the previous index file (if any) is released with
+ * lttng_index_file_put() before the replacement is created, so on failure
+ * stream->index_file is left NULL. The index format version requested for
+ * the new file is derived from the major/minor version of the session that
+ * owns the stream's trace.
+ *
+ * Return 0 on success, -1 on error.
+ */
+static
+int create_rotate_index_file(struct relay_stream *stream)
+{
+	int ret;
+	uint32_t major, minor;
+
+	/* Put ref on previous index_file. */
+	if (stream->index_file) {
+		lttng_index_file_put(stream->index_file);
+		stream->index_file = NULL;
+	}
+	major = stream->trace->session->major;
+	minor = stream->trace->session->minor;
+	stream->index_file = lttng_index_file_create(stream->path_name,
+			stream->channel_name,
+			-1, -1, stream->tracefile_size,
+			tracefile_array_get_file_index_head(stream->tfa),
+			lttng_to_index_major(major, minor),
+			lttng_to_index_minor(major, minor));
+	if (!stream->index_file) {
+		ret = -1;
+		goto end;
+	}
+
+	ret = 0;
+
+end:
+	return ret;
+}
+
+/*
+ * Rotate a stream whose data is known to be complete up to the rotation
+ * point.
+ *
+ * The output file is rotated through utils_rotate_stream_file(), which
+ * updates stream->stream_fd->fd in place, and the current tracefile size is
+ * reset to 0. Streams that are not metadata streams also get a new index
+ * file. On success the pending-rotation marker (rotate_at_seq_num) is reset
+ * to -1ULL and pos_after_last_complete_data_index to 0; on error both are
+ * left untouched.
+ *
+ * Return a negative value on error, a non-negative value otherwise.
+ */
+static
+int do_rotate_stream(struct relay_stream *stream)
+{
+	int ret;
+
+	/* Perform the stream rotation. */
+	ret = utils_rotate_stream_file(stream->path_name,
+			stream->channel_name, stream->tracefile_size,
+			stream->tracefile_count, -1,
+			-1, stream->stream_fd->fd,
+			NULL, &stream->stream_fd->fd);
+	if (ret < 0) {
+		ERR("Rotating stream output file");
+		goto end;
+	}
+	stream->tracefile_size_current = 0;
+
+	/* Rotate also the index if the stream is not a metadata stream. */
+	if (!stream->is_metadata) {
+		ret = create_rotate_index_file(stream);
+		if (ret < 0) {
+			ERR("Failed to rotate index file");
+			goto end;
+		}
+	}
+
+	/* -1ULL means "no rotation pending" (see try_rotate_stream()). */
+	stream->rotate_at_seq_num = -1ULL;
+	stream->pos_after_last_complete_data_index = 0;
+
+end:
+	return ret;
+}
+
+/*
+ * If too much data has been written in a tracefile before we received the
+ * rotation command, we have to move the excess data to the new tracefile and
+ * perform the rotation. This can happen because the control and data
+ * connections are separate, the indexes as well as the commands arrive from
+ * the control connection and we have no control over the order so we could be
+ * in a situation where too much data has been received on the data connection
+ * before the rotation command on the control connection arrives. We don't need
+ * to update the index because its order is guaranteed with the rotation
+ * command message.
+ *
+ * The excess data is everything located after
+ * stream->pos_after_last_complete_data_index in the current tracefile. It is
+ * copied to a newly created tracefile, then the current tracefile is
+ * truncated at that position and closed. On success the stream refers to
+ * the new tracefile, tracefile_size_current is the number of bytes moved and
+ * the pending-rotation marker is reset.
+ *
+ * Must not be called on a metadata stream.
+ *
+ * Return 0 on success, a negative value on error. On error the newly created
+ * file descriptor is closed so that it is not leaked.
+ */
+static
+int rotate_truncate_stream(struct relay_stream *stream)
+{
+	/* new_fd is >= 0 only while this function owns the descriptor. */
+	int ret, new_fd = -1;
+	off_t seek_ret;
+	uint64_t diff, pos = 0;
+	char buf[FILE_COPY_BUFFER_SIZE];
+
+	assert(!stream->is_metadata);
+
+	assert(stream->tracefile_size_current >
+			stream->pos_after_last_complete_data_index);
+	diff = stream->tracefile_size_current -
+			stream->pos_after_last_complete_data_index;
+
+	/* Create the new tracefile. */
+	new_fd = utils_create_stream_file(stream->path_name,
+			stream->channel_name,
+			stream->tracefile_size, stream->tracefile_count,
+			/* uid */ -1, /* gid */ -1, /* suffix */ NULL);
+	if (new_fd < 0) {
+		ERR("Failed to create new stream file at path %s for channel %s",
+				stream->path_name, stream->channel_name);
+		ret = -1;
+		goto end;
+	}
+
+	/*
+	 * Rewind the current tracefile to the position at which the rotation
+	 * should have occurred.
+	 *
+	 * The result is kept in an off_t: storing it in an int would truncate
+	 * offsets of 2 GiB and more, which could then be mistaken for an
+	 * error.
+	 */
+	seek_ret = lseek(stream->stream_fd->fd,
+			stream->pos_after_last_complete_data_index, SEEK_SET);
+	if (seek_ret < 0) {
+		PERROR("seek truncate stream");
+		ret = -1;
+		goto end;
+	}
+
+	/* Move data from the old file to the new file. */
+	while (pos < diff) {
+		uint64_t count, bytes_left;
+		ssize_t io_ret;
+
+		bytes_left = diff - pos;
+		count = bytes_left > sizeof(buf) ? sizeof(buf) : bytes_left;
+		assert(count <= SIZE_MAX);
+
+		io_ret = lttng_read(stream->stream_fd->fd, buf, count);
+		if (io_ret < (ssize_t) count) {
+			char error_string[256];
+
+			snprintf(error_string, sizeof(error_string),
+				"Failed to read %" PRIu64 " bytes from fd %i in rotate_truncate_stream(), returned %zi",
+				count, stream->stream_fd->fd, io_ret);
+			if (io_ret == -1) {
+				PERROR("%s", error_string);
+			} else {
+				ERR("%s", error_string);
+			}
+			ret = -1;
+			goto end;
+		}
+
+		io_ret = lttng_write(new_fd, buf, count);
+		if (io_ret < (ssize_t) count) {
+			char error_string[256];
+
+			snprintf(error_string, sizeof(error_string),
+				"Failed to write %" PRIu64 " bytes to fd %i in rotate_truncate_stream(), returned %zi",
+				count, new_fd, io_ret);
+			if (io_ret == -1) {
+				PERROR("%s", error_string);
+			} else {
+				ERR("%s", error_string);
+			}
+			ret = -1;
+			goto end;
+		}
+
+		pos += count;
+	}
+
+	/* Truncate the file to get rid of the excess data. */
+	ret = ftruncate(stream->stream_fd->fd,
+			stream->pos_after_last_complete_data_index);
+	if (ret) {
+		PERROR("ftruncate");
+		goto end;
+	}
+
+	/*
+	 * NOTE(review): if a later step fails, stream->stream_fd->fd still
+	 * holds the number of this closed descriptor. Confirm that callers
+	 * tear the stream down on error.
+	 */
+	ret = close(stream->stream_fd->fd);
+	if (ret < 0) {
+		PERROR("Closing tracefile");
+		goto end;
+	}
+
+	ret = create_rotate_index_file(stream);
+	if (ret < 0) {
+		ERR("Rotate stream index file");
+		goto end;
+	}
+
+	/*
+	 * Update the offset and FD of all the eventual indexes created by the
+	 * data connection before the rotation command arrived.
+	 */
+	ret = relay_index_switch_all_files(stream);
+	if (ret < 0) {
+		ERR("Failed to rotate index file");
+		goto end;
+	}
+
+	stream->stream_fd->fd = new_fd;
+	/* The stream owns the descriptor from now on. */
+	new_fd = -1;
+	stream->tracefile_size_current = diff;
+	stream->pos_after_last_complete_data_index = 0;
+	stream->rotate_at_seq_num = -1ULL;
+
+	ret = 0;
+
+end:
+	/* Error path: release the new tracefile descriptor we still own. */
+	if (new_fd >= 0) {
+		if (close(new_fd)) {
+			PERROR("Closing new tracefile after failed rotation");
+		}
+	}
+	return ret;
+}
+
+/*
+ * Check if a stream should perform a rotation (for session rotation).
+ * Must be called with the stream lock held.
+ *
+ * Return 0 on success, a negative value on error.
+ *
+ * Nothing is done while no rotation is pending (rotate_at_seq_num is -1ULL)
+ * or while the last received sequence number (prev_seq) is still below the
+ * rotation point. When prev_seq matches the rotation point exactly, a plain
+ * rotation is performed; when it is already past it, the excess data is
+ * moved to the new tracefile by rotate_truncate_stream().
+ */
+static
+int try_rotate_stream(struct relay_stream *stream)
+{
+	int status;
+
+	/* No rotation expected. */
+	if (stream->rotate_at_seq_num == -1ULL) {
+		return 0;
+	}
+
+	/* Rotation point not reached yet. */
+	if (stream->prev_seq < stream->rotate_at_seq_num) {
+		DBG("Stream %" PRIu64 " no yet ready for rotation",
+				stream->stream_handle);
+		return 0;
+	}
+
+	/* Exactly at the rotation point. */
+	if (stream->prev_seq == stream->rotate_at_seq_num) {
+		DBG("Stream %" PRIu64 " ready for rotation",
+				stream->stream_handle);
+		return do_rotate_stream(stream);
+	}
+
+	/* stream->prev_seq > stream->rotate_at_seq_num */
+	DBG("Rotation after too much data has been written in tracefile "
+			"for stream %" PRIu64 ", need to truncate before "
+			"rotating", stream->stream_handle);
+	status = rotate_truncate_stream(stream);
+	if (status) {
+		ERR("Failed to truncate stream");
+	}
+	return status;
+}
+
 /* * relay_recv_metadata: receive the metadata for the session. */ @@ -1564,6 +1829,11 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, DBG2("Relay metadata written. 
Updated metadata_received %" PRIu64, metadata_stream->metadata_received); + ret = try_rotate_stream(metadata_stream); + if (ret < 0) { + goto end_put; + } + end_put: pthread_mutex_unlock(&metadata_stream->lock); stream_put(metadata_stream); @@ -1579,6 +1849,7 @@ static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, { int ret; struct lttcomm_relayd_version reply, msg; + bool compatible = true; conn->version_check_done = 1; @@ -1603,9 +1874,7 @@ static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, if (reply.major != be32toh(msg.major)) { DBG("Incompatible major versions (%u vs %u), deleting session", reply.major, be32toh(msg.major)); - connection_put(conn); - ret = 0; - goto end; + compatible = false; } conn->major = reply.major; @@ -1624,6 +1893,11 @@ static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, ERR("Relay sending version"); } + if (!compatible) { + ret = -1; + goto end; + } + DBG("Version check done using protocol %u.%u", conn->major, conn->minor); @@ -1946,6 +2220,7 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, struct lttcomm_relayd_generic_reply reply; struct relay_stream *stream; uint64_t net_seq_num; + size_t msg_len; assert(conn); @@ -1957,9 +2232,12 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, goto end_no_session; } + msg_len = lttcomm_relayd_index_len( + lttng_to_index_major(conn->major, conn->minor), + lttng_to_index_minor(conn->major, conn->minor)); ret = conn->sock->ops->recvmsg(conn->sock, &index_info, - sizeof(index_info), 0); - if (ret < sizeof(index_info)) { + msg_len, 0); + if (ret < msg_len) { if (ret == 0) { /* Orderly shutdown. Not necessary to print an error. 
*/ DBG("Socket %d did an orderly shutdown", conn->sock->fd); @@ -2019,6 +2297,7 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, if (ret == 0) { tracefile_array_commit_seq(stream->tfa); stream->index_received_seqcount++; + stream->pos_after_last_complete_data_index += index->total_size; } else if (ret > 0) { /* no flush. */ ret = 0; @@ -2092,6 +2371,530 @@ end_no_session: return ret; }
+/*
+ * relay_rotate_session_stream: rotate a stream to a new tracefile for the
+ * session rotation feature (not the tracefile rotation feature).
+ *
+ * The command payload is a struct lttcomm_relayd_rotate_stream followed by
+ * pathname_length bytes of path name. The stream's output path is replaced
+ * by the one built from that path name. A metadata stream is then rotated
+ * immediately; any other stream records the sequence number at which it has
+ * to rotate and rotates as soon as that number has been reached.
+ *
+ * recv_hdr is not read by this function.
+ *
+ * A generic reply (LTTNG_OK or LTTNG_ERR_UNK) is sent to the peer unless the
+ * command could not be received.
+ *
+ * Return a negative value on error, a non-negative value otherwise.
+ */
+static int relay_rotate_session_stream(struct lttcomm_relayd_hdr *recv_hdr,
+		struct relay_connection *conn)
+{
+	int ret;
+	ssize_t network_ret;
+	struct relay_session *session = conn->session;
+	struct lttcomm_relayd_rotate_stream stream_info;
+	struct lttcomm_relayd_generic_reply reply;
+	/* NULL until a reference is taken; released on every exit path. */
+	struct relay_stream *stream = NULL;
+	size_t len;
+	char *new_pathname = NULL;
+	char *new_output_path;
+
+	DBG("Rotate stream received");
+
+	if (!session || !conn->version_check_done) {
+		ERR("Trying to rotate a stream before version check");
+		ret = -1;
+		goto end_no_reply;
+	}
+
+	if (session->major == 2 && session->minor < 11) {
+		ERR("Unsupported feature before 2.11");
+		ret = -1;
+		goto end_no_reply;
+	}
+
+	memset(&stream_info, 0, sizeof(struct lttcomm_relayd_rotate_stream));
+
+	/*
+	 * Receive the struct up to the new_pathname member since we don't know
+	 * its size yet.
+	 *
+	 * The result is compared as a signed value: comparing it directly with
+	 * sizeof() would convert a negative error code to a huge unsigned
+	 * value and let the failure go unnoticed.
+	 */
+	network_ret = conn->sock->ops->recvmsg(conn->sock, &stream_info,
+			sizeof(struct lttcomm_relayd_rotate_stream), 0);
+	if (network_ret < (ssize_t) sizeof(struct lttcomm_relayd_rotate_stream)) {
+		if (network_ret == 0) {
+			/* Orderly shutdown. Not necessary to print an error. */
+			DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+		} else {
+			ERR("Relay didn't receive valid rotate_stream struct size : %zi", network_ret);
+		}
+		ret = -1;
+		goto end_no_reply;
+	}
+
+	stream = stream_get_by_id(be64toh(stream_info.stream_id));
+	if (!stream) {
+		ret = -1;
+		goto end;
+	}
+
+	len = be32toh(stream_info.pathname_length);
+	/* Ensure it fits in local filename length. */
+	if (len >= LTTNG_PATH_MAX) {
+		ret = -ENAMETOOLONG;
+		ERR("Length of relay_rotate_session_stream command's path name (%zu bytes) exceeds the maximal allowed length of %i bytes",
+				len, LTTNG_PATH_MAX);
+		goto end;
+	}
+	if (len == 0) {
+		ret = -EINVAL;
+		ERR("Path name of relay_rotate_session_stream command has an illegal length of 0");
+		goto end;
+	}
+
+	/*
+	 * One extra byte, zeroed by zmalloc(), guarantees that the received
+	 * path is NUL-terminated whatever the peer sent.
+	 */
+	new_pathname = zmalloc(len + 1);
+	if (!new_pathname) {
+		PERROR("Failed to allocation new path name of relay_rotate_session_stream command");
+		ret = -1;
+		goto end;
+	}
+
+	network_ret = conn->sock->ops->recvmsg(conn->sock, new_pathname, len, 0);
+	if (network_ret < (ssize_t) len) {
+		if (network_ret == 0) {
+			/* Orderly shutdown. Not necessary to print an error. */
+			DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+		} else {
+			ERR("Relay didn't receive valid rotate_stream struct size : %zi", network_ret);
+		}
+		ret = -1;
+		goto end_no_reply;
+	}
+
+	pthread_mutex_lock(&stream->lock);
+
+	/*
+	 * Update the trace path (just the folder, the stream name does not
+	 * change). The new path is built first so that the stream keeps a
+	 * valid path_name if that fails.
+	 */
+	new_output_path = create_output_path(new_pathname);
+	if (!new_output_path) {
+		ERR("Failed to create a new output path");
+		ret = -1;
+		goto end_stream_unlock;
+	}
+	free(stream->path_name);
+	stream->path_name = new_output_path;
+
+	ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG,
+			-1, -1);
+	if (ret < 0) {
+		ERR("relay creating output directory");
+		goto end_stream_unlock;
+	}
+	stream->chunk_id = be64toh(stream_info.new_chunk_id);
+
+	if (stream->is_metadata) {
+		/*
+		 * The metadata stream is sent only over the control connection
+		 * so we know we have all the data to perform the stream
+		 * rotation.
+		 */
+		ret = do_rotate_stream(stream);
+	} else {
+		stream->rotate_at_seq_num = be64toh(stream_info.rotate_at_seq_num);
+		ret = try_rotate_stream(stream);
+	}
+
+end_stream_unlock:
+	pthread_mutex_unlock(&stream->lock);
+end:
+	memset(&reply, 0, sizeof(reply));
+	if (ret < 0) {
+		reply.ret_code = htobe32(LTTNG_ERR_UNK);
+	} else {
+		reply.ret_code = htobe32(LTTNG_OK);
+	}
+	network_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
+			sizeof(struct lttcomm_relayd_generic_reply), 0);
+	if (network_ret < 0) {
+		ERR("Failed to send reply of rotate session stream command");
+		ret = -1;
+	}
+
+end_no_reply:
+	/* Drop the reference taken by stream_get_by_id(), if any. */
+	if (stream) {
+		stream_put(stream);
+	}
+	free(new_pathname);
+	return ret;
+}
+
+/*
+ * relay_mkdir: Create a folder on the disk.
+ *
+ * The command payload is a struct lttcomm_relayd_mkdir header carrying the
+ * path length, followed by that many bytes of path. The directory created is
+ * the one returned by create_output_path() for the received path.
+ *
+ * recv_hdr is not read by this function.
+ *
+ * A generic reply (LTTNG_OK or LTTNG_ERR_UNK) is sent to the peer unless the
+ * command could not be received.
+ *
+ * Return 0 on success, a negative value on error.
+ */
+static int relay_mkdir(struct lttcomm_relayd_hdr *recv_hdr,
+		struct relay_connection *conn)
+{
+	int ret;
+	ssize_t network_ret;
+	struct relay_session *session = conn->session;
+	struct lttcomm_relayd_mkdir path_info_header;
+	struct lttcomm_relayd_mkdir *path_info = NULL;
+	struct lttcomm_relayd_generic_reply reply;
+	char *path = NULL;
+
+	if (!session || !conn->version_check_done) {
+		ERR("Trying to create a directory before version check");
+		ret = -1;
+		goto end_no_session;
+	}
+
+	if (session->major == 2 && session->minor < 11) {
+		/*
+		 * This client is not supposed to use this command since
+		 * it predates its introduction.
+		 */
+		ERR("relay_mkdir command is unsupported before LTTng 2.11");
+		ret = -1;
+		goto end_no_session;
+	}
+
+	network_ret = conn->sock->ops->recvmsg(conn->sock, &path_info_header,
+			sizeof(path_info_header), 0);
+	if (network_ret < (ssize_t) sizeof(path_info_header)) {
+		if (network_ret == 0) {
+			/* Orderly shutdown. Not necessary to print an error. */
+			DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+		} else {
+			ERR("Reception of mkdir command argument length failed with ret = %zi, expected %zu",
+					network_ret, sizeof(path_info_header));
+		}
+		ret = -1;
+		goto end_no_session;
+	}
+
+	path_info_header.length = be32toh(path_info_header.length);
+
+	/* Ensure that it fits in local path length. */
+	if (path_info_header.length >= LTTNG_PATH_MAX) {
+		ret = -ENAMETOOLONG;
+		ERR("Path name argument of mkdir command (%" PRIu32 " bytes) exceeds the maximal length allowed (%d bytes)",
+				path_info_header.length, LTTNG_PATH_MAX);
+		goto end;
+	}
+	if (path_info_header.length == 0) {
+		ret = -EINVAL;
+		ERR("Path name argument of mkdir command has an illegal length of 0");
+		goto end;
+	}
+
+	/*
+	 * One extra byte, zeroed by zmalloc(), guarantees that the received
+	 * path is NUL-terminated whatever the peer sent.
+	 */
+	path_info = zmalloc(sizeof(path_info_header) +
+			path_info_header.length + 1);
+	if (!path_info) {
+		PERROR("zmalloc of mkdir command path");
+		ret = -1;
+		goto end;
+	}
+
+	network_ret = conn->sock->ops->recvmsg(conn->sock, path_info->path,
+			path_info_header.length, 0);
+	if (network_ret < (ssize_t) path_info_header.length) {
+		if (network_ret == 0) {
+			/* Orderly shutdown. Not necessary to print an error. */
+			DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+		} else {
+			ERR("Reception of mkdir path argument failed with ret = %zi, expected %" PRIu32,
+					network_ret, path_info_header.length);
+		}
+		ret = -1;
+		goto end_no_session;
+	}
+
+	path = create_output_path(path_info->path);
+	if (!path) {
+		ERR("Failed to create output path");
+		ret = -1;
+		goto end;
+	}
+
+	ret = utils_mkdir_recursive(path, S_IRWXU | S_IRWXG, -1, -1);
+	if (ret < 0) {
+		ERR("relay creating output directory");
+		goto end;
+	}
+
+	ret = 0;
+
+end:
+	memset(&reply, 0, sizeof(reply));
+	if (ret < 0) {
+		reply.ret_code = htobe32(LTTNG_ERR_UNK);
+	} else {
+		reply.ret_code = htobe32(LTTNG_OK);
+	}
+	network_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
+			sizeof(struct lttcomm_relayd_generic_reply), 0);
+	if (network_ret < (ssize_t) sizeof(struct lttcomm_relayd_generic_reply)) {
+		ERR("Failed to send mkdir command status code with ret = %zi, expected %zu",
+				network_ret,
+				sizeof(struct lttcomm_relayd_generic_reply));
+		ret = -1;
+	}
+
+end_no_session:
+	free(path);
+	free(path_info);
+	return ret;
+}
+
+/*
+ * Validate the length announced for one of the paths of a rotate rename
+ * command. path_type ("old" or "new" at the call sites) is only used in the
+ * error messages.
+ *
+ * Return 0 if the length is acceptable, -ENAMETOOLONG if it is larger than
+ * LTTNG_PATH_MAX, -EINVAL if it is 0.
+ */
+static int validate_rotate_rename_path_length(const char *path_type,
+		uint32_t path_length)
+{
+	int ret = 0;
+
+	if (path_length > LTTNG_PATH_MAX) {
+		ret = -ENAMETOOLONG;
+		ERR("rotate rename \"%s\" path name length (%" PRIu32 " bytes) exceeds the allowed size of %i bytes",
+				path_type, path_length, LTTNG_PATH_MAX);
+	} else if (path_length == 0) {
+		ret = -EINVAL;
+		ERR("rotate rename \"%s\" path name has an illegal length of 0", path_type);
+	}
+	return ret;
+}
+
+/*
+ * relay_rotate_rename: rename the trace folder after a rotation is
+ * completed. We are not closing any fd here, just moving the folder, so it
+ * works even if data is still in-flight.
+ *
+ * The command payload is a struct lttcomm_relayd_rotate_rename header
+ * carrying both path lengths, followed by the old path and then the new
+ * path, each of which must be NUL-terminated. The directories renamed are
+ * the ones returned by create_output_path() for those two paths. A missing
+ * source directory (ENOENT) is not treated as an error.
+ *
+ * recv_hdr is not read by this function.
+ *
+ * A generic reply (LTTNG_OK or LTTNG_ERR_UNK) is sent to the peer unless the
+ * command could not be received.
+ *
+ * Return 0 on success, a negative value on error.
+ */
+static int relay_rotate_rename(struct lttcomm_relayd_hdr *recv_hdr,
+		struct relay_connection *conn)
+{
+	int ret;
+	ssize_t network_ret;
+	struct relay_session *session = conn->session;
+	struct lttcomm_relayd_generic_reply reply;
+	struct lttcomm_relayd_rotate_rename header;
+	char *received_paths = NULL;
+	size_t received_paths_size;
+	const char *received_old_path, *received_new_path;
+	char *complete_old_path = NULL, *complete_new_path = NULL;
+
+	if (!session || !conn->version_check_done) {
+		ERR("Trying to rename a trace folder before version check");
+		ret = -1;
+		goto end_no_reply;
+	}
+
+	if (session->major == 2 && session->minor < 11) {
+		ERR("relay_rotate_rename command is unsupported before LTTng 2.11");
+		ret = -1;
+		goto end_no_reply;
+	}
+
+	network_ret = conn->sock->ops->recvmsg(conn->sock, &header,
+			sizeof(header), 0);
+	if (network_ret < (ssize_t) sizeof(header)) {
+		if (network_ret == 0) {
+			/* Orderly shutdown. Not necessary to print an error. */
+			DBG("Socket %d did an orderly shutdown",
+					conn->sock->fd);
+		} else {
+			ERR("Relay didn't receive a valid rotate_rename command header: expected %zu bytes, recvmsg() returned %zi",
+					sizeof(header), network_ret);
+		}
+		ret = -1;
+		goto end_no_reply;
+	}
+
+	header.old_path_length = be32toh(header.old_path_length);
+	header.new_path_length = be32toh(header.new_path_length);
+
+	/* Ensure the paths don't exceed their allowed size. */
+	ret = validate_rotate_rename_path_length("old", header.old_path_length);
+	if (ret) {
+		goto end;
+	}
+	ret = validate_rotate_rename_path_length("new", header.new_path_length);
+	if (ret) {
+		goto end;
+	}
+
+	/*
+	 * Computed only once both lengths are known to be bounded, and in
+	 * size_t, so that the sum cannot wrap around.
+	 */
+	received_paths_size = (size_t) header.old_path_length +
+			header.new_path_length;
+
+	received_paths = zmalloc(received_paths_size);
+	if (!received_paths) {
+		PERROR("Could not allocate rotate commands paths reception buffer");
+		ret = -1;
+		goto end;
+	}
+
+	network_ret = conn->sock->ops->recvmsg(conn->sock, received_paths,
+			received_paths_size, 0);
+	if (network_ret < (ssize_t) received_paths_size) {
+		if (network_ret == 0) {
+			/* Orderly shutdown. Not necessary to print an error. */
+			DBG("Socket %d did an orderly shutdown",
+					conn->sock->fd);
+		} else {
+			ERR("Relay failed to received rename command paths (%zu bytes): recvmsg() returned %zi",
+					received_paths_size, network_ret);
+		}
+		ret = -1;
+		goto end_no_reply;
+	}
+
+	/*
+	 * Validate that both paths received are NULL terminated. Both lengths
+	 * are known to be non-zero at this point, so the indexes are valid.
+	 */
+	if (received_paths[header.old_path_length - 1] != '\0') {
+		ERR("relay_rotate_rename command's \"old\" path is invalid (not NULL terminated)");
+		ret = -1;
+		goto end;
+	}
+	if (received_paths[received_paths_size - 1] != '\0') {
+		ERR("relay_rotate_rename command's \"new\" path is invalid (not NULL terminated)");
+		ret = -1;
+		goto end;
+	}
+
+	/* The two paths are laid out back to back, old path first. */
+	received_old_path = received_paths;
+	received_new_path = received_paths + header.old_path_length;
+
+	complete_old_path = create_output_path(received_old_path);
+	if (!complete_old_path) {
+		ERR("Failed to build old output path in rotate_rename command");
+		ret = -1;
+		goto end;
+	}
+
+	complete_new_path = create_output_path(received_new_path);
+	if (!complete_new_path) {
+		ERR("Failed to build new output path in rotate_rename command");
+		ret = -1;
+		goto end;
+	}
+
+	ret = utils_mkdir_recursive(complete_new_path, S_IRWXU | S_IRWXG,
+			-1, -1);
+	if (ret < 0) {
+		ERR("Failed to mkdir() rotate_rename's \"new\" output directory at \"%s\"",
+				complete_new_path);
+		goto end;
+	}
+
+	/*
+	 * If a domain has not yet created its channel, the domain-specific
+	 * folder might not exist, but this is not an error.
+	 */
+	ret = rename(complete_old_path, complete_new_path);
+	if (ret < 0 && errno != ENOENT) {
+		PERROR("Renaming chunk in rotate_rename command from \"%s\" to \"%s\"",
+				complete_old_path, complete_new_path);
+		goto end;
+	}
+	ret = 0;
+
+end:
+	memset(&reply, 0, sizeof(reply));
+	if (ret < 0) {
+		reply.ret_code = htobe32(LTTNG_ERR_UNK);
+	} else {
+		reply.ret_code = htobe32(LTTNG_OK);
+	}
+	network_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
+			sizeof(struct lttcomm_relayd_generic_reply), 0);
+	/*
+	 * Signed comparison: without the cast a negative return value would
+	 * be converted to a huge unsigned value and the failure missed.
+	 */
+	if (network_ret < (ssize_t) sizeof(struct lttcomm_relayd_generic_reply)) {
+		ERR("Failed to send rotate_rename command status code with ret = %zi, expected %zu",
+				network_ret,
+				sizeof(struct lttcomm_relayd_generic_reply));
+		ret = -1;
+	}
+
+end_no_reply:
+	free(received_paths);
+	free(complete_old_path);
+	free(complete_new_path);
+	return ret;
+}
+
+/*
+ * Check if all the streams in the session have completed the last rotation.
+ * The chunk_id value is used to distinguish the cases where a stream was
+ * closed on the consumerd before the rotation started but is still active on
+ * the relayd, and the case where a stream appeared on the consumerd/relayd
+ * after the last rotation started (in that case, it is already writing in the
+ * new chunk folder).
+ *
+ * The command payload is a struct lttcomm_relayd_rotate_pending carrying the
+ * chunk id of the rotation being queried. The ret_code of the generic reply
+ * is 1 if at least one stream of the session is still pending, 0 otherwise.
+ *
+ * recv_hdr is not read by this function.
+ *
+ * Return 0 on success, -1 on error.
+ */
+static
+int relay_rotate_pending(struct lttcomm_relayd_hdr *recv_hdr,
+		struct relay_connection *conn)
+{
+	struct relay_session *session = conn->session;
+	struct lttcomm_relayd_rotate_pending msg;
+	struct lttcomm_relayd_generic_reply reply;
+	struct lttng_ht_iter iter;
+	struct relay_stream *stream;
+	int ret = 0;
+	ssize_t network_ret;
+	uint64_t chunk_id;
+	bool rotate_pending = false;
+
+	DBG("Rotate pending command received");
+
+	if (!session || !conn->version_check_done) {
+		ERR("Trying to check for data before version check");
+		ret = -1;
+		goto end_no_reply;
+	}
+
+	if (session->major == 2 && session->minor < 11) {
+		ERR("Unsupported feature before 2.11");
+		ret = -1;
+		goto end_no_reply;
+	}
+
+	network_ret = conn->sock->ops->recvmsg(conn->sock, &msg, sizeof(msg), 0);
+	if (network_ret < (ssize_t) sizeof(msg)) {
+		if (network_ret == 0) {
+			/* Orderly shutdown. Not necessary to print an error. */
+			DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+		} else {
+			ERR("Relay didn't receive valid rotate_pending struct size : %zi",
+					network_ret);
+		}
+		ret = -1;
+		goto end_no_reply;
+	}
+
+	chunk_id = be64toh(msg.chunk_id);
+	DBG("Evaluating rotate pending for chunk id %" PRIu64, chunk_id);
+
+	/*
+	 * Iterate over all the streams in the session and check if they are
+	 * still waiting for data to perform their rotation.
+	 */
+	rcu_read_lock();
+	cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
+			node.node) {
+		/* Skip streams on which a reference cannot be taken. */
+		if (!stream_get(stream)) {
+			continue;
+		}
+		/* The hash table holds the streams of every session. */
+		if (stream->trace->session != session) {
+			stream_put(stream);
+			continue;
+		}
+		pthread_mutex_lock(&stream->lock);
+		if (stream->rotate_at_seq_num != -1ULL) {
+			/* We have not yet performed the rotation. */
+			rotate_pending = true;
+			DBG("Stream %" PRIu64 " is still rotating",
+					stream->stream_handle);
+		} else if (stream->chunk_id < chunk_id) {
+			/*
+			 * Stream closed on the consumer but still active on the
+			 * relay.
+			 */
+			rotate_pending = true;
+			DBG("Stream %" PRIu64 " did not exist on the consumer "
+					"when the last rotation started, but is "
+					"still waiting for data before getting "
+					"closed",
+					stream->stream_handle);
+		}
+		pthread_mutex_unlock(&stream->lock);
+		stream_put(stream);
+		/* A single pending stream is enough to answer. */
+		if (rotate_pending) {
+			break;
+		}
+	}
+	rcu_read_unlock();
+
+	memset(&reply, 0, sizeof(reply));
+	reply.ret_code = htobe32(rotate_pending ? 1 : 0);
+	network_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
+			sizeof(reply), 0);
+	if (network_ret < (ssize_t) sizeof(reply)) {
+		ERR("Relay rotate pending ret code failed");
+		ret = -1;
+	}
+
+end_no_reply:
+	return ret;
+}
+
 /* * Process the commands received on the control socket */ @@ -2140,6 +2943,18 @@ static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, case RELAYD_RESET_METADATA: ret = relay_reset_metadata(recv_hdr, conn); break; + case RELAYD_ROTATE_STREAM: + ret = relay_rotate_session_stream(recv_hdr, conn); + break; + case RELAYD_ROTATE_RENAME: + ret = relay_rotate_rename(recv_hdr, conn); + break; + case RELAYD_ROTATE_PENDING: + ret = relay_rotate_pending(recv_hdr, conn); + break; + case RELAYD_MKDIR: + ret = relay_mkdir(recv_hdr, conn); + break; case RELAYD_UPDATE_SYNC_INFO: default: ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd)); @@ -2160,7 +2975,7 @@ end: * Return 0 on success else a negative value. */ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, - int rotate_index) + int rotate_index, bool *flushed, uint64_t total_size) { int ret = 0; uint64_t data_offset; @@ -2183,41 +2998,22 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, goto end; } - if (rotate_index || !stream->index_fd) { - int fd; - - /* Put ref on previous index_fd. 
*/ - if (stream->index_fd) { - stream_fd_put(stream->index_fd); - stream->index_fd = NULL; - } - - fd = index_create_file(stream->path_name, stream->channel_name, - -1, -1, stream->tracefile_size, - tracefile_array_get_file_index_head(stream->tfa)); - if (fd < 0) { - ret = -1; - /* Put self-ref for this index due to error. */ - relay_index_put(index); - goto end; - } - stream->index_fd = stream_fd_create(fd); - if (!stream->index_fd) { - ret = -1; - if (close(fd)) { - PERROR("Error closing FD %d", fd); - } + if (rotate_index || !stream->index_file) { + ret = create_rotate_index_file(stream); + if (ret < 0) { + ERR("Failed to rotate index"); /* Put self-ref for this index due to error. */ relay_index_put(index); - /* Will put the local ref. */ + index = NULL; goto end; } } - if (relay_index_set_fd(index, stream->index_fd, data_offset)) { + if (relay_index_set_file(index, stream->index_file, data_offset)) { ret = -1; /* Put self-ref for this index due to error. */ relay_index_put(index); + index = NULL; goto end; } @@ -2225,12 +3021,15 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, if (ret == 0) { tracefile_array_commit_seq(stream->tfa); stream->index_received_seqcount++; + *flushed = true; } else if (ret > 0) { + index->total_size = total_size; /* No flush. */ ret = 0; } else { /* Put self-ref for this index due to error. */ relay_index_put(index); + index = NULL; ret = -1; } end: @@ -2254,6 +3053,7 @@ static int relay_process_data(struct relay_connection *conn) size_t chunk_size = RECV_DATA_BUFFER_SIZE; size_t recv_off = 0; char data_buffer[chunk_size]; + bool index_flushed = false; ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr, sizeof(struct lttcomm_relayd_data_hdr), 0); @@ -2319,7 +3119,9 @@ static int relay_process_data(struct relay_connection *conn) * snapshot and index are NOT supported. 
*/ if (session->minor >= 4 && !session->snapshot) { - ret = handle_index_data(stream, net_seq_num, rotate_index); + ret = handle_index_data(stream, net_seq_num, rotate_index, + &index_flushed, + data_size + be32toh(data_hdr.padding_size)); if (ret < 0) { ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d", stream->stream_handle, net_seq_num, ret); @@ -2367,9 +3169,18 @@ static int relay_process_data(struct relay_connection *conn) if (stream->prev_seq == -1ULL) { new_stream = true; } + if (index_flushed) { + stream->pos_after_last_complete_data_index = + stream->tracefile_size_current; + } stream->prev_seq = net_seq_num; + ret = try_rotate_stream(stream); + if (ret < 0) { + goto end_stream_unlock; + } + end_stream_unlock: close_requested = stream->close_requested; pthread_mutex_unlock(&stream->lock); @@ -2680,6 +3491,11 @@ error: destroy_conn, sock_n.node) { health_code_update(); + + if (session_abort(destroy_conn->session)) { + assert(0); + } + /* * No need to grab another ref, because we own * destroy_conn. @@ -2932,6 +3748,12 @@ exit_init_data: health_app_destroy(health_relayd); exit_health_app_create: exit_options: + /* + * Wait for all pending call_rcu work to complete before tearing + * down data structures. call_rcu worker may be trying to + * perform lookups in those structures. + */ + rcu_barrier(); relayd_cleanup(); /* Ensure all prior call_rcu are done. */