Make the consumer sends a ACK after each command
authorDavid Goulet <dgoulet@efficios.com>
Wed, 12 Dec 2012 16:23:20 +0000 (11:23 -0500)
committerDavid Goulet <dgoulet@efficios.com>
Thu, 13 Dec 2012 16:40:43 +0000 (11:40 -0500)
This is needed to avoid buffer bloating when throttling communication
between the consumer and the relayd. Considering a very low bandwith
limit between the relayd and consumerd, the session daemon would send a
high debit of commands to the consumer without ever

emptying the unix socket queue, which makes the UNIX socket reach buffer
full conditions, which is prone to trigger corner-cases behaviors in
blocking send/recv with MSG_WAITALL, which is likely the cause of hang
experienced when limiting relayd bandwidth.

Adding an ACK to each command makes sure that we acknowledge the session
daemon that we, the consumer, have emptied the unix socket buffer.

NOTE: In consumer_add_relayd_socket(), there might be a problem with the
error path and message status to the sessiond. A subsequent patch might
fix a possible issue but for now it is not at all critical since any
critical error on the consumer side will notify the sessiond through the
error socket.

Acked-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: David Goulet <dgoulet@efficios.com>
12 files changed:
src/bin/lttng-sessiond/cmd.c
src/bin/lttng-sessiond/consumer.c
src/bin/lttng-sessiond/consumer.h
src/bin/lttng-sessiond/kernel-consumer.c
src/bin/lttng-sessiond/kernel-consumer.h
src/bin/lttng-sessiond/main.c
src/bin/lttng-sessiond/ust-consumer.c
src/common/consumer.c
src/common/consumer.h
src/common/kernel-consumer/kernel-consumer.c
src/common/sessiond-comm/sessiond-comm.h
src/common/ust-consumer/ust-consumer.c

index 2110e31e9508683f7332088c20e147bf84e3c921..aaa5a8923332bb39ed3b43a8ecdc5688880206dd 100644 (file)
@@ -440,7 +440,7 @@ static int init_kernel_tracing(struct ltt_kernel_session *session)
                        assert(socket->fd >= 0);
 
                        pthread_mutex_lock(socket->lock);
