Channel rotate pipe between sessiond and the consumers
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index 0eb8e28251f2451ab229dc41cf0e62726fddc7c5..4c99b56f67dcb23ee726b361e96d81d6ba0f0967 100644 (file)
@@ -70,6 +70,7 @@
 #include "stream.h"
 #include "connection.h"
 #include "tracefile-array.h"
+#include "tcp_keep_alive.h"
 
 static const char *help_msg =
 #ifdef LTTNG_EMBED_HELP
@@ -899,6 +900,15 @@ restart:
                                        lttcomm_destroy_sock(newsock);
                                        goto error;
                                }
+
+                               ret = socket_apply_keep_alive_config(newsock->fd);
+                               if (ret < 0) {
+                                       ERR("Failed to apply TCP keep-alive configuration on socket (%i)",
+                                                       newsock->fd);
+                                       lttcomm_destroy_sock(newsock);
+                                       goto error;
+                               }
+
                                new_conn = connection_create(newsock, type);
                                if (!new_conn) {
                                        lttcomm_destroy_sock(newsock);
@@ -1591,6 +1601,7 @@ static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
 {
        int ret;
        struct lttcomm_relayd_version reply, msg;
+       bool compatible = true;
 
        conn->version_check_done = 1;
 
@@ -1615,9 +1626,7 @@ static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
        if (reply.major != be32toh(msg.major)) {
                DBG("Incompatible major versions (%u vs %u), deleting session",
                                reply.major, be32toh(msg.major));
-               connection_put(conn);
-               ret = 0;
-               goto end;
+               compatible = false;
        }
 
        conn->major = reply.major;
@@ -1636,6 +1645,11 @@ static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
                ERR("Relay sending version");
        }
 
+       if (!compatible) {
+               ret = -1;
+               goto end;
+       }
+
        DBG("Version check done using protocol %u.%u", conn->major,
                        conn->minor);
 
@@ -2108,6 +2122,285 @@ end_no_session:
        return ret;
 }
 
