Introduce the relayd socket object
[lttng-tools.git] / src / common / relayd / relayd.c
index da54939f9a28153e4ae343d281ec1352e07b8ef9..bed0933d1102ae4d13454ba3d152fd86d831f377 100644 (file)
@@ -32,7 +32,7 @@
 /*
  * Send command. Fill up the header and append the data.
  */
-static int send_command(struct lttcomm_sock *sock,
+static int send_command(struct lttcomm_relayd_sock *rsock,
                enum lttcomm_relayd_command cmd, void *data, size_t size,
                int flags)
 {
@@ -65,7 +65,7 @@ static int send_command(struct lttcomm_sock *sock,
                memcpy(buf + sizeof(header), data, size);
        }
 
-       ret = sock->ops->sendmsg(sock, buf, buf_size, flags);
+       ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags);
        if (ret < 0) {
                ret = -errno;
                goto error;
@@ -83,20 +83,20 @@ alloc_error:
  * Receive reply data on socket. This MUST be call after send_command or else
  * could result in unexpected behavior(s).
  */
-static int recv_reply(struct lttcomm_sock *sock, void *data, size_t size)
+static int recv_reply(struct lttcomm_relayd_sock *rsock, void *data, size_t size)
 {
        int ret;
 
        DBG3("Relayd waiting for reply of size %ld", size);
 
-       ret = sock->ops->recvmsg(sock, data, size, 0);
+       ret = rsock->sock.ops->recvmsg(&rsock->sock, data, size, 0);
        if (ret <= 0 || ret != size) {
                if (ret == 0) {
                        /* Orderly shutdown. */
-                       DBG("Socket %d has performed an orderly shutdown", sock->fd);
+                       DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
                } else {
                        DBG("Receiving reply failed on sock %d for size %lu with ret %d",
-                                       sock->fd, size, ret);
+                                       rsock->sock.fd, size, ret);
                }
                /* Always return -1 here and the caller can use errno. */
                ret = -1;
@@ -114,24 +114,24 @@ error:
  * On success, return 0 else a negative value which is either an errno error or
  * a lttng error code from the relayd.
  */
-int relayd_create_session(struct lttcomm_sock *sock, uint64_t *session_id)
+int relayd_create_session(struct lttcomm_relayd_sock *rsock, uint64_t *session_id)
 {
        int ret;
        struct lttcomm_relayd_status_session reply;
 
-       assert(sock);
+       assert(rsock);
        assert(session_id);
 
        DBG("Relayd create session");
 
        /* Send command */
-       ret = send_command(sock, RELAYD_CREATE_SESSION, NULL, 0, 0);
+       ret = send_command(rsock, RELAYD_CREATE_SESSION, NULL, 0, 0);
        if (ret < 0) {
                goto error;
        }
 
        /* Receive response */
-       ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
        if (ret < 0) {
                goto error;
        }
@@ -160,7 +160,7 @@ error:
  *
  * On success return 0 else return ret_code negative value.
  */
-int relayd_add_stream(struct lttcomm_sock *sock, const char *channel_name,
+int relayd_add_stream(struct lttcomm_relayd_sock *rsock, const char *channel_name,
                const char *pathname, uint64_t *stream_id)
 {
        int ret;
@@ -168,7 +168,7 @@ int relayd_add_stream(struct lttcomm_sock *sock, const char *channel_name,
        struct lttcomm_relayd_status_stream reply;
 
        /* Code flow error. Safety net. */
-       assert(sock);
+       assert(rsock);
        assert(channel_name);
        assert(pathname);
 
@@ -178,13 +178,13 @@ int relayd_add_stream(struct lttcomm_sock *sock, const char *channel_name,
        strncpy(msg.pathname, pathname, sizeof(msg.pathname));
 
        /* Send command */
-       ret = send_command(sock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
+       ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
        if (ret < 0) {
                goto error;
        }
 
        /* Waiting for reply */
-       ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
        if (ret < 0) {
                goto error;
        }
@@ -217,29 +217,29 @@ error:
  *
  * Return 0 if compatible else negative value.
  */
-int relayd_version_check(struct lttcomm_sock *sock, uint32_t major,
-               uint32_t minor, uint32_t *minor_to_use)
+int relayd_version_check(struct lttcomm_relayd_sock *rsock)
 {
        int ret;
        struct lttcomm_relayd_version msg;
 
        /* Code flow error. Safety net. */
-       assert(sock);
+       assert(rsock);
 
-       DBG("Relayd version check for major.minor %u.%u", major, minor);
+       DBG("Relayd version check for major.minor %u.%u", rsock->major,
+                       rsock->minor);
 
        /* Prepare network byte order before transmission. */
-       msg.major = htobe32(major);
-       msg.minor = htobe32(minor);
+       msg.major = htobe32(rsock->major);
+       msg.minor = htobe32(rsock->minor);
 
        /* Send command */
-       ret = send_command(sock, RELAYD_VERSION, (void *) &msg, sizeof(msg), 0);
+       ret = send_command(rsock, RELAYD_VERSION, (void *) &msg, sizeof(msg), 0);
        if (ret < 0) {
                goto error;
        }
 
        /* Receive response */
-       ret = recv_reply(sock, (void *) &msg, sizeof(msg));
+       ret = recv_reply(rsock, (void *) &msg, sizeof(msg));
        if (ret < 0) {
                goto error;
        }
@@ -254,30 +254,26 @@ int relayd_version_check(struct lttcomm_sock *sock, uint32_t major,
         * other. If the minor version differs, the lowest version is used by both
         * sides.
         */
-       if (msg.major != major) {
+       if (msg.major != rsock->major) {
                /* Not compatible */
                ret = -1;
                DBG2("Relayd version is NOT compatible. Relayd version %u != %u (us)",
-                               msg.major, major);
+                               msg.major, rsock->major);
                goto error;
        }
 
        /*
-        * After 2.1.0 release, for the 2.2 release, at this point will have to
-        * check the minor version in order for the session daemon to know which
-        * structure to use to communicate with the relayd. If the relayd's minor
-        * version is higher, it will adapt to our version so we can continue to
-        * use the latest relayd communication data structure.
+        * If the relayd's minor version is higher, it will adapt to our version so
+        * we can continue to use the latest relayd communication data structure.
+        * If the received minor version is higher, the relayd should adapt to us.
         */
-       if (minor <= msg.minor) {
-               *minor_to_use = minor;
-       } else {
-               *minor_to_use = msg.minor;
+       if (rsock->minor > msg.minor) {
+               rsock->minor = msg.minor;
        }
 
        /* Version number compatible */
        DBG2("Relayd version is compatible, using protocol version %u.%u",
-                       major, *minor_to_use);
+                       rsock->major, rsock->minor);
        ret = 0;
 
 error:
@@ -289,17 +285,17 @@ error:
  *
  * On success return 0 else return ret_code negative value.
  */
-int relayd_send_metadata(struct lttcomm_sock *sock, size_t len)
+int relayd_send_metadata(struct lttcomm_relayd_sock *rsock, size_t len)
 {
        int ret;
 
        /* Code flow error. Safety net. */
-       assert(sock);
+       assert(rsock);
 
        DBG("Relayd sending metadata of size %zu", len);
 
        /* Send command */
-       ret = send_command(sock, RELAYD_SEND_METADATA, NULL, len, 0);
+       ret = send_command(rsock, RELAYD_SEND_METADATA, NULL, len, 0);
        if (ret < 0) {
                goto error;
        }
@@ -317,20 +313,20 @@ error:
 }
 
 /*
- * Connect to relay daemon with an allocated lttcomm_sock.
+ * Connect to relay daemon with an allocated lttcomm_relayd_sock.
  */
-int relayd_connect(struct lttcomm_sock *sock)
+int relayd_connect(struct lttcomm_relayd_sock *rsock)
 {
        /* Code flow error. Safety net. */
-       assert(sock);
+       assert(rsock);
 
        DBG3("Relayd connect ...");
 
-       return sock->ops->connect(sock);
+       return rsock->sock.ops->connect(&rsock->sock);
 }
 
 /*
- * Close relayd socket with an allocated lttcomm_sock.
+ * Close relayd socket with an allocated lttcomm_relayd_sock.
  *
  * If no socket operations are found, simply return 0 meaning that everything
  * is fine. Without operations, the socket can not possibly be opened or used.
@@ -342,26 +338,26 @@ int relayd_connect(struct lttcomm_sock *sock)
  * Return the close returned value. On error, a negative value is usually
  * returned back from close(2).
  */
-int relayd_close(struct lttcomm_sock *sock)
+int relayd_close(struct lttcomm_relayd_sock *rsock)
 {
        int ret;
 
        /* Code flow error. Safety net. */
-       assert(sock);
+       assert(rsock);
 
        /* An invalid fd is fine, return success. */
-       if (sock->fd < 0) {
+       if (rsock->sock.fd < 0) {
                ret = 0;
                goto end;
        }
 
-       DBG3("Relayd closing socket %d", sock->fd);
+       DBG3("Relayd closing socket %d", rsock->sock.fd);
 
-       if (sock->ops) {
-               ret = sock->ops->close(sock);
+       if (rsock->sock.ops) {
+               ret = rsock->sock.ops->close(&rsock->sock);
        } else {
                /* Default call if no specific ops found. */
-               ret = close(sock->fd);
+               ret = close(rsock->sock.fd);
                if (ret < 0) {
                        PERROR("relayd_close default close");
                }
@@ -374,13 +370,13 @@ end:
 /*
  * Send data header structure to the relayd.
  */
-int relayd_send_data_hdr(struct lttcomm_sock *sock,
+int relayd_send_data_hdr(struct lttcomm_relayd_sock *rsock,
                struct lttcomm_relayd_data_hdr *hdr, size_t size)
 {
        int ret;
 
        /* Code flow error. Safety net. */
-       assert(sock);
+       assert(rsock);
        assert(hdr);
 
        DBG3("Relayd sending data header of size %ld", size);
@@ -391,7 +387,7 @@ int relayd_send_data_hdr(struct lttcomm_sock *sock,
        }
 
        /* Only send data header. */
-       ret = sock->ops->sendmsg(sock, hdr, size, 0);
+       ret = rsock->sock.ops->sendmsg(&rsock->sock, hdr, size, 0);
        if (ret < 0) {
                ret = -errno;
                goto error;
@@ -409,7 +405,7 @@ error:
 /*
  * Send close stream command to the relayd.
  */
-int relayd_send_close_stream(struct lttcomm_sock *sock, uint64_t stream_id,
+int relayd_send_close_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
                uint64_t last_net_seq_num)
 {
        int ret;
@@ -417,7 +413,7 @@ int relayd_send_close_stream(struct lttcomm_sock *sock, uint64_t stream_id,
        struct lttcomm_relayd_generic_reply reply;
 
        /* Code flow error. Safety net. */
-       assert(sock);
+       assert(rsock);
 
        DBG("Relayd closing stream id %" PRIu64, stream_id);
 
@@ -425,13 +421,13 @@ int relayd_send_close_stream(struct lttcomm_sock *sock, uint64_t stream_id,
        msg.last_net_seq_num = htobe64(last_net_seq_num);
 
        /* Send command */
-       ret = send_command(sock, RELAYD_CLOSE_STREAM, (void *) &msg, sizeof(msg), 0);
+       ret = send_command(rsock, RELAYD_CLOSE_STREAM, (void *) &msg, sizeof(msg), 0);
        if (ret < 0) {
                goto error;
        }
 
        /* Receive response */
-       ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
        if (ret < 0) {
                goto error;
        }
@@ -458,7 +454,7 @@ error:
  *
  * Return 0 if NOT pending, 1 if so and a negative value on error.
  */
-int relayd_data_pending(struct lttcomm_sock *sock, uint64_t stream_id,
+int relayd_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
                uint64_t last_net_seq_num)
 {
        int ret;
@@ -466,7 +462,7 @@ int relayd_data_pending(struct lttcomm_sock *sock, uint64_t stream_id,
        struct lttcomm_relayd_generic_reply reply;
 
        /* Code flow error. Safety net. */
-       assert(sock);
+       assert(rsock);
 
        DBG("Relayd data pending for stream id %" PRIu64, stream_id);
 
@@ -474,14 +470,14 @@ int relayd_data_pending(struct lttcomm_sock *sock, uint64_t stream_id,
        msg.last_net_seq_num = htobe64(last_net_seq_num);
 
        /* Send command */
-       ret = send_command(sock, RELAYD_DATA_PENDING, (void *) &msg,
+       ret = send_command(rsock, RELAYD_DATA_PENDING, (void *) &msg,
                        sizeof(msg), 0);
        if (ret < 0) {
                goto error;
        }
 
        /* Receive response */
-       ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
        if (ret < 0) {
                goto error;
        }
@@ -506,7 +502,7 @@ error:
 /*
  * Check on the relayd side for a quiescent state on the control socket.
  */
-int relayd_quiescent_control(struct lttcomm_sock *sock,
+int relayd_quiescent_control(struct lttcomm_relayd_sock *rsock,
                uint64_t metadata_stream_id)
 {
        int ret;
@@ -514,20 +510,20 @@ int relayd_quiescent_control(struct lttcomm_sock *sock,
        struct lttcomm_relayd_generic_reply reply;
 
        /* Code flow error. Safety net. */
-       assert(sock);
+       assert(rsock);
 
        DBG("Relayd checking quiescent control state");
 
        msg.stream_id = htobe64(metadata_stream_id);
 
        /* Send command */
-       ret = send_command(sock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0);
+       ret = send_command(rsock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0);
        if (ret < 0) {
                goto error;
        }
 
        /* Receive response */
-       ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
        if (ret < 0) {
                goto error;
        }
@@ -551,27 +547,27 @@ error:
 /*
  * Begin a data pending command for a specific session id.
  */
-int relayd_begin_data_pending(struct lttcomm_sock *sock, uint64_t id)
+int relayd_begin_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id)
 {
        int ret;
        struct lttcomm_relayd_begin_data_pending msg;
        struct lttcomm_relayd_generic_reply reply;
 
        /* Code flow error. Safety net. */
-       assert(sock);
+       assert(rsock);
 
        DBG("Relayd begin data pending");
 
        msg.session_id = htobe64(id);
 
        /* Send command */
-       ret = send_command(sock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0);
+       ret = send_command(rsock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0);
        if (ret < 0) {
                goto error;
        }
 
        /* Receive response */
-       ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
        if (ret < 0) {
                goto error;
        }
@@ -597,7 +593,7 @@ error:
  * Return 0 on success and set is_data_inflight to 0 if no data is being
  * streamed or 1 if it is the case.
  */
-int relayd_end_data_pending(struct lttcomm_sock *sock, uint64_t id,
+int relayd_end_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id,
                unsigned int *is_data_inflight)
 {
        int ret;
@@ -605,20 +601,20 @@ int relayd_end_data_pending(struct lttcomm_sock *sock, uint64_t id,
        struct lttcomm_relayd_generic_reply reply;
 
        /* Code flow error. Safety net. */
-       assert(sock);
+       assert(rsock);
 
        DBG("Relayd end data pending");
 
        msg.session_id = htobe64(id);
 
        /* Send command */
-       ret = send_command(sock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0);
+       ret = send_command(rsock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0);
        if (ret < 0) {
                goto error;
        }
 
        /* Receive response */
-       ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
        if (ret < 0) {
                goto error;
        }
This page took 0.030169 seconds and 4 git commands to generate.