Make the consumer sends a ACK after each command
[lttng-tools.git] / src / bin / lttng-sessiond / consumer.c
index bf477933515502510c284c58762b56d2b4399f75..92a6c5ddc72a0fca50f40fd2c1d03a0b7cddde12 100644 (file)
 
 #include "consumer.h"
 
+/*
+ * Receive a reply command status message from the consumer. Consumer socket
+ * lock MUST be acquired before calling this function.
+ *
+ * Return 0 on success, -1 on recv error or a negative lttng error code which
+ * was possibly returned by the consumer.
+ */
+int consumer_recv_status_reply(struct consumer_socket *sock)
+{
+       int ret;
+       struct lttcomm_consumer_status_msg reply;
+
+       assert(sock);
+
+       ret = lttcomm_recv_unix_sock(sock->fd, &reply, sizeof(reply));
+       if (ret < 0) {
+               PERROR("recv consumer status msg");
+               goto end;
+       }
+
+       if (reply.ret_code == LTTNG_OK) {
+               /* All good. */
+               ret = 0;
+       } else {
+               ret = -reply.ret_code;
+               ERR("Consumer ret code %d", reply.ret_code);
+       }
+
+end:
+       return ret;
+}
+
 /*
  * Send destroy relayd command to consumer.
  *
@@ -58,14 +90,18 @@ int consumer_send_destroy_relayd(struct consumer_socket *sock,
 
        pthread_mutex_lock(sock->lock);
        ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg));
-       pthread_mutex_unlock(sock->lock);
        if (ret < 0) {
                PERROR("send consumer destroy relayd command");
-               goto error;
+               goto error_send;
        }
 
+       /* Don't check the return value. The caller will do it. */
+       ret = consumer_recv_status_reply(sock);
+
        DBG2("Consumer send destroy relayd command done");
 
+error_send:
+       pthread_mutex_unlock(sock->lock);
 error:
        return ret;
 }
@@ -444,19 +480,22 @@ error:
 /*
  * Send file descriptor to consumer via sock.
  */
-int consumer_send_fds(int sock, int *fds, size_t nb_fd)
+int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd)
 {
        int ret;
 
        assert(fds);
+       assert(sock);
        assert(nb_fd > 0);
 
-       ret = lttcomm_send_fds_unix_sock(sock, fds, nb_fd);
+       ret = lttcomm_send_fds_unix_sock(sock->fd, fds, nb_fd);
        if (ret < 0) {
                PERROR("send consumer fds");
                goto error;
        }
 
+       ret = consumer_recv_status_reply(sock);
+
 error:
        return ret;
 }
@@ -464,20 +503,24 @@ error:
 /*
  * Consumer send channel communication message structure to consumer.
  */
-int consumer_send_channel(int sock, struct lttcomm_consumer_msg *msg)
+int consumer_send_channel(struct consumer_socket *sock,
+               struct lttcomm_consumer_msg *msg)
 {
        int ret;
 
        assert(msg);
-       assert(sock >= 0);
+       assert(sock);
+       assert(sock->fd >= 0);
 
-       ret = lttcomm_send_unix_sock(sock, msg,
+       ret = lttcomm_send_unix_sock(sock->fd, msg,
                        sizeof(struct lttcomm_consumer_msg));
        if (ret < 0) {
                PERROR("send consumer channel");
                goto error;
        }
 
+       ret = consumer_recv_status_reply(sock);
+
 error:
        return ret;
 }
@@ -553,13 +596,15 @@ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
 /*
  * Send stream communication structure to the consumer.
  */
-int consumer_send_stream(int sock, struct consumer_output *dst,
-       struct lttcomm_consumer_msg *msg, int *fds, size_t nb_fd)
+int consumer_send_stream(struct consumer_socket *sock,
+               struct consumer_output *dst, struct lttcomm_consumer_msg *msg,
+               int *fds, size_t nb_fd)
 {
        int ret;
 
        assert(msg);
        assert(dst);
+       assert(sock);
 
        switch (dst->type) {
        case CONSUMER_DST_NET:
@@ -585,13 +630,18 @@ int consumer_send_stream(int sock, struct consumer_output *dst,
        }
 
        /* Send on socket */
-       ret = lttcomm_send_unix_sock(sock, msg,
+       ret = lttcomm_send_unix_sock(sock->fd, msg,
                        sizeof(struct lttcomm_consumer_msg));
        if (ret < 0) {
                PERROR("send consumer stream");
                goto error;
        }
 
+       ret = consumer_recv_status_reply(sock);
+       if (ret < 0) {
+               goto error;
+       }
+
        ret = consumer_send_fds(sock, fds, nb_fd);
        if (ret < 0) {
                goto error;
@@ -606,7 +656,7 @@ error:
  *
  * On success return positive value. On error, negative value.
  */
-int consumer_send_relayd_socket(int consumer_sock,
+int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
                struct lttcomm_sock *sock, struct consumer_output *consumer,
                enum lttng_stream_type type)
 {
@@ -616,6 +666,7 @@ int consumer_send_relayd_socket(int consumer_sock,
        /* Code flow error. Safety net. */
        assert(sock);
        assert(consumer);
+       assert(consumer_sock);
 
        /* Bail out if consumer is disabled */
        if (!consumer->enabled) {
@@ -633,13 +684,18 @@ int consumer_send_relayd_socket(int consumer_sock,
        msg.u.relayd_sock.type = type;
        memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
 
-       DBG3("Sending relayd sock info to consumer on %d", consumer_sock);
-       ret = lttcomm_send_unix_sock(consumer_sock, &msg, sizeof(msg));
+       DBG3("Sending relayd sock info to consumer on %d", consumer_sock->fd);
+       ret = lttcomm_send_unix_sock(consumer_sock->fd, &msg, sizeof(msg));
        if (ret < 0) {
                PERROR("send consumer relayd socket info");
                goto error;
        }
 
+       ret = consumer_recv_status_reply(consumer_sock);
+       if (ret < 0) {
+               goto error;
+       }
+
        DBG3("Sending relayd socket file descriptor to consumer");
        ret = consumer_send_fds(consumer_sock, &sock->fd, 1);
        if (ret < 0) {
@@ -739,6 +795,11 @@ int consumer_is_data_pending(unsigned int id,
                        goto error;
                }
 
+               /*
+                * No need for a recv reply status because the answer to the command is
+                * the reply status message.
+                */
+
                ret = lttcomm_recv_unix_sock(socket->fd, &ret_code, sizeof(ret_code));
                if (ret < 0) {
                        PERROR("recv consumer data pending status");
This page took 0.025932 seconds and 4 git commands to generate.