+/*
+ * relay_mkdir: Create a folder on the disk.
+ */
+static int relay_mkdir(struct lttcomm_relayd_hdr *recv_hdr,
+               struct relay_connection *conn)
+{
+       int ret;
+       ssize_t network_ret;
+       struct relay_session *session = conn->session;
+       struct lttcomm_relayd_mkdir path_info_header;
+       struct lttcomm_relayd_mkdir *path_info = NULL;
+       struct lttcomm_relayd_generic_reply reply;
+       char *path = NULL;
+
+       if (!session || !conn->version_check_done) {
+               ERR("Trying to create a directory before version check");
+               ret = -1;
+               goto end_no_session;
+       }
+
+       if (session->major == 2 && session->minor < 11) {
+               /*
+                * This client is not supposed to use this command since
+                * it predates its introduction.
+                */
+               ERR("relay_mkdir command is unsupported before LTTng 2.11");
+               ret = -1;
+               goto end_no_session;
+       }
+
+       network_ret = conn->sock->ops->recvmsg(conn->sock, &path_info_header,
+                       sizeof(path_info_header), 0);
+       if (network_ret < (ssize_t) sizeof(path_info_header)) {
+               if (network_ret == 0) {
+                       /* Orderly shutdown. Not necessary to print an error. */
+                       DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+               } else {
+                       ERR("Reception of mkdir command argument length failed with ret = %zi, expected %zu",
+                                       network_ret, sizeof(path_info_header));
+               }
+               ret = -1;
+               goto end_no_session;
+       }
+
+       path_info_header.length = be32toh(path_info_header.length);
+
+       /* Ensure that it fits in local path length. */
+       if (path_info_header.length >= LTTNG_PATH_MAX) {
+               ret = -ENAMETOOLONG;
+               ERR("Path name argument of mkdir command (%" PRIu32 " bytes) exceeds the maximal length allowed (%d bytes)",
+                               path_info_header.length, LTTNG_PATH_MAX);
+               goto end;
+       }
+
+       path_info = zmalloc(sizeof(path_info_header) + path_info_header.length);
+       if (!path_info) {
+               PERROR("zmalloc of mkdir command path");
+               ret = -1;
+               goto end;
+       }
+
+       network_ret = conn->sock->ops->recvmsg(conn->sock, path_info->path,
+                       path_info_header.length, 0);
+       if (network_ret < (ssize_t) path_info_header.length) {
+               if (network_ret == 0) {
+                       /* Orderly shutdown. Not necessary to print an error. */
+                       DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+               } else {
+                       ERR("Reception of mkdir path argument failed with ret = %zi, expected %" PRIu32,
+                                       network_ret, path_info_header.length);
+               }
+               ret = -1;
+               goto end_no_session;
+       }
+
+       path = create_output_path(path_info->path);
+       if (!path) {
+               ERR("Failed to create output path");
+               ret = -1;
+               goto end;
+       }
+
+       ret = utils_mkdir_recursive(path, S_IRWXU | S_IRWXG, -1, -1);
+       if (ret < 0) {
+               ERR("relay creating output directory");
+               goto end;
+       }
+
+       ret = 0;
+
+end:
+       memset(&reply, 0, sizeof(reply));
+       if (ret < 0) {
+               reply.ret_code = htobe32(LTTNG_ERR_UNK);
+       } else {
+               reply.ret_code = htobe32(LTTNG_OK);
+       }
+       network_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
+                       sizeof(struct lttcomm_relayd_generic_reply), 0);
+       if (network_ret < (ssize_t) sizeof(struct lttcomm_relayd_generic_reply)) {
+               ERR("Failed to send mkdir command status code with ret = %zi, expected %zu",
+                               network_ret,
+                               sizeof(struct lttcomm_relayd_generic_reply));
+               ret = -1;
+       }
+
+end_no_session:
+       free(path);
+       free(path_info);
+       return ret;
+}
+
+static int validate_rotate_rename_path_length(const char *path_type,
+               uint32_t path_length)
+{
+       int ret = 0;
+
+       if (path_length > LTTNG_PATH_MAX) {
+               ret = -ENAMETOOLONG;
+               ERR("rotate rename \"%s\" path name length (%" PRIu32 " bytes) exceeds the allowed size of %i bytes",
+                               path_type, path_length, LTTNG_PATH_MAX);
+       } else if (path_length == 0) {
+               ret = -EINVAL;
+               ERR("rotate rename \"%s\" path name has an illegal length of 0", path_type);
+       }
+       return ret;
+}
+
+/*
+ * relay_rotate_rename: rename the trace folder after a rotation is
+ * completed. We are not closing any fd here, just moving the folder, so it
+ * works even if data is still in-flight.
+ */
+static int relay_rotate_rename(struct lttcomm_relayd_hdr *recv_hdr,
+               struct relay_connection *conn)
+{
+       int ret;
+       ssize_t network_ret;
+       struct relay_session *session = conn->session;
+       struct lttcomm_relayd_generic_reply reply;
+       struct lttcomm_relayd_rotate_rename header;
+       char *received_paths = NULL;
+       size_t received_paths_size;
+       const char *received_old_path, *received_new_path;
+       char *complete_old_path = NULL, *complete_new_path = NULL;
+
+       if (!session || !conn->version_check_done) {
+               ERR("Trying to rename a trace folder before version check");
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       if (session->major == 2 && session->minor < 11) {
+               ERR("relay_rotate_rename command is unsupported before LTTng 2.11");
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       network_ret = conn->sock->ops->recvmsg(conn->sock, &header,
+                       sizeof(header), 0);
+       if (network_ret < (ssize_t) sizeof(header)) {
+               if (network_ret == 0) {
+                       /* Orderly shutdown. Not necessary to print an error. */
+                       DBG("Socket %d did an orderly shutdown",
+                                       conn->sock->fd);
+               } else {
+                       ERR("Relay didn't receive a valid rotate_rename command header: expected %zu bytes, recvmsg() returned %zi",
+                                       sizeof(header), network_ret);
+               }
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       header.old_path_length = be32toh(header.old_path_length);
+       header.new_path_length = be32toh(header.new_path_length);
+       received_paths_size = header.old_path_length + header.new_path_length;
+
+       /* Ensure the paths don't exceed their allowed size. */
+       ret = validate_rotate_rename_path_length("old", header.old_path_length);
+       if (ret) {
+               goto end;
+       }
+       ret = validate_rotate_rename_path_length("new", header.new_path_length);
+       if (ret) {
+               goto end;
+       }
+
+       received_paths = zmalloc(received_paths_size);
+       if (!received_paths) {
+               PERROR("Could not allocate rotate commands paths reception buffer");
+               ret = -1;
+               goto end;
+       }
+
+       network_ret = conn->sock->ops->recvmsg(conn->sock, received_paths,
+                       received_paths_size, 0);
+       if (network_ret < (ssize_t) received_paths_size) {
+               if (network_ret == 0) {
+                       /* Orderly shutdown. Not necessary to print an error. */
+                       DBG("Socket %d did an orderly shutdown",
+                                       conn->sock->fd);
+               } else {
+                       ERR("Relay failed to received rename command paths (%zu bytes): recvmsg() returned %zi",
+                                       received_paths_size, network_ret);
+               }
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       /* Validate that both paths received are NULL terminated. */
+       if (received_paths[header.old_path_length - 1] != '\0') {
+               ERR("relay_rotate_rename command's \"old\" path is invalid (not NULL terminated)");
+               ret = -1;
+               goto end;
+       }
+       if (received_paths[received_paths_size - 1] != '\0') {
+               ERR("relay_rotate_rename command's \"new\" path is invalid (not NULL terminated)");
+               ret = -1;
+               goto end;
+       }
+
+       received_old_path = received_paths;
+       received_new_path = received_paths + header.old_path_length;
+
+       complete_old_path = create_output_path(received_old_path);
+       if (!complete_old_path) {
+               ERR("Failed to build old output path in rotate_rename command");
+               ret = -1;
+               goto end;
+       }
+
+       complete_new_path = create_output_path(received_new_path);
+       if (!complete_new_path) {
+               ERR("Failed to build new output path in rotate_rename command");
+               ret = -1;
+               goto end;
+       }
+
+       ret = utils_mkdir_recursive(complete_new_path, S_IRWXU | S_IRWXG,
+                       -1, -1);
+       if (ret < 0) {
+               ERR("Failed to mkdir() rotate_rename's \"new\" output directory at \"%s\"",
+                               complete_new_path);
+               goto end;
+       }
+
+       /*
+        * If a domain has not yet created its channel, the domain-specific
+        * folder might not exist, but this is not an error.
+        */
+       ret = rename(complete_old_path, complete_new_path);
+       if (ret < 0 && errno != ENOENT) {
+               PERROR("Renaming chunk in rotate_rename command from \"%s\" to \"%s\"",
+                               complete_old_path, complete_new_path);
+               goto end;
+       }
+       ret = 0;
+
+end:
+       memset(&reply, 0, sizeof(reply));
+       if (ret < 0) {
+               reply.ret_code = htobe32(LTTNG_ERR_UNK);
+       } else {
+               reply.ret_code = htobe32(LTTNG_OK);
+       }
+       network_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
+                       sizeof(struct lttcomm_relayd_generic_reply), 0);
+       if (network_ret < sizeof(struct lttcomm_relayd_generic_reply)) {
+               ERR("Relay sending stream id");
+               ret = -1;
+       }
+
+end_no_reply:
+       free(received_paths);
+       free(complete_old_path);
+       free(complete_new_path);
+       return ret;
+}
+
 /*
  * Process the commands received on the control socket
  */
@@ -2156,6 +2449,12 @@ static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
        case RELAYD_RESET_METADATA:
                ret = relay_reset_metadata(recv_hdr, conn);
                break;
+       case RELAYD_ROTATE_RENAME:
+               ret = relay_rotate_rename(recv_hdr, conn);
+               break;
+       case RELAYD_MKDIR:
+               ret = relay_mkdir(recv_hdr, conn);
+               break;
        case RELAYD_UPDATE_SYNC_INFO:
        default:
                ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
This page took 0.027952 seconds and 4 git commands to generate.