-                       ret = kernel_consumer_send_session(socket->fd, session);
+                       ret = kernel_consumer_send_session(socket, session);
                        pthread_mutex_unlock(socket->lock);
                        if (ret < 0) {
                                ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
@@ -528,7 +528,7 @@ error:
  */
 static int send_consumer_relayd_socket(int domain, struct ltt_session *session,
                struct lttng_uri *relayd_uri, struct consumer_output *consumer,
-               int consumer_fd)
+               struct consumer_socket *consumer_sock)
 {
        int ret;
        struct lttcomm_sock *sock = NULL;
@@ -557,7 +557,7 @@ static int send_consumer_relayd_socket(int domain, struct ltt_session *session,
        }
 
        /* Send relayd socket to consumer. */
-       ret = consumer_send_relayd_socket(consumer_fd, sock,
+       ret = consumer_send_relayd_socket(consumer_sock, sock,
                        consumer, relayd_uri->stype);
        if (ret < 0) {
                ret = LTTNG_ERR_ENABLE_CONSUMER_FAIL;
@@ -593,7 +593,8 @@ close_sock:
  * session.
  */
 static int send_consumer_relayd_sockets(int domain,
-               struct ltt_session *session, struct consumer_output *consumer, int fd)
+               struct ltt_session *session, struct consumer_output *consumer,
+               struct consumer_socket *sock)
 {
        int ret = LTTNG_OK;
 
@@ -603,7 +604,7 @@ static int send_consumer_relayd_sockets(int domain,
        /* Sending control relayd socket. */
        if (!consumer->dst.net.control_sock_sent) {
                ret = send_consumer_relayd_socket(domain, session,
-                               &consumer->dst.net.control, consumer, fd);
+                               &consumer->dst.net.control, consumer, sock);
                if (ret != LTTNG_OK) {
                        goto error;
                }
@@ -612,7 +613,7 @@ static int send_consumer_relayd_sockets(int domain,
        /* Sending data relayd socket. */
        if (!consumer->dst.net.data_sock_sent) {
                ret = send_consumer_relayd_socket(domain, session,
-                               &consumer->dst.net.data, consumer, fd);
+                               &consumer->dst.net.data, consumer, sock);
                if (ret != LTTNG_OK) {
                        goto error;
                }
@@ -652,7 +653,7 @@ static int setup_relayd(struct ltt_session *session)
 
                        pthread_mutex_lock(socket->lock);
                        ret = send_consumer_relayd_sockets(LTTNG_DOMAIN_UST, session,
-                                       usess->consumer, socket->fd);
+                                       usess->consumer, socket);
                        pthread_mutex_unlock(socket->lock);
                        if (ret != LTTNG_OK) {
                                goto error;
@@ -669,7 +670,7 @@ static int setup_relayd(struct ltt_session *session)
 
                        pthread_mutex_lock(socket->lock);
                        ret = send_consumer_relayd_sockets(LTTNG_DOMAIN_KERNEL, session,
-                                       ksess->consumer, socket->fd);
+                                       ksess->consumer, socket);
                        pthread_mutex_unlock(socket->lock);
                        if (ret != LTTNG_OK) {
                                goto error;
@@ -1657,7 +1658,7 @@ int cmd_set_consumer_uri(int domain, struct ltt_session *session,
 
                        pthread_mutex_lock(socket->lock);
                        ret = send_consumer_relayd_socket(domain, session, &uris[i],
-                                       consumer, socket->fd);
+                                       consumer, socket);
                        pthread_mutex_unlock(socket->lock);
                        if (ret != LTTNG_OK) {
                                rcu_read_unlock();
index bf477933515502510c284c58762b56d2b4399f75..92a6c5ddc72a0fca50f40fd2c1d03a0b7cddde12 100644 (file)
 
 #include "consumer.h"
 
+/*
+ * Receive a reply command status message from the consumer. Consumer socket
+ * lock MUST be acquired before calling this function.
+ *
+ * Return 0 on success, -1 on recv error or a negative lttng error code which
+ * was possibly returned by the consumer.
+ */
+int consumer_recv_status_reply(struct consumer_socket *sock)
+{
+       int ret;
+       struct lttcomm_consumer_status_msg reply;
+
+       assert(sock);
+
+       ret = lttcomm_recv_unix_sock(sock->fd, &reply, sizeof(reply));
+       if (ret < 0) {
+               PERROR("recv consumer status msg");
+               goto end;
+       }
+
+       if (reply.ret_code == LTTNG_OK) {
+               /* All good. */
+               ret = 0;
+       } else {
+               ret = -reply.ret_code;
+               ERR("Consumer ret code %d", reply.ret_code);
+       }
+
+end:
+       return ret;
+}
+
 /*
  * Send destroy relayd command to consumer.
  *
@@ -58,14 +90,18 @@ int consumer_send_destroy_relayd(struct consumer_socket *sock,
 
        pthread_mutex_lock(sock->lock);
        ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg));
-       pthread_mutex_unlock(sock->lock);
        if (ret < 0) {
                PERROR("send consumer destroy relayd command");
-               goto error;
+               goto error_send;
        }
 
+       /* Don't check the return value. The caller will do it. */
+       ret = consumer_recv_status_reply(sock);
+
        DBG2("Consumer send destroy relayd command done");
 
+error_send:
+       pthread_mutex_unlock(sock->lock);
 error:
        return ret;
 }
@@ -444,19 +480,22 @@ error:
 /*
  * Send file descriptor to consumer via sock.
  */
-int consumer_send_fds(int sock, int *fds, size_t nb_fd)
+int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd)
 {
        int ret;
 
        assert(fds);
+       assert(sock);
        assert(nb_fd > 0);
 
-       ret = lttcomm_send_fds_unix_sock(sock, fds, nb_fd);
+       ret = lttcomm_send_fds_unix_sock(sock->fd, fds, nb_fd);
        if (ret < 0) {
                PERROR("send consumer fds");
                goto error;
        }
 
+       ret = consumer_recv_status_reply(sock);
+
 error:
        return ret;
 }
@@ -464,20 +503,24 @@ error:
 /*
  * Consumer send channel communication message structure to consumer.
  */
-int consumer_send_channel(int sock, struct lttcomm_consumer_msg *msg)
+int consumer_send_channel(struct consumer_socket *sock,
+               struct lttcomm_consumer_msg *msg)
 {
        int ret;
 
        assert(msg);
-       assert(sock >= 0);
+       assert(sock);
+       assert(sock->fd >= 0);
 
-       ret = lttcomm_send_unix_sock(sock, msg,
+       ret = lttcomm_send_unix_sock(sock->fd, msg,
                        sizeof(struct lttcomm_consumer_msg));
        if (ret < 0) {
                PERROR("send consumer channel");
                goto error;
        }
 
+       ret = consumer_recv_status_reply(sock);
+
 error:
        return ret;
 }
@@ -553,13 +596,15 @@ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
 /*
  * Send stream communication structure to the consumer.
  */
-int consumer_send_stream(int sock, struct consumer_output *dst,
-       struct lttcomm_consumer_msg *msg, int *fds, size_t nb_fd)
+int consumer_send_stream(struct consumer_socket *sock,
+               struct consumer_output *dst, struct lttcomm_consumer_msg *msg,
+               int *fds, size_t nb_fd)
 {
        int ret;
 
        assert(msg);
        assert(dst);
+       assert(sock);
 
        switch (dst->type) {
        case CONSUMER_DST_NET:
@@ -585,13 +630,18 @@ int consumer_send_stream(int sock, struct consumer_output *dst,
        }
 
        /* Send on socket */
-       ret = lttcomm_send_unix_sock(sock, msg,
+       ret = lttcomm_send_unix_sock(sock->fd, msg,
                        sizeof(struct lttcomm_consumer_msg));
        if (ret < 0) {
                PERROR("send consumer stream");
                goto error;
        }
 
+       ret = consumer_recv_status_reply(sock);
+       if (ret < 0) {
+               goto error;
+       }
+
        ret = consumer_send_fds(sock, fds, nb_fd);
        if (ret < 0) {
                goto error;
@@ -606,7 +656,7 @@ error:
  *
  * On success return positive value. On error, negative value.
  */
-int consumer_send_relayd_socket(int consumer_sock,
+int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
                struct lttcomm_sock *sock, struct consumer_output *consumer,
                enum lttng_stream_type type)
 {
@@ -616,6 +666,7 @@ int consumer_send_relayd_socket(int consumer_sock,
        /* Code flow error. Safety net. */
        assert(sock);
        assert(consumer);
+       assert(consumer_sock);
 
        /* Bail out if consumer is disabled */
        if (!consumer->enabled) {
@@ -633,13 +684,18 @@ int consumer_send_relayd_socket(int consumer_sock,
        msg.u.relayd_sock.type = type;
        memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
 
-       DBG3("Sending relayd sock info to consumer on %d", consumer_sock);
-       ret = lttcomm_send_unix_sock(consumer_sock, &msg, sizeof(msg));
+       DBG3("Sending relayd sock info to consumer on %d", consumer_sock->fd);
+       ret = lttcomm_send_unix_sock(consumer_sock->fd, &msg, sizeof(msg));
        if (ret < 0) {
                PERROR("send consumer relayd socket info");
                goto error;
        }
 
+       ret = consumer_recv_status_reply(consumer_sock);
+       if (ret < 0) {
+               goto error;
+       }
+
        DBG3("Sending relayd socket file descriptor to consumer");
        ret = consumer_send_fds(consumer_sock, &sock->fd, 1);
        if (ret < 0) {
@@ -739,6 +795,11 @@ int consumer_is_data_pending(unsigned int id,
                        goto error;
                }
 
+               /*
+                * No need for a recv reply status because the answer to the command is
+                * the reply status message.
+                */
+
                ret = lttcomm_recv_unix_sock(socket->fd, &ret_code, sizeof(ret_code));
                if (ret < 0) {
                        PERROR("recv consumer data pending status");
index e45d5b0bf4a99b9311968529e31150bfc0188dad..01548cd17f088bfab82e6bd57082ed2ff87c1510 100644 (file)
@@ -163,15 +163,18 @@ struct consumer_output *consumer_copy_output(struct consumer_output *obj);
 void consumer_destroy_output(struct consumer_output *obj);
 int consumer_set_network_uri(struct consumer_output *obj,
                struct lttng_uri *uri);
-int consumer_send_fds(int sock, int *fds, size_t nb_fd);
-int consumer_send_stream(int sock, struct consumer_output *dst,
-               struct lttcomm_consumer_msg *msg, int *fds, size_t nb_fd);
-int consumer_send_channel(int sock, struct lttcomm_consumer_msg *msg);
-int consumer_send_relayd_socket(int consumer_sock,
+int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd);
+int consumer_send_stream(struct consumer_socket *sock,
+               struct consumer_output *dst, struct lttcomm_consumer_msg *msg,
+               int *fds, size_t nb_fd);
+int consumer_send_channel(struct consumer_socket *sock,
+               struct lttcomm_consumer_msg *msg);
+int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
                struct lttcomm_sock *sock, struct consumer_output *consumer,
                enum lttng_stream_type type);
 int consumer_send_destroy_relayd(struct consumer_socket *sock,
                struct consumer_output *consumer);
+int consumer_recv_status_reply(struct consumer_socket *sock);
 void consumer_output_send_destroy_relayd(struct consumer_output *consumer);
 int consumer_create_socket(struct consumer_data *data,
                struct consumer_output *output);
index c86d52803b80ac19c254eb3a1e267f3223d6c5d9..2a9bf9983d937223e5131ed5c0d374599c052000 100644 (file)
@@ -31,7 +31,8 @@
 /*
  * Sending a single channel to the consumer with command ADD_CHANNEL.
  */
-int kernel_consumer_add_channel(int sock, struct ltt_kernel_channel *channel)
+int kernel_consumer_add_channel(struct consumer_socket *sock,
+               struct ltt_kernel_channel *channel)
 {
        int ret;
        struct lttcomm_consumer_msg lkm;
@@ -63,7 +64,8 @@ error:
 /*
  * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
  */
-int kernel_consumer_add_metadata(int sock, struct ltt_kernel_session *session)
+int kernel_consumer_add_metadata(struct consumer_socket *sock,
+               struct ltt_kernel_session *session)
 {
        int ret;
        char tmp_path[PATH_MAX];
@@ -74,6 +76,7 @@ int kernel_consumer_add_metadata(int sock, struct ltt_kernel_session *session)
        /* Safety net */
        assert(session);
        assert(session->consumer);
+       assert(sock);
 
        DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd);
 
@@ -155,8 +158,9 @@ error:
 /*
  * Sending a single stream to the consumer with command ADD_STREAM.
  */
-int kernel_consumer_add_stream(int sock, struct ltt_kernel_channel *channel,
-               struct ltt_kernel_stream *stream, struct ltt_kernel_session *session)
+int kernel_consumer_add_stream(struct consumer_socket *sock,
+               struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream,
+               struct ltt_kernel_session *session)
 {
        int ret;
        char tmp_path[PATH_MAX];
@@ -168,6 +172,7 @@ int kernel_consumer_add_stream(int sock, struct ltt_kernel_channel *channel,
        assert(stream);
        assert(session);
        assert(session->consumer);
+       assert(sock);
 
        DBG("Sending stream %d of channel %s to kernel consumer",
                        stream->fd, channel->channel->name);
@@ -224,7 +229,7 @@ error:
 /*
  * Send all stream fds of kernel channel to the consumer.
  */
-int kernel_consumer_send_channel_stream(int sock,
+int kernel_consumer_send_channel_stream(struct consumer_socket *sock,
                struct ltt_kernel_channel *channel, struct ltt_kernel_session *session)
 {
        int ret;
@@ -234,6 +239,7 @@ int kernel_consumer_send_channel_stream(int sock,
        assert(channel);
        assert(session);
        assert(session->consumer);
+       assert(sock);
 
        /* Bail out if consumer is disabled */
        if (!session->consumer->enabled) {
@@ -269,7 +275,8 @@ error:
 /*
  * Send all stream fds of the kernel session to the consumer.
  */
-int kernel_consumer_send_session(int sock, struct ltt_kernel_session *session)
+int kernel_consumer_send_session(struct consumer_socket *sock,
+               struct ltt_kernel_session *session)
 {
        int ret;
        struct ltt_kernel_channel *chan;
@@ -277,6 +284,7 @@ int kernel_consumer_send_session(int sock, struct ltt_kernel_session *session)
        /* Safety net */
        assert(session);
        assert(session->consumer);
+       assert(sock);
 
        /* Bail out if consumer is disabled */
        if (!session->consumer->enabled) {
index 8aba019e6de5d1a93ad7c897234d5119adeed640..07d7436a14d8efe41c62b0479f1d440ffd4acff4 100644 (file)
 
 #include "trace-kernel.h"
 
-int kernel_consumer_send_channel_stream(int sock,
+int kernel_consumer_send_channel_stream(struct consumer_socket *sock,
                struct ltt_kernel_channel *channel, struct ltt_kernel_session *session);
 
-int kernel_consumer_send_session(int sock, struct ltt_kernel_session *session);
+int kernel_consumer_send_session(struct consumer_socket *sock,
+               struct ltt_kernel_session *session);
 
-int kernel_consumer_add_stream(int sock, struct ltt_kernel_channel *channel,
-               struct ltt_kernel_stream *stream, struct ltt_kernel_session *session);
+int kernel_consumer_add_stream(struct consumer_socket *sock,
+               struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream,
+               struct ltt_kernel_session *session);
 
-int kernel_consumer_add_metadata(int sock, struct ltt_kernel_session *session);
+int kernel_consumer_add_metadata(struct consumer_socket *sock,
+               struct ltt_kernel_session *session);
 
-int kernel_consumer_add_channel(int sock, struct ltt_kernel_channel *channel);
+int kernel_consumer_add_channel(struct consumer_socket *sock,
+               struct ltt_kernel_channel *channel);
index d5cfc0e2769e58a6b13b0870ef2dace8b0a59c85..8a1f753114bfe3bfaa8cd5629ea5c7e199c29aab 100644 (file)
@@ -633,7 +633,7 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd)
                                                assert(socket->fd >= 0);
 
                                                pthread_mutex_lock(socket->lock);
-                                               ret = kernel_consumer_send_channel_stream(socket->fd,
+                                               ret = kernel_consumer_send_channel_stream(socket,
                                                                channel, ksess);
                                                pthread_mutex_unlock(socket->lock);
                                                if (ret < 0) {
index 465dd07d74489cd26135126f903e1a30aa6070a2..c1af765c9ab9821a855666da2613e96d6af16aab 100644 (file)
 /*
  * Send a single channel to the consumer using command ADD_CHANNEL.
  */
-static int send_channel(int sock, struct ust_app_channel *uchan)
+static int send_channel(struct consumer_socket *sock,
+               struct ust_app_channel *uchan)
 {
        int ret, fd;
        struct lttcomm_consumer_msg msg;
 
        /* Safety net */
        assert(uchan);
+       assert(sock);
 
-       if (sock < 0) {
+       if (sock->fd < 0) {
                ret = -EINVAL;
                goto error;
        }
@@ -73,9 +75,10 @@ error:
 /*
  * Send a single stream to the consumer using ADD_STREAM command.
  */
-static int send_channel_stream(int sock, struct ust_app_channel *uchan,
-               struct ust_app_session *usess, struct ltt_ust_stream *stream,
-               struct consumer_output *consumer, const char *pathname)
+static int send_channel_stream(struct consumer_socket *sock,
+               struct ust_app_channel *uchan, struct ust_app_session *usess,
+               struct ltt_ust_stream *stream, struct consumer_output *consumer,
+               const char *pathname)
 {
        int ret, fds[2];
        struct lttcomm_consumer_msg msg;
@@ -85,6 +88,7 @@ static int send_channel_stream(int sock, struct ust_app_channel *uchan,
        assert(usess);
        assert(stream);
        assert(consumer);
+       assert(sock);
 
        DBG2("Sending stream %d of channel %s to kernel consumer",
                        stream->obj->shm_fd, uchan->name);
@@ -119,7 +123,7 @@ error:
 /*
  * Send all stream fds of UST channel to the consumer.
  */
-static int send_channel_streams(int sock,
+static int send_channel_streams(struct consumer_socket *sock,
                struct ust_app_channel *uchan, struct ust_app_session *usess,
                struct consumer_output *consumer)
 {
@@ -128,6 +132,8 @@ static int send_channel_streams(int sock,
        const char *pathname;
        struct ltt_ust_stream *stream, *tmp;
 
+       assert(sock);
+
        DBG("Sending streams of channel %s to UST consumer", uchan->name);
 
        ret = send_channel(sock, uchan);
@@ -180,8 +186,8 @@ error:
 /*
  * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
  */
-static int send_metadata(int sock, struct ust_app_session *usess,
-               struct consumer_output *consumer)
+static int send_metadata(struct consumer_socket *sock,
+               struct ust_app_session *usess, struct consumer_output *consumer)
 {
        int ret, fd, fds[2];
        char tmp_path[PATH_MAX];
@@ -191,9 +197,10 @@ static int send_metadata(int sock, struct ust_app_session *usess,
        /* Safety net */
        assert(usess);
        assert(consumer);
+       assert(sock);
 
-       if (sock < 0) {
-               ERR("Consumer socket is negative (%d)", sock);
+       if (sock->fd < 0) {
+               ERR("Consumer socket is negative (%d)", sock->fd);
                return -EINVAL;
        }
 
@@ -305,7 +312,7 @@ int ust_consumer_send_session(struct ust_app_session *usess,
        pthread_mutex_lock(sock->lock);
 
        /* Sending metadata information to the consumer */
-       ret = send_metadata(sock->fd, usess, consumer);
+       ret = send_metadata(sock, usess, consumer);
        if (ret < 0) {
                goto error;
        }
@@ -322,7 +329,7 @@ int ust_consumer_send_session(struct ust_app_session *usess,
                        continue;
                }
 
-               ret = send_channel_streams(sock->fd, ua_chan, usess, consumer);
+               ret = send_channel_streams(sock, ua_chan, usess, consumer);
                if (ret < 0) {
                        rcu_read_unlock();
                        goto error;
index 70cde9c1028d978d7deb1865ff84fe9455988a56..f57cfffff57262cbb04ef048fb761e75019eb805 100644 (file)
@@ -2679,10 +2679,18 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock)
 {
        int fd = -1, ret = -1;
+       enum lttng_error_code ret_code = LTTNG_OK;
        struct consumer_relayd_sock_pair *relayd;
 
        DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
 
+       /* First send a status message before receiving the fds. */
+       ret = consumer_send_status_msg(sock, ret_code);
+       if (ret < 0) {
+               /* Somehow, the session daemon is not responding anymore. */
+               goto error;
+       }
+
        /* Get relayd reference if exists. */
        relayd = consumer_find_relayd(net_seq_idx);
        if (relayd == NULL) {
@@ -2709,6 +2717,13 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                goto error;
        }
 
+       /* We have the fds without error. Send status back. */
+       ret = consumer_send_status_msg(sock, ret_code);
+       if (ret < 0) {
+               /* Somehow, the session daemon is not responding anymore. */
+               goto error;
+       }
+
        /* Copy socket information and received FD */
        switch (sock_type) {
        case LTTNG_STREAM_CONTROL:
@@ -2913,3 +2928,17 @@ data_not_pending:
        rcu_read_unlock();
        return 1;
 }
+
+/*
+ * Send a ret code status message to the sessiond daemon.
+ *
+ * Return the sendmsg() return value.
+ */
+int consumer_send_status_msg(int sock, int ret_code)
+{
+       struct lttcomm_consumer_status_msg msg;
+
+       msg.ret_code = ret_code;
+
+       return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
+}
index 87ba490d7500af8ea078cec6ab652495ab5c91d5..a476dd5bf84fa5617df92ba603557e35b374f0ee 100644 (file)
@@ -416,5 +416,6 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
 void consumer_flag_relayd_for_destroy(
                struct consumer_relayd_sock_pair *relayd);
 int consumer_data_pending(uint64_t id);
+int consumer_send_status_msg(int sock, int ret_code);
 
 #endif /* LIB_CONSUMER_H */
index 0b0592adb8a3bbe008141a4cb92196e020b752c8..40d0f71960fb6204019b508bd0ca1caffb4f838c 100644 (file)
@@ -88,6 +88,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int sock, struct pollfd *consumer_sockpoll)
 {
        ssize_t ret;
+       enum lttng_error_code ret_code = LTTNG_OK;
        struct lttcomm_consumer_msg msg;
 
        ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
@@ -96,6 +97,14 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                return ret;
        }
        if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
+               /*
+                * Notify the session daemon that the command is completed.
+                *
+                * On transport layer error, the function call will print an error
+                * message so handling the returned code is a bit useless since we
+                * return an error code anyway.
+                */
+               (void) consumer_send_status_msg(sock, ret_code);
                return -ENOENT;
        }
 
@@ -105,6 +114,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        switch (msg.cmd_type) {
        case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
        {
+               /* Session daemon status message are handled in the following call. */
                ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
                                msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
                                &msg.u.relayd_sock.sock);
@@ -114,6 +124,13 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        {
                struct lttng_consumer_channel *new_channel;
 
+               /* First send a status message before receiving the fds. */
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       /* Somehow, the session daemon is not responding anymore. */
+                       goto end_nosignal;
+               }
+
                DBG("consumer_add_channel %d", msg.u.channel.channel_key);
                new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
                                -1, -1,
@@ -143,6 +160,13 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *new_stream;
                int alloc_ret = 0;
 
+               /* First send a status message before receiving the fds. */
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       /* Somehow, the session daemon is not responding anymore. */
+                       goto end_nosignal;
+               }
+
                /* block */
                if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
                        rcu_read_unlock();
@@ -157,6 +181,17 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        return ret;
                }
 
+               /*
+                * Send status code to session daemon only if the recv works. If the
+                * above recv() failed, the session daemon is notified through the
+                * error socket and the teardown is eventually done.
+                */
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       /* Somehow, the session daemon is not responding anymore. */
+                       goto end_nosignal;
+               }
+
                new_stream = consumer_allocate_stream(msg.u.stream.channel_key,
                                msg.u.stream.stream_key,
                                fd, fd,
@@ -263,7 +298,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                relayd = consumer_find_relayd(index);
                if (relayd == NULL) {
                        ERR("Unable to find relayd %" PRIu64, index);
-                       goto end_nosignal;
+                       ret_code = LTTNG_ERR_NO_CONSUMER;
                }
 
                /*
@@ -276,7 +311,15 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                 *
                 * The destroy can happen either here or when a stream fd hangs up.
                 */
-               consumer_flag_relayd_for_destroy(relayd);
+               if (relayd) {
+                       consumer_flag_relayd_for_destroy(relayd);
+               }
+
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       /* Somehow, the session daemon is not responding anymore. */
+                       goto end_nosignal;
+               }
 
                goto end_nosignal;
        }
@@ -294,6 +337,11 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                if (ret < 0) {
                        PERROR("send data pending ret code");
                }
+
+               /*
+                * No need to send back a status message since the data pending
+                * returned value is the response.
+                */
                break;
        }
        default:
index 011a1290a99097c6cb36c52c84209e57e25ac265..4dd5731221e929f2f1e94984ce17826327797866 100644 (file)
@@ -291,6 +291,13 @@ struct lttcomm_consumer_msg {
        } u;
 };
 
+/*
+ * Status message returned to the sessiond after a received command.
+ */
+struct lttcomm_consumer_status_msg {
+       enum lttng_error_code ret_code;
+};
+
 #ifdef HAVE_LIBLTTNG_UST_CTL
 
 #include <lttng/ust-abi.h>
index 5a716859a386236fd480fbf1bb133b96a52012e7..c0999b613b496446c3c0de472af7f76cad6085d1 100644 (file)
@@ -101,6 +101,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int sock, struct pollfd *consumer_sockpoll)
 {
        ssize_t ret;
+       enum lttng_error_code ret_code = LTTNG_OK;
        struct lttcomm_consumer_msg msg;
 
        ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
@@ -111,6 +112,14 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                return ret;
        }
        if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
+               /*
+                * Notify the session daemon that the command is completed.
+                *
+                * On transport layer error, the function call will print an error
+                * message so handling the returned code is a bit useless since we
+                * return an error code anyway.
+                */
+               (void) consumer_send_status_msg(sock, ret_code);
                return -ENOENT;
        }
 
@@ -120,6 +129,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        switch (msg.cmd_type) {
        case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
        {
+               /* Session daemon status message are handled in the following call. */
                ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
                                msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
                                &msg.u.relayd_sock.sock);
@@ -133,6 +143,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                DBG("UST Consumer adding channel");
 
+               /* First send a status message before receiving the fds. */
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       /* Somehow, the session daemon is not responding anymore. */
+                       goto end_nosignal;
+               }
+
                /* block */
                if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
                        rcu_read_unlock();
@@ -145,6 +162,17 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        return ret;
                }
 
+               /*
+                * Send status code to session daemon only if the recv works. If the
+                * above recv() failed, the session daemon is notified through the
+                * error socket and the teardown is eventually done.
+                */
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       /* Somehow, the session daemon is not responding anymore. */
+                       goto end_nosignal;
+               }
+
                DBG("consumer_add_channel %d", msg.u.channel.channel_key);
 
                new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
@@ -178,6 +206,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                DBG("UST Consumer adding stream");
 
+               /* First send a status message before receiving the fds. */
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       /* Somehow, the session daemon is not responding anymore. */
+                       goto end_nosignal;
+               }
+
                /* block */
                if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
                        rcu_read_unlock();
@@ -190,6 +225,17 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        return ret;
                }
 
+               /*
+                * Send status code to session daemon only if the recv works. If the
+                * above recv() failed, the session daemon is notified through the
+                * error socket and the teardown is eventually done.
+                */
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       /* Somehow, the session daemon is not responding anymore. */
+                       goto end_nosignal;
+               }
+
                DBG("Consumer command ADD_STREAM chan %d stream %d",
                                msg.u.stream.channel_key, msg.u.stream.stream_key);
 
@@ -288,7 +334,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                relayd = consumer_find_relayd(index);
                if (relayd == NULL) {
                        ERR("Unable to find relayd %" PRIu64, index);
-                       goto end_nosignal;
+                       ret_code = LTTNG_ERR_NO_CONSUMER;
                }
 
                /*
@@ -301,7 +347,15 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                 *
                 * The destroy can happen either here or when a stream fd hangs up.
                 */
-               consumer_flag_relayd_for_destroy(relayd);
+               if (relayd) {
+                       consumer_flag_relayd_for_destroy(relayd);
+               }
+
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       /* Somehow, the session daemon is not responding anymore. */
+                       goto end_nosignal;
+               }
 
                goto end_nosignal;
        }
@@ -324,6 +378,11 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                if (ret < 0) {
                        PERROR("send data pending ret code");
                }
+
+               /*
+                * No need to send back a status message since the data pending
+                * returned value is the response.
+                */
                break;
        }
        default:
This page took 0.061865 seconds and 4 git commands to generate.