Fix: change ERR/PERROR statement to DBG
[lttng-tools.git] / src / bin / lttng-sessiond / consumer.c
index aa050eceb56a48622ddfbf04b02c87d5f88ee9f6..ff55b57df4a59348a93a0047a3fd34e7a5d4011c 100644 (file)
 
 #include "consumer.h"
 
+/*
+ * Receive a reply command status message from the consumer. Consumer socket
+ * lock MUST be acquired before calling this function.
+ *
+ * Return 0 on success, -1 on recv error or a negative lttng error code which
+ * was possibly returned by the consumer.
+ */
+int consumer_recv_status_reply(struct consumer_socket *sock)
+{
+       int ret;
+       struct lttcomm_consumer_status_msg reply;
+
+       assert(sock);
+
+       ret = lttcomm_recv_unix_sock(sock->fd, &reply, sizeof(reply));
+       if (ret < 0) {
+               PERROR("recv consumer status msg");
+               goto end;
+       }
+
+       if (reply.ret_code == LTTNG_OK) {
+               /* All good. */
+               ret = 0;
+       } else {
+               ret = -reply.ret_code;
+               ERR("Consumer ret code %d", reply.ret_code);
+       }
+
+end:
+       return ret;
+}
+
 /*
  * Send destroy relayd command to consumer.
  *
@@ -58,14 +90,19 @@ int consumer_send_destroy_relayd(struct consumer_socket *sock,
 
        pthread_mutex_lock(sock->lock);
        ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg));
-       pthread_mutex_unlock(sock->lock);
        if (ret < 0) {
-               PERROR("send consumer destroy relayd command");
-               goto error;
+               /* Indicate that the consumer is probably closing at this point. */
+               DBG("send consumer destroy relayd command");
+               goto error_send;
        }
 
+       /* Don't check the return value. The caller will do it. */
+       ret = consumer_recv_status_reply(sock);
+
        DBG2("Consumer send destroy relayd command done");
 
+error_send:
+       pthread_mutex_unlock(sock->lock);
 error:
        return ret;
 }
@@ -76,7 +113,6 @@ error:
  */
 void consumer_output_send_destroy_relayd(struct consumer_output *consumer)
 {
-       int ret;
        struct lttng_ht_iter iter;
        struct consumer_socket *socket;
 
@@ -87,10 +123,12 @@ void consumer_output_send_destroy_relayd(struct consumer_output *consumer)
                rcu_read_lock();
                cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
                                node.node) {
+                       int ret;
+
                        /* Send destroy relayd command */
                        ret = consumer_send_destroy_relayd(socket, consumer);
                        if (ret < 0) {
-                               ERR("Unable to send destroy relayd command to consumer");
+                               DBG("Unable to send destroy relayd command to consumer");
                                /* Continue since we MUST delete everything at this point. */
                        }
                }
@@ -444,19 +482,22 @@ error:
 /*
  * Send file descriptor to consumer via sock.
  */
-int consumer_send_fds(int sock, int *fds, size_t nb_fd)
+int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd)
 {
        int ret;
 
        assert(fds);
+       assert(sock);
        assert(nb_fd > 0);
 
-       ret = lttcomm_send_fds_unix_sock(sock, fds, nb_fd);
+       ret = lttcomm_send_fds_unix_sock(sock->fd, fds, nb_fd);
        if (ret < 0) {
                PERROR("send consumer fds");
                goto error;
        }
 
+       ret = consumer_recv_status_reply(sock);
+
 error:
        return ret;
 }
@@ -464,20 +505,24 @@ error:
 /*
  * Consumer send channel communication message structure to consumer.
  */
-int consumer_send_channel(int sock, struct lttcomm_consumer_msg *msg)
+int consumer_send_channel(struct consumer_socket *sock,
+               struct lttcomm_consumer_msg *msg)
 {
        int ret;
 
        assert(msg);
-       assert(sock >= 0);
+       assert(sock);
+       assert(sock->fd >= 0);
 
-       ret = lttcomm_send_unix_sock(sock, msg,
+       ret = lttcomm_send_unix_sock(sock->fd, msg,
                        sizeof(struct lttcomm_consumer_msg));
        if (ret < 0) {
                PERROR("send consumer channel");
                goto error;
        }
 
+       ret = consumer_recv_status_reply(sock);
+
 error:
        return ret;
 }
@@ -553,13 +598,15 @@ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
 /*
  * Send stream communication structure to the consumer.
  */
-int consumer_send_stream(int sock, struct consumer_output *dst,
-       struct lttcomm_consumer_msg *msg, int *fds, size_t nb_fd)
+int consumer_send_stream(struct consumer_socket *sock,
+               struct consumer_output *dst, struct lttcomm_consumer_msg *msg,
+               int *fds, size_t nb_fd)
 {
        int ret;
 
        assert(msg);
        assert(dst);
+       assert(sock);
 
        switch (dst->type) {
        case CONSUMER_DST_NET:
@@ -585,13 +632,18 @@ int consumer_send_stream(int sock, struct consumer_output *dst,
        }
 
        /* Send on socket */
-       ret = lttcomm_send_unix_sock(sock, msg,
+       ret = lttcomm_send_unix_sock(sock->fd, msg,
                        sizeof(struct lttcomm_consumer_msg));
        if (ret < 0) {
                PERROR("send consumer stream");
                goto error;
        }
 
+       ret = consumer_recv_status_reply(sock);
+       if (ret < 0) {
+               goto error;
+       }
+
        ret = consumer_send_fds(sock, fds, nb_fd);
        if (ret < 0) {
                goto error;
@@ -606,9 +658,9 @@ error:
  *
  * On success return positive value. On error, negative value.
  */
-int consumer_send_relayd_socket(int consumer_sock,
+int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
                struct lttcomm_sock *sock, struct consumer_output *consumer,
-               enum lttng_stream_type type)
+               enum lttng_stream_type type, unsigned int session_id)
 {
        int ret;
        struct lttcomm_consumer_msg msg;
@@ -616,6 +668,7 @@ int consumer_send_relayd_socket(int consumer_sock,
        /* Code flow error. Safety net. */
        assert(sock);
        assert(consumer);
+       assert(consumer_sock);
 
        /* Bail out if consumer is disabled */
        if (!consumer->enabled) {
@@ -631,15 +684,21 @@ int consumer_send_relayd_socket(int consumer_sock,
         */
        msg.u.relayd_sock.net_index = consumer->net_seq_index;
        msg.u.relayd_sock.type = type;
+       msg.u.relayd_sock.session_id = session_id;
        memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
 
-       DBG3("Sending relayd sock info to consumer on %d", consumer_sock);
-       ret = lttcomm_send_unix_sock(consumer_sock, &msg, sizeof(msg));
+       DBG3("Sending relayd sock info to consumer on %d", consumer_sock->fd);
+       ret = lttcomm_send_unix_sock(consumer_sock->fd, &msg, sizeof(msg));
        if (ret < 0) {
                PERROR("send consumer relayd socket info");
                goto error;
        }
 
+       ret = consumer_recv_status_reply(consumer_sock);
+       if (ret < 0) {
+               goto error;
+       }
+
        DBG3("Sending relayd socket file descriptor to consumer");
        ret = consumer_send_fds(consumer_sock, &sock->fd, 1);
        if (ret < 0) {
@@ -701,28 +760,28 @@ error:
 }
 
 /*
- * Ask the consumer if the data is ready to bread (available) for the specific
+ * Ask the consumer if the data is ready to read (NOT pending) for the specific
  * session id.
  *
  * This function has a different behavior with the consumer i.e. that it waits
- * for a reply from the consumer if yes or no the data is available.
+ * for a reply from the consumer if yes or no the data is pending.
  */
-int consumer_is_data_available(unsigned int id,
+int consumer_is_data_pending(unsigned int id,
                struct consumer_output *consumer)
 {
        int ret;
-       int32_t ret_code = 1;  /* Default is that the data is available */
+       int32_t ret_code = 0;  /* Default is that the data is NOT pending */
        struct consumer_socket *socket;
        struct lttng_ht_iter iter;
        struct lttcomm_consumer_msg msg;
 
        assert(consumer);
 
-       msg.cmd_type = LTTNG_CONSUMER_DATA_AVAILABLE;
+       msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING;
 
-       msg.u.data_available.session_id = (uint64_t) id;
+       msg.u.data_pending.session_id = (uint64_t) id;
 
-       DBG3("Consumer data available for id %u", id);
+       DBG3("Consumer data pending for id %u", id);
 
        /* Send command for each consumer */
        cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
@@ -734,30 +793,31 @@ int consumer_is_data_available(unsigned int id,
 
                ret = lttcomm_send_unix_sock(socket->fd, &msg, sizeof(msg));
                if (ret < 0) {
-                       PERROR("send consumer data available command");
+                       PERROR("send consumer data pending command");
                        pthread_mutex_unlock(socket->lock);
                        goto error;
                }
 
                /*
-                * Waiting for the reply code where 0 the data is not available and 1
-                * it is for trace reading.
+                * No need for a recv reply status because the answer to the command is
+                * the reply status message.
                 */
+
                ret = lttcomm_recv_unix_sock(socket->fd, &ret_code, sizeof(ret_code));
                if (ret < 0) {
-                       PERROR("recv consumer data available status");
+                       PERROR("recv consumer data pending status");
                        pthread_mutex_unlock(socket->lock);
                        goto error;
                }
 
                pthread_mutex_unlock(socket->lock);
 
-               if (ret_code == 0) {
+               if (ret_code == 1) {
                        break;
                }
        }
 
-       DBG("Consumer data available ret %d", ret_code);
+       DBG("Consumer data pending ret %d", ret_code);
        return ret_code;
 
 error:
This page took 0.027182 seconds and 4 git commands to generate.