Move stream name creation to fct create_ust_stream
[lttng-tools.git] / src / bin / lttng-sessiond / consumer.c
index fd9e1e6a0667f3e55f7946540883fb1277313129..344c6627dbbe507c01ba483c7ba46c8f2577b57c 100644 (file)
@@ -45,8 +45,13 @@ int consumer_recv_status_reply(struct consumer_socket *sock)
        assert(sock);
 
        ret = lttcomm_recv_unix_sock(sock->fd, &reply, sizeof(reply));
-       if (ret < 0) {
-               PERROR("recv consumer status msg");
+       if (ret <= 0) {
+               if (ret == 0) {
+                       /* Orderly shutdown. Don't return 0 which means success. */
+                       ret = -1;
+               }
+               /* The above call will print a PERROR on error. */
+               DBG("Fail to receive status reply on sock %d", sock->fd);
                goto end;
        }
 
@@ -55,7 +60,7 @@ int consumer_recv_status_reply(struct consumer_socket *sock)
                ret = 0;
        } else {
                ret = -reply.ret_code;
-               ERR("Consumer ret code %d", reply.ret_code);
+               DBG("Consumer ret code %d", reply.ret_code);
        }
 
 end:
@@ -91,7 +96,8 @@ int consumer_send_destroy_relayd(struct consumer_socket *sock,
        pthread_mutex_lock(sock->lock);
        ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg));
        if (ret < 0) {
-               PERROR("send consumer destroy relayd command");
+               /* Indicate that the consumer is probably closing at this point. */
+               DBG("send consumer destroy relayd command");
                goto error_send;
        }
 
@@ -112,7 +118,6 @@ error:
  */
 void consumer_output_send_destroy_relayd(struct consumer_output *consumer)
 {
-       int ret;
        struct lttng_ht_iter iter;
        struct consumer_socket *socket;
 
@@ -123,10 +128,12 @@ void consumer_output_send_destroy_relayd(struct consumer_output *consumer)
                rcu_read_lock();
                cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
                                node.node) {
+                       int ret;
+
                        /* Send destroy relayd command */
                        ret = consumer_send_destroy_relayd(socket, consumer);
                        if (ret < 0) {
-                               ERR("Unable to send destroy relayd command to consumer");
+                               DBG("Unable to send destroy relayd command to consumer");
                                /* Continue since we MUST delete everything at this point. */
                        }
                }
@@ -369,10 +376,12 @@ struct consumer_output *consumer_copy_output(struct consumer_output *obj)
        /* Putting back the HT pointer and start copying socket(s). */
        output->socks = tmp_ht_ptr;
 
+       rcu_read_lock();
        cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
                /* Create new socket object. */
                copy_sock = consumer_allocate_socket(socket->fd);
                if (copy_sock == NULL) {
+                       rcu_read_unlock();
                        goto malloc_error;
                }
 
@@ -380,6 +389,7 @@ struct consumer_output *consumer_copy_output(struct consumer_output *obj)
                copy_sock->lock = socket->lock;
                consumer_add_socket(copy_sock, output);
        }
+       rcu_read_unlock();
 
 error:
        return output;
@@ -490,7 +500,8 @@ int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd)
 
        ret = lttcomm_send_fds_unix_sock(sock->fd, fds, nb_fd);
        if (ret < 0) {
-               PERROR("send consumer fds");
+               /* The above call will print a PERROR on error. */
+               DBG("Error when sending consumer fds on sock %d", sock->fd);
                goto error;
        }
 
@@ -515,7 +526,8 @@ int consumer_send_channel(struct consumer_socket *sock,
        ret = lttcomm_send_unix_sock(sock->fd, msg,
                        sizeof(struct lttcomm_consumer_msg));
        if (ret < 0) {
-               PERROR("send consumer channel");
+               /* The above call will print a PERROR on error. */
+               DBG("Error when sending consumer channel on sock %d", sock->fd);
                goto error;
        }
 
@@ -633,7 +645,8 @@ int consumer_send_stream(struct consumer_socket *sock,
        ret = lttcomm_send_unix_sock(sock->fd, msg,
                        sizeof(struct lttcomm_consumer_msg));
        if (ret < 0) {
-               PERROR("send consumer stream");
+               /* The above call will print a PERROR on error. */
+               DBG("Error when sending consumer stream on sock %d", sock->fd);
                goto error;
        }
 
@@ -688,7 +701,8 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
        DBG3("Sending relayd sock info to consumer on %d", consumer_sock->fd);
        ret = lttcomm_send_unix_sock(consumer_sock->fd, &msg, sizeof(msg));
        if (ret < 0) {
-               PERROR("send consumer relayd socket info");
+               /* The above call will print a PERROR on error. */
+               DBG("Error when sending relayd sockets on sock %d", sock->fd);
                goto error;
        }
 
@@ -782,6 +796,7 @@ int consumer_is_data_pending(unsigned int id,
        DBG3("Consumer data pending for id %u", id);
 
        /* Send command for each consumer */
+       rcu_read_lock();
        cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
                        node.node) {
                /* Code flow error */
@@ -791,9 +806,10 @@ int consumer_is_data_pending(unsigned int id,
 
                ret = lttcomm_send_unix_sock(socket->fd, &msg, sizeof(msg));
                if (ret < 0) {
-                       PERROR("send consumer data pending command");
+                       /* The above call will print a PERROR on error. */
+                       DBG("Error on consumer is data pending on sock %d", socket->fd);
                        pthread_mutex_unlock(socket->lock);
-                       goto error;
+                       goto error_unlock;
                }
 
                /*
@@ -802,10 +818,15 @@ int consumer_is_data_pending(unsigned int id,
                 */
 
                ret = lttcomm_recv_unix_sock(socket->fd, &ret_code, sizeof(ret_code));
-               if (ret < 0) {
-                       PERROR("recv consumer data pending status");
+               if (ret <= 0) {
+                       if (ret == 0) {
+                               /* Orderly shutdown. Don't return 0 which means success. */
+                               ret = -1;
+                       }
+                       /* The above call will print a PERROR on error. */
+                       DBG("Error on recv consumer is data pending on sock %d", socket->fd);
                        pthread_mutex_unlock(socket->lock);
-                       goto error;
+                       goto error_unlock;
                }
 
                pthread_mutex_unlock(socket->lock);
@@ -814,10 +835,13 @@ int consumer_is_data_pending(unsigned int id,
                        break;
                }
        }
+       rcu_read_unlock();
 
-       DBG("Consumer data pending ret %d", ret_code);
+       DBG("Consumer data is %s pending for session id %u",
+                       ret_code == 1 ? "" : "NOT", id);
        return ret_code;
 
-error:
+error_unlock:
+       rcu_read_unlock();
        return -1;
 }
This page took 0.025371 seconds and 4 git commands to generate.