X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fconsumer.c;h=344c6627dbbe507c01ba483c7ba46c8f2577b57c;hp=66cdc2479d28213acd65b4c988a413a072ec9ac5;hb=7f13370536e1ad64db733423b542755c97160f4d;hpb=c617c0c651432f9d5ae7adf4c5c1a5fd92ad828e diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 66cdc2479..344c6627d 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -45,8 +45,13 @@ int consumer_recv_status_reply(struct consumer_socket *sock) assert(sock); ret = lttcomm_recv_unix_sock(sock->fd, &reply, sizeof(reply)); - if (ret < 0) { - PERROR("recv consumer status msg"); + if (ret <= 0) { + if (ret == 0) { + /* Orderly shutdown. Don't return 0 which means success. */ + ret = -1; + } + /* The above call will print a PERROR on error. */ + DBG("Fail to receive status reply on sock %d", sock->fd); goto end; } @@ -55,7 +60,7 @@ int consumer_recv_status_reply(struct consumer_socket *sock) ret = 0; } else { ret = -reply.ret_code; - ERR("Consumer ret code %d", reply.ret_code); + DBG("Consumer ret code %d", reply.ret_code); } end: @@ -91,7 +96,8 @@ int consumer_send_destroy_relayd(struct consumer_socket *sock, pthread_mutex_lock(sock->lock); ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg)); if (ret < 0) { - PERROR("send consumer destroy relayd command"); + /* Indicate that the consumer is probably closing at this point. */ + DBG("send consumer destroy relayd command"); goto error_send; } @@ -127,7 +133,7 @@ void consumer_output_send_destroy_relayd(struct consumer_output *consumer) /* Send destroy relayd command */ ret = consumer_send_destroy_relayd(socket, consumer); if (ret < 0) { - ERR("Unable to send destroy relayd command to consumer"); + DBG("Unable to send destroy relayd command to consumer"); /* Continue since we MUST delete everything at this point. */ } } @@ -370,10 +376,12 @@ struct consumer_output *consumer_copy_output(struct consumer_output *obj) /* Putting back the HT pointer and start copying socket(s). */ output->socks = tmp_ht_ptr; + rcu_read_lock(); cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) { /* Create new socket object. */ copy_sock = consumer_allocate_socket(socket->fd); if (copy_sock == NULL) { + rcu_read_unlock(); goto malloc_error; } @@ -381,6 +389,7 @@ struct consumer_output *consumer_copy_output(struct consumer_output *obj) copy_sock->lock = socket->lock; consumer_add_socket(copy_sock, output); } + rcu_read_unlock(); error: return output; @@ -491,7 +500,8 @@ int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd) ret = lttcomm_send_fds_unix_sock(sock->fd, fds, nb_fd); if (ret < 0) { - PERROR("send consumer fds"); + /* The above call will print a PERROR on error. */ + DBG("Error when sending consumer fds on sock %d", sock->fd); goto error; } @@ -516,7 +526,8 @@ int consumer_send_channel(struct consumer_socket *sock, ret = lttcomm_send_unix_sock(sock->fd, msg, sizeof(struct lttcomm_consumer_msg)); if (ret < 0) { - PERROR("send consumer channel"); + /* The above call will print a PERROR on error. */ + DBG("Error when sending consumer channel on sock %d", sock->fd); goto error; } @@ -634,7 +645,8 @@ int consumer_send_stream(struct consumer_socket *sock, ret = lttcomm_send_unix_sock(sock->fd, msg, sizeof(struct lttcomm_consumer_msg)); if (ret < 0) { - PERROR("send consumer stream"); + /* The above call will print a PERROR on error. */ + DBG("Error when sending consumer stream on sock %d", sock->fd); goto error; } @@ -689,7 +701,8 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, DBG3("Sending relayd sock info to consumer on %d", consumer_sock->fd); ret = lttcomm_send_unix_sock(consumer_sock->fd, &msg, sizeof(msg)); if (ret < 0) { - PERROR("send consumer relayd socket info"); + /* The above call will print a PERROR on error. */ + DBG("Error when sending relayd sockets on sock %d", sock->fd); goto error; } @@ -783,6 +796,7 @@ int consumer_is_data_pending(unsigned int id, DBG3("Consumer data pending for id %u", id); /* Send command for each consumer */ + rcu_read_lock(); cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket, node.node) { /* Code flow error */ @@ -792,9 +806,10 @@ int consumer_is_data_pending(unsigned int id, ret = lttcomm_send_unix_sock(socket->fd, &msg, sizeof(msg)); if (ret < 0) { - PERROR("send consumer data pending command"); + /* The above call will print a PERROR on error. */ + DBG("Error on consumer is data pending on sock %d", socket->fd); pthread_mutex_unlock(socket->lock); - goto error; + goto error_unlock; } /* @@ -803,10 +818,15 @@ int consumer_is_data_pending(unsigned int id, */ ret = lttcomm_recv_unix_sock(socket->fd, &ret_code, sizeof(ret_code)); - if (ret < 0) { - PERROR("recv consumer data pending status"); + if (ret <= 0) { + if (ret == 0) { + /* Orderly shutdown. Don't return 0 which means success. */ + ret = -1; + } + /* The above call will print a PERROR on error. */ + DBG("Error on recv consumer is data pending on sock %d", socket->fd); pthread_mutex_unlock(socket->lock); - goto error; + goto error_unlock; } pthread_mutex_unlock(socket->lock); @@ -815,10 +835,13 @@ int consumer_is_data_pending(unsigned int id, break; } } + rcu_read_unlock(); - DBG("Consumer data pending ret %d", ret_code); + DBG("Consumer data is %s pending for session id %u", + ret_code == 1 ? "" : "NOT", id); return ret_code; -error: +error_unlock: + rcu_read_unlock(); return -1; }