Move stream name creation to fct create_ust_stream
[lttng-tools.git] / src / bin / lttng-sessiond / consumer.c
index 1e20948887b1f889ae67bb81e3e844499edef869..344c6627dbbe507c01ba483c7ba46c8f2577b57c 100644 (file)
 
 #include "consumer.h"
 
+/*
+ * Receive a reply command status message from the consumer. Consumer socket
+ * lock MUST be acquired before calling this function.
+ *
+ * Return 0 on success, -1 on recv error or a negative lttng error code which
+ * was possibly returned by the consumer.
+ */
+int consumer_recv_status_reply(struct consumer_socket *sock)
+{
+       int ret;
+       struct lttcomm_consumer_status_msg reply;
+
+       assert(sock);
+
+       ret = lttcomm_recv_unix_sock(sock->fd, &reply, sizeof(reply));
+       if (ret <= 0) {
+               if (ret == 0) {
+                       /* Orderly shutdown. Don't return 0 which means success. */
+                       ret = -1;
+               }
+               /* The above call will print a PERROR on error. */
+               DBG("Fail to receive status reply on sock %d", sock->fd);
+               goto end;
+       }
+
+       if (reply.ret_code == LTTNG_OK) {
+               /* All good. */
+               ret = 0;
+       } else {
+               ret = -reply.ret_code;
+               DBG("Consumer ret code %d", reply.ret_code);
+       }
+
+end:
+       return ret;
+}
+
 /*
  * Send destroy relayd command to consumer.
  *
@@ -58,14 +95,19 @@ int consumer_send_destroy_relayd(struct consumer_socket *sock,
 
        pthread_mutex_lock(sock->lock);
        ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg));
-       pthread_mutex_unlock(sock->lock);
        if (ret < 0) {
-               PERROR("send consumer destroy relayd command");
-               goto error;
+               /* Indicate that the consumer is probably closing at this point. */
+               DBG("send consumer destroy relayd command");
+               goto error_send;
        }
 
+       /* Don't check the return value. The caller will do it. */
+       ret = consumer_recv_status_reply(sock);
+
        DBG2("Consumer send destroy relayd command done");
 
+error_send:
+       pthread_mutex_unlock(sock->lock);
 error:
        return ret;
 }
@@ -76,7 +118,6 @@ error:
  */
 void consumer_output_send_destroy_relayd(struct consumer_output *consumer)
 {
-       int ret;
        struct lttng_ht_iter iter;
        struct consumer_socket *socket;
 
@@ -87,10 +128,12 @@ void consumer_output_send_destroy_relayd(struct consumer_output *consumer)
                rcu_read_lock();
                cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
                                node.node) {
+                       int ret;
+
                        /* Send destroy relayd command */
                        ret = consumer_send_destroy_relayd(socket, consumer);
                        if (ret < 0) {
-                               ERR("Unable to send destroy relayd command to consumer");
+                               DBG("Unable to send destroy relayd command to consumer");
                                /* Continue since we MUST delete everything at this point. */
                        }
                }
@@ -314,6 +357,7 @@ void consumer_destroy_output(struct consumer_output *obj)
  */
 struct consumer_output *consumer_copy_output(struct consumer_output *obj)
 {
+       struct lttng_ht *tmp_ht_ptr;
        struct lttng_ht_iter iter;
        struct consumer_socket *socket, *copy_sock;
        struct consumer_output *output;
@@ -324,22 +368,28 @@ struct consumer_output *consumer_copy_output(struct consumer_output *obj)
        if (output == NULL) {
                goto error;
        }
+       /* Avoid losing the HT reference after the memcpy() */
+       tmp_ht_ptr = output->socks;
 
        memcpy(output, obj, sizeof(struct consumer_output));
 
-       /* Copy sockets */
-       output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+       /* Putting back the HT pointer and start copying socket(s). */
+       output->socks = tmp_ht_ptr;
 
+       rcu_read_lock();
        cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
                /* Create new socket object. */
                copy_sock = consumer_allocate_socket(socket->fd);
                if (copy_sock == NULL) {
+                       rcu_read_unlock();
                        goto malloc_error;
                }
 
+               copy_sock->registered = socket->registered;
                copy_sock->lock = socket->lock;
                consumer_add_socket(copy_sock, output);
        }
+       rcu_read_unlock();
 
 error:
        return output;
@@ -440,19 +490,23 @@ error:
 /*
  * Send file descriptor to consumer via sock.
  */
-int consumer_send_fds(int sock, int *fds, size_t nb_fd)
+int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd)
 {
        int ret;
 
        assert(fds);
+       assert(sock);
        assert(nb_fd > 0);
 
-       ret = lttcomm_send_fds_unix_sock(sock, fds, nb_fd);
+       ret = lttcomm_send_fds_unix_sock(sock->fd, fds, nb_fd);
        if (ret < 0) {
-               PERROR("send consumer fds");
+               /* The above call will print a PERROR on error. */
+               DBG("Error when sending consumer fds on sock %d", sock->fd);
                goto error;
        }
 
+       ret = consumer_recv_status_reply(sock);
+
 error:
        return ret;
 }
@@ -460,20 +514,25 @@ error:
 /*
  * Consumer send channel communication message structure to consumer.
  */
-int consumer_send_channel(int sock, struct lttcomm_consumer_msg *msg)
+int consumer_send_channel(struct consumer_socket *sock,
+               struct lttcomm_consumer_msg *msg)
 {
        int ret;
 
        assert(msg);
-       assert(sock >= 0);
+       assert(sock);
+       assert(sock->fd >= 0);
 
-       ret = lttcomm_send_unix_sock(sock, msg,
+       ret = lttcomm_send_unix_sock(sock->fd, msg,
                        sizeof(struct lttcomm_consumer_msg));
        if (ret < 0) {
-               PERROR("send consumer channel");
+               /* The above call will print a PERROR on error. */
+               DBG("Error when sending consumer channel on sock %d", sock->fd);
                goto error;
        }
 
+       ret = consumer_recv_status_reply(sock);
+
 error:
        return ret;
 }
@@ -549,13 +608,15 @@ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
 /*
  * Send stream communication structure to the consumer.
  */
-int consumer_send_stream(int sock, struct consumer_output *dst,
-       struct lttcomm_consumer_msg *msg, int *fds, size_t nb_fd)
+int consumer_send_stream(struct consumer_socket *sock,
+               struct consumer_output *dst, struct lttcomm_consumer_msg *msg,
+               int *fds, size_t nb_fd)
 {
        int ret;
 
        assert(msg);
        assert(dst);
+       assert(sock);
 
        switch (dst->type) {
        case CONSUMER_DST_NET:
@@ -581,10 +642,16 @@ int consumer_send_stream(int sock, struct consumer_output *dst,
        }
 
        /* Send on socket */
-       ret = lttcomm_send_unix_sock(sock, msg,
+       ret = lttcomm_send_unix_sock(sock->fd, msg,
                        sizeof(struct lttcomm_consumer_msg));
        if (ret < 0) {
-               PERROR("send consumer stream");
+               /* The above call will print a PERROR on error. */
+               DBG("Error when sending consumer stream on sock %d", sock->fd);
+               goto error;
+       }
+
+       ret = consumer_recv_status_reply(sock);
+       if (ret < 0) {
                goto error;
        }
 
@@ -602,9 +669,9 @@ error:
  *
  * On success return positive value. On error, negative value.
  */
-int consumer_send_relayd_socket(int consumer_sock,
+int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
                struct lttcomm_sock *sock, struct consumer_output *consumer,
-               enum lttng_stream_type type)
+               enum lttng_stream_type type, unsigned int session_id)
 {
        int ret;
        struct lttcomm_consumer_msg msg;
@@ -612,6 +679,7 @@ int consumer_send_relayd_socket(int consumer_sock,
        /* Code flow error. Safety net. */
        assert(sock);
        assert(consumer);
+       assert(consumer_sock);
 
        /* Bail out if consumer is disabled */
        if (!consumer->enabled) {
@@ -627,12 +695,19 @@ int consumer_send_relayd_socket(int consumer_sock,
         */
        msg.u.relayd_sock.net_index = consumer->net_seq_index;
        msg.u.relayd_sock.type = type;
+       msg.u.relayd_sock.session_id = session_id;
        memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
 
-       DBG3("Sending relayd sock info to consumer on %d", consumer_sock);
-       ret = lttcomm_send_unix_sock(consumer_sock, &msg, sizeof(msg));
+       DBG3("Sending relayd sock info to consumer on %d", consumer_sock->fd);
+       ret = lttcomm_send_unix_sock(consumer_sock->fd, &msg, sizeof(msg));
+       if (ret < 0) {
+               /* The above call will print a PERROR on error. */
+               DBG("Error when sending relayd sockets on sock %d", sock->fd);
+               goto error;
+       }
+
+       ret = consumer_recv_status_reply(consumer_sock);
        if (ret < 0) {
-               PERROR("send consumer relayd socket info");
                goto error;
        }
 
@@ -697,30 +772,31 @@ error:
 }
 
 /*
- * Ask the consumer if the data is ready to bread (available) for the specific
+ * Ask the consumer if the data is ready to read (NOT pending) for the specific
  * session id.
  *
  * This function has a different behavior with the consumer i.e. that it waits
- * for a reply from the consumer if yes or no the data is available.
+ * for a reply from the consumer if yes or no the data is pending.
  */
-int consumer_is_data_available(unsigned int id,
+int consumer_is_data_pending(unsigned int id,
                struct consumer_output *consumer)
 {
        int ret;
-       int32_t ret_code = 1;  /* Default is that the data is available */
+       int32_t ret_code = 0;  /* Default is that the data is NOT pending */
        struct consumer_socket *socket;
        struct lttng_ht_iter iter;
        struct lttcomm_consumer_msg msg;
 
        assert(consumer);
 
-       msg.cmd_type = LTTNG_CONSUMER_DATA_AVAILABLE;
+       msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING;
 
-       msg.u.data_available.session_id = (uint64_t) id;
+       msg.u.data_pending.session_id = (uint64_t) id;
 
-       DBG3("Consumer data available for id %u", id);
+       DBG3("Consumer data pending for id %u", id);
 
        /* Send command for each consumer */
+       rcu_read_lock();
        cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
                        node.node) {
                /* Code flow error */
@@ -730,32 +806,42 @@ int consumer_is_data_available(unsigned int id,
 
                ret = lttcomm_send_unix_sock(socket->fd, &msg, sizeof(msg));
                if (ret < 0) {
-                       PERROR("send consumer data available command");
+                       /* The above call will print a PERROR on error. */
+                       DBG("Error on consumer is data pending on sock %d", socket->fd);
                        pthread_mutex_unlock(socket->lock);
-                       goto error;
+                       goto error_unlock;
                }
 
                /*
-                * Waiting for the reply code where 0 the data is not available and 1
-                * it is for trace reading.
+                * No need for a recv reply status because the answer to the command is
+                * the reply status message.
                 */
+
                ret = lttcomm_recv_unix_sock(socket->fd, &ret_code, sizeof(ret_code));
-               if (ret < 0) {
-                       PERROR("recv consumer data available status");
+               if (ret <= 0) {
+                       if (ret == 0) {
+                               /* Orderly shutdown. Don't return 0 which means success. */
+                               ret = -1;
+                       }
+                       /* The above call will print a PERROR on error. */
+                       DBG("Error on recv consumer is data pending on sock %d", socket->fd);
                        pthread_mutex_unlock(socket->lock);
-                       goto error;
+                       goto error_unlock;
                }
 
                pthread_mutex_unlock(socket->lock);
 
-               if (ret_code == 0) {
+               if (ret_code == 1) {
                        break;
                }
        }
+       rcu_read_unlock();
 
-       DBG("Consumer data available ret %d", ret_code);
+       DBG("Consumer data is %s pending for session id %u",
+                       ret_code == 1 ? "" : "NOT", id);
        return ret_code;
 
-error:
+error_unlock:
+       rcu_read_unlock();
        return -1;
 }
This page took 0.027057 seconds and 4 git commands to generate.