From f50f23d9f80ed9fae7fe5c49aee65e813e0031c8 Mon Sep 17 00:00:00 2001 From: David Goulet Date: Wed, 12 Dec 2012 11:23:20 -0500 Subject: [PATCH] Make the consumer sends a ACK after each command This is needed to avoid buffer bloating when throttling communication between the consumer and the relayd. Considering a very low bandwith limit between the relayd and consumerd, the session daemon would send a high debit of commands to the consumer without ever emptying the unix socket queue, which makes the UNIX socket reach buffer full conditions, which is prone to trigger corner-cases behaviors in blocking send/recv with MSG_WAITALL, which is likely the cause of hang experienced when limiting relayd bandwidth. Adding an ACK to each command makes sure that we acknowledge the session daemon that we, the consumer, have emptied the unix socket buffer. NOTE: In consumer_add_relayd_socket(), there might be a problem with the error path and message status to the sessiond. A subsequent patch might fix a possible issue but for now it is not at all critical since any critical error on the consumer side will notify the sessiond through the error socket. Acked-by: Mathieu Desnoyers Signed-off-by: David Goulet --- src/bin/lttng-sessiond/cmd.c | 19 +++-- src/bin/lttng-sessiond/consumer.c | 87 +++++++++++++++++--- src/bin/lttng-sessiond/consumer.h | 13 +-- src/bin/lttng-sessiond/kernel-consumer.c | 20 +++-- src/bin/lttng-sessiond/kernel-consumer.h | 16 ++-- src/bin/lttng-sessiond/main.c | 2 +- src/bin/lttng-sessiond/ust-consumer.c | 31 ++++--- src/common/consumer.c | 29 +++++++ src/common/consumer.h | 1 + src/common/kernel-consumer/kernel-consumer.c | 52 +++++++++++- src/common/sessiond-comm/sessiond-comm.h | 7 ++ src/common/ust-consumer/ust-consumer.c | 63 +++++++++++++- 12 files changed, 284 insertions(+), 56 deletions(-) diff --git a/src/bin/lttng-sessiond/cmd.c b/src/bin/lttng-sessiond/cmd.c index 2110e31e9..aaa5a8923 100644 --- a/src/bin/lttng-sessiond/cmd.c +++ b/src/bin/lttng-sessiond/cmd.c @@ -440,7 +440,7 @@ static int init_kernel_tracing(struct ltt_kernel_session *session) assert(socket->fd >= 0); pthread_mutex_lock(socket->lock); - ret = kernel_consumer_send_session(socket->fd, session); + ret = kernel_consumer_send_session(socket, session); pthread_mutex_unlock(socket->lock); if (ret < 0) { ret = LTTNG_ERR_KERN_CONSUMER_FAIL; @@ -528,7 +528,7 @@ error: */ static int send_consumer_relayd_socket(int domain, struct ltt_session *session, struct lttng_uri *relayd_uri, struct consumer_output *consumer, - int consumer_fd) + struct consumer_socket *consumer_sock) { int ret; struct lttcomm_sock *sock = NULL; @@ -557,7 +557,7 @@ static int send_consumer_relayd_socket(int domain, struct ltt_session *session, } /* Send relayd socket to consumer. */ - ret = consumer_send_relayd_socket(consumer_fd, sock, + ret = consumer_send_relayd_socket(consumer_sock, sock, consumer, relayd_uri->stype); if (ret < 0) { ret = LTTNG_ERR_ENABLE_CONSUMER_FAIL; @@ -593,7 +593,8 @@ close_sock: * session. */ static int send_consumer_relayd_sockets(int domain, - struct ltt_session *session, struct consumer_output *consumer, int fd) + struct ltt_session *session, struct consumer_output *consumer, + struct consumer_socket *sock) { int ret = LTTNG_OK; @@ -603,7 +604,7 @@ static int send_consumer_relayd_sockets(int domain, /* Sending control relayd socket. */ if (!consumer->dst.net.control_sock_sent) { ret = send_consumer_relayd_socket(domain, session, - &consumer->dst.net.control, consumer, fd); + &consumer->dst.net.control, consumer, sock); if (ret != LTTNG_OK) { goto error; } @@ -612,7 +613,7 @@ static int send_consumer_relayd_sockets(int domain, /* Sending data relayd socket. */ if (!consumer->dst.net.data_sock_sent) { ret = send_consumer_relayd_socket(domain, session, - &consumer->dst.net.data, consumer, fd); + &consumer->dst.net.data, consumer, sock); if (ret != LTTNG_OK) { goto error; } @@ -652,7 +653,7 @@ static int setup_relayd(struct ltt_session *session) pthread_mutex_lock(socket->lock); ret = send_consumer_relayd_sockets(LTTNG_DOMAIN_UST, session, - usess->consumer, socket->fd); + usess->consumer, socket); pthread_mutex_unlock(socket->lock); if (ret != LTTNG_OK) { goto error; @@ -669,7 +670,7 @@ static int setup_relayd(struct ltt_session *session) pthread_mutex_lock(socket->lock); ret = send_consumer_relayd_sockets(LTTNG_DOMAIN_KERNEL, session, - ksess->consumer, socket->fd); + ksess->consumer, socket); pthread_mutex_unlock(socket->lock); if (ret != LTTNG_OK) { goto error; @@ -1657,7 +1658,7 @@ int cmd_set_consumer_uri(int domain, struct ltt_session *session, pthread_mutex_lock(socket->lock); ret = send_consumer_relayd_socket(domain, session, &uris[i], - consumer, socket->fd); + consumer, socket); pthread_mutex_unlock(socket->lock); if (ret != LTTNG_OK) { rcu_read_unlock(); diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index bf4779335..92a6c5ddc 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -30,6 +30,38 @@ #include "consumer.h" +/* + * Receive a reply command status message from the consumer. Consumer socket + * lock MUST be acquired before calling this function. + * + * Return 0 on success, -1 on recv error or a negative lttng error code which + * was possibly returned by the consumer. + */ +int consumer_recv_status_reply(struct consumer_socket *sock) +{ + int ret; + struct lttcomm_consumer_status_msg reply; + + assert(sock); + + ret = lttcomm_recv_unix_sock(sock->fd, &reply, sizeof(reply)); + if (ret < 0) { + PERROR("recv consumer status msg"); + goto end; + } + + if (reply.ret_code == LTTNG_OK) { + /* All good. */ + ret = 0; + } else { + ret = -reply.ret_code; + ERR("Consumer ret code %d", reply.ret_code); + } + +end: + return ret; +} + /* * Send destroy relayd command to consumer. * @@ -58,14 +90,18 @@ int consumer_send_destroy_relayd(struct consumer_socket *sock, pthread_mutex_lock(sock->lock); ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg)); - pthread_mutex_unlock(sock->lock); if (ret < 0) { PERROR("send consumer destroy relayd command"); - goto error; + goto error_send; } + /* Don't check the return value. The caller will do it. */ + ret = consumer_recv_status_reply(sock); + DBG2("Consumer send destroy relayd command done"); +error_send: + pthread_mutex_unlock(sock->lock); error: return ret; } @@ -444,19 +480,22 @@ error: /* * Send file descriptor to consumer via sock. */ -int consumer_send_fds(int sock, int *fds, size_t nb_fd) +int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd) { int ret; assert(fds); + assert(sock); assert(nb_fd > 0); - ret = lttcomm_send_fds_unix_sock(sock, fds, nb_fd); + ret = lttcomm_send_fds_unix_sock(sock->fd, fds, nb_fd); if (ret < 0) { PERROR("send consumer fds"); goto error; } + ret = consumer_recv_status_reply(sock); + error: return ret; } @@ -464,20 +503,24 @@ error: /* * Consumer send channel communication message structure to consumer. */ -int consumer_send_channel(int sock, struct lttcomm_consumer_msg *msg) +int consumer_send_channel(struct consumer_socket *sock, + struct lttcomm_consumer_msg *msg) { int ret; assert(msg); - assert(sock >= 0); + assert(sock); + assert(sock->fd >= 0); - ret = lttcomm_send_unix_sock(sock, msg, + ret = lttcomm_send_unix_sock(sock->fd, msg, sizeof(struct lttcomm_consumer_msg)); if (ret < 0) { PERROR("send consumer channel"); goto error; } + ret = consumer_recv_status_reply(sock); + error: return ret; } @@ -553,13 +596,15 @@ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg, /* * Send stream communication structure to the consumer. */ -int consumer_send_stream(int sock, struct consumer_output *dst, - struct lttcomm_consumer_msg *msg, int *fds, size_t nb_fd) +int consumer_send_stream(struct consumer_socket *sock, + struct consumer_output *dst, struct lttcomm_consumer_msg *msg, + int *fds, size_t nb_fd) { int ret; assert(msg); assert(dst); + assert(sock); switch (dst->type) { case CONSUMER_DST_NET: @@ -585,13 +630,18 @@ int consumer_send_stream(int sock, struct consumer_output *dst, } /* Send on socket */ - ret = lttcomm_send_unix_sock(sock, msg, + ret = lttcomm_send_unix_sock(sock->fd, msg, sizeof(struct lttcomm_consumer_msg)); if (ret < 0) { PERROR("send consumer stream"); goto error; } + ret = consumer_recv_status_reply(sock); + if (ret < 0) { + goto error; + } + ret = consumer_send_fds(sock, fds, nb_fd); if (ret < 0) { goto error; @@ -606,7 +656,7 @@ error: * * On success return positive value. On error, negative value. */ -int consumer_send_relayd_socket(int consumer_sock, +int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, struct lttcomm_sock *sock, struct consumer_output *consumer, enum lttng_stream_type type) { @@ -616,6 +666,7 @@ int consumer_send_relayd_socket(int consumer_sock, /* Code flow error. Safety net. */ assert(sock); assert(consumer); + assert(consumer_sock); /* Bail out if consumer is disabled */ if (!consumer->enabled) { @@ -633,13 +684,18 @@ int consumer_send_relayd_socket(int consumer_sock, msg.u.relayd_sock.type = type; memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock)); - DBG3("Sending relayd sock info to consumer on %d", consumer_sock); - ret = lttcomm_send_unix_sock(consumer_sock, &msg, sizeof(msg)); + DBG3("Sending relayd sock info to consumer on %d", consumer_sock->fd); + ret = lttcomm_send_unix_sock(consumer_sock->fd, &msg, sizeof(msg)); if (ret < 0) { PERROR("send consumer relayd socket info"); goto error; } + ret = consumer_recv_status_reply(consumer_sock); + if (ret < 0) { + goto error; + } + DBG3("Sending relayd socket file descriptor to consumer"); ret = consumer_send_fds(consumer_sock, &sock->fd, 1); if (ret < 0) { @@ -739,6 +795,11 @@ int consumer_is_data_pending(unsigned int id, goto error; } + /* + * No need for a recv reply status because the answer to the command is + * the reply status message. + */ + ret = lttcomm_recv_unix_sock(socket->fd, &ret_code, sizeof(ret_code)); if (ret < 0) { PERROR("recv consumer data pending status"); diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h index e45d5b0bf..01548cd17 100644 --- a/src/bin/lttng-sessiond/consumer.h +++ b/src/bin/lttng-sessiond/consumer.h @@ -163,15 +163,18 @@ struct consumer_output *consumer_copy_output(struct consumer_output *obj); void consumer_destroy_output(struct consumer_output *obj); int consumer_set_network_uri(struct consumer_output *obj, struct lttng_uri *uri); -int consumer_send_fds(int sock, int *fds, size_t nb_fd); -int consumer_send_stream(int sock, struct consumer_output *dst, - struct lttcomm_consumer_msg *msg, int *fds, size_t nb_fd); -int consumer_send_channel(int sock, struct lttcomm_consumer_msg *msg); -int consumer_send_relayd_socket(int consumer_sock, +int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd); +int consumer_send_stream(struct consumer_socket *sock, + struct consumer_output *dst, struct lttcomm_consumer_msg *msg, + int *fds, size_t nb_fd); +int consumer_send_channel(struct consumer_socket *sock, + struct lttcomm_consumer_msg *msg); +int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, struct lttcomm_sock *sock, struct consumer_output *consumer, enum lttng_stream_type type); int consumer_send_destroy_relayd(struct consumer_socket *sock, struct consumer_output *consumer); +int consumer_recv_status_reply(struct consumer_socket *sock); void consumer_output_send_destroy_relayd(struct consumer_output *consumer); int consumer_create_socket(struct consumer_data *data, struct consumer_output *output); diff --git a/src/bin/lttng-sessiond/kernel-consumer.c b/src/bin/lttng-sessiond/kernel-consumer.c index c86d52803..2a9bf9983 100644 --- a/src/bin/lttng-sessiond/kernel-consumer.c +++ b/src/bin/lttng-sessiond/kernel-consumer.c @@ -31,7 +31,8 @@ /* * Sending a single channel to the consumer with command ADD_CHANNEL. */ -int kernel_consumer_add_channel(int sock, struct ltt_kernel_channel *channel) +int kernel_consumer_add_channel(struct consumer_socket *sock, + struct ltt_kernel_channel *channel) { int ret; struct lttcomm_consumer_msg lkm; @@ -63,7 +64,8 @@ error: /* * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM. */ -int kernel_consumer_add_metadata(int sock, struct ltt_kernel_session *session) +int kernel_consumer_add_metadata(struct consumer_socket *sock, + struct ltt_kernel_session *session) { int ret; char tmp_path[PATH_MAX]; @@ -74,6 +76,7 @@ int kernel_consumer_add_metadata(int sock, struct ltt_kernel_session *session) /* Safety net */ assert(session); assert(session->consumer); + assert(sock); DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd); @@ -155,8 +158,9 @@ error: /* * Sending a single stream to the consumer with command ADD_STREAM. */ -int kernel_consumer_add_stream(int sock, struct ltt_kernel_channel *channel, - struct ltt_kernel_stream *stream, struct ltt_kernel_session *session) +int kernel_consumer_add_stream(struct consumer_socket *sock, + struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream, + struct ltt_kernel_session *session) { int ret; char tmp_path[PATH_MAX]; @@ -168,6 +172,7 @@ int kernel_consumer_add_stream(int sock, struct ltt_kernel_channel *channel, assert(stream); assert(session); assert(session->consumer); + assert(sock); DBG("Sending stream %d of channel %s to kernel consumer", stream->fd, channel->channel->name); @@ -224,7 +229,7 @@ error: /* * Send all stream fds of kernel channel to the consumer. */ -int kernel_consumer_send_channel_stream(int sock, +int kernel_consumer_send_channel_stream(struct consumer_socket *sock, struct ltt_kernel_channel *channel, struct ltt_kernel_session *session) { int ret; @@ -234,6 +239,7 @@ int kernel_consumer_send_channel_stream(int sock, assert(channel); assert(session); assert(session->consumer); + assert(sock); /* Bail out if consumer is disabled */ if (!session->consumer->enabled) { @@ -269,7 +275,8 @@ error: /* * Send all stream fds of the kernel session to the consumer. */ -int kernel_consumer_send_session(int sock, struct ltt_kernel_session *session) +int kernel_consumer_send_session(struct consumer_socket *sock, + struct ltt_kernel_session *session) { int ret; struct ltt_kernel_channel *chan; @@ -277,6 +284,7 @@ int kernel_consumer_send_session(int sock, struct ltt_kernel_session *session) /* Safety net */ assert(session); assert(session->consumer); + assert(sock); /* Bail out if consumer is disabled */ if (!session->consumer->enabled) { diff --git a/src/bin/lttng-sessiond/kernel-consumer.h b/src/bin/lttng-sessiond/kernel-consumer.h index 8aba019e6..07d7436a1 100644 --- a/src/bin/lttng-sessiond/kernel-consumer.h +++ b/src/bin/lttng-sessiond/kernel-consumer.h @@ -21,14 +21,18 @@ #include "trace-kernel.h" -int kernel_consumer_send_channel_stream(int sock, +int kernel_consumer_send_channel_stream(struct consumer_socket *sock, struct ltt_kernel_channel *channel, struct ltt_kernel_session *session); -int kernel_consumer_send_session(int sock, struct ltt_kernel_session *session); +int kernel_consumer_send_session(struct consumer_socket *sock, + struct ltt_kernel_session *session); -int kernel_consumer_add_stream(int sock, struct ltt_kernel_channel *channel, - struct ltt_kernel_stream *stream, struct ltt_kernel_session *session); +int kernel_consumer_add_stream(struct consumer_socket *sock, + struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream, + struct ltt_kernel_session *session); -int kernel_consumer_add_metadata(int sock, struct ltt_kernel_session *session); +int kernel_consumer_add_metadata(struct consumer_socket *sock, + struct ltt_kernel_session *session); -int kernel_consumer_add_channel(int sock, struct ltt_kernel_channel *channel); +int kernel_consumer_add_channel(struct consumer_socket *sock, + struct ltt_kernel_channel *channel); diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index d5cfc0e27..8a1f75311 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -633,7 +633,7 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd) assert(socket->fd >= 0); pthread_mutex_lock(socket->lock); - ret = kernel_consumer_send_channel_stream(socket->fd, + ret = kernel_consumer_send_channel_stream(socket, channel, ksess); pthread_mutex_unlock(socket->lock); if (ret < 0) { diff --git a/src/bin/lttng-sessiond/ust-consumer.c b/src/bin/lttng-sessiond/ust-consumer.c index 465dd07d7..c1af765c9 100644 --- a/src/bin/lttng-sessiond/ust-consumer.c +++ b/src/bin/lttng-sessiond/ust-consumer.c @@ -32,15 +32,17 @@ /* * Send a single channel to the consumer using command ADD_CHANNEL. */ -static int send_channel(int sock, struct ust_app_channel *uchan) +static int send_channel(struct consumer_socket *sock, + struct ust_app_channel *uchan) { int ret, fd; struct lttcomm_consumer_msg msg; /* Safety net */ assert(uchan); + assert(sock); - if (sock < 0) { + if (sock->fd < 0) { ret = -EINVAL; goto error; } @@ -73,9 +75,10 @@ error: /* * Send a single stream to the consumer using ADD_STREAM command. */ -static int send_channel_stream(int sock, struct ust_app_channel *uchan, - struct ust_app_session *usess, struct ltt_ust_stream *stream, - struct consumer_output *consumer, const char *pathname) +static int send_channel_stream(struct consumer_socket *sock, + struct ust_app_channel *uchan, struct ust_app_session *usess, + struct ltt_ust_stream *stream, struct consumer_output *consumer, + const char *pathname) { int ret, fds[2]; struct lttcomm_consumer_msg msg; @@ -85,6 +88,7 @@ static int send_channel_stream(int sock, struct ust_app_channel *uchan, assert(usess); assert(stream); assert(consumer); + assert(sock); DBG2("Sending stream %d of channel %s to kernel consumer", stream->obj->shm_fd, uchan->name); @@ -119,7 +123,7 @@ error: /* * Send all stream fds of UST channel to the consumer. */ -static int send_channel_streams(int sock, +static int send_channel_streams(struct consumer_socket *sock, struct ust_app_channel *uchan, struct ust_app_session *usess, struct consumer_output *consumer) { @@ -128,6 +132,8 @@ static int send_channel_streams(int sock, const char *pathname; struct ltt_ust_stream *stream, *tmp; + assert(sock); + DBG("Sending streams of channel %s to UST consumer", uchan->name); ret = send_channel(sock, uchan); @@ -180,8 +186,8 @@ error: /* * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM. */ -static int send_metadata(int sock, struct ust_app_session *usess, - struct consumer_output *consumer) +static int send_metadata(struct consumer_socket *sock, + struct ust_app_session *usess, struct consumer_output *consumer) { int ret, fd, fds[2]; char tmp_path[PATH_MAX]; @@ -191,9 +197,10 @@ static int send_metadata(int sock, struct ust_app_session *usess, /* Safety net */ assert(usess); assert(consumer); + assert(sock); - if (sock < 0) { - ERR("Consumer socket is negative (%d)", sock); + if (sock->fd < 0) { + ERR("Consumer socket is negative (%d)", sock->fd); return -EINVAL; } @@ -305,7 +312,7 @@ int ust_consumer_send_session(struct ust_app_session *usess, pthread_mutex_lock(sock->lock); /* Sending metadata information to the consumer */ - ret = send_metadata(sock->fd, usess, consumer); + ret = send_metadata(sock, usess, consumer); if (ret < 0) { goto error; } @@ -322,7 +329,7 @@ int ust_consumer_send_session(struct ust_app_session *usess, continue; } - ret = send_channel_streams(sock->fd, ua_chan, usess, consumer); + ret = send_channel_streams(sock, ua_chan, usess, consumer); if (ret < 0) { rcu_read_unlock(); goto error; diff --git a/src/common/consumer.c b/src/common/consumer.c index 70cde9c10..f57cfffff 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -2679,10 +2679,18 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock) { int fd = -1, ret = -1; + enum lttng_error_code ret_code = LTTNG_OK; struct consumer_relayd_sock_pair *relayd; DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx); + /* First send a status message before receiving the fds. */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto error; + } + /* Get relayd reference if exists. */ relayd = consumer_find_relayd(net_seq_idx); if (relayd == NULL) { @@ -2709,6 +2717,13 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, goto error; } + /* We have the fds without error. Send status back. */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto error; + } + /* Copy socket information and received FD */ switch (sock_type) { case LTTNG_STREAM_CONTROL: @@ -2913,3 +2928,17 @@ data_not_pending: rcu_read_unlock(); return 1; } + +/* + * Send a ret code status message to the sessiond daemon. + * + * Return the sendmsg() return value. + */ +int consumer_send_status_msg(int sock, int ret_code) +{ + struct lttcomm_consumer_status_msg msg; + + msg.ret_code = ret_code; + + return lttcomm_send_unix_sock(sock, &msg, sizeof(msg)); +} diff --git a/src/common/consumer.h b/src/common/consumer.h index 87ba490d7..a476dd5bf 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -416,5 +416,6 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, void consumer_flag_relayd_for_destroy( struct consumer_relayd_sock_pair *relayd); int consumer_data_pending(uint64_t id); +int consumer_send_status_msg(int sock, int ret_code); #endif /* LIB_CONSUMER_H */ diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 0b0592adb..40d0f7196 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -88,6 +88,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll) { ssize_t ret; + enum lttng_error_code ret_code = LTTNG_OK; struct lttcomm_consumer_msg msg; ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); @@ -96,6 +97,14 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, return ret; } if (msg.cmd_type == LTTNG_CONSUMER_STOP) { + /* + * Notify the session daemon that the command is completed. + * + * On transport layer error, the function call will print an error + * message so handling the returned code is a bit useless since we + * return an error code anyway. + */ + (void) consumer_send_status_msg(sock, ret_code); return -ENOENT; } @@ -105,6 +114,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, switch (msg.cmd_type) { case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: { + /* Session daemon status message are handled in the following call. */ ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index, msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll, &msg.u.relayd_sock.sock); @@ -114,6 +124,13 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, { struct lttng_consumer_channel *new_channel; + /* First send a status message before receiving the fds. */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + DBG("consumer_add_channel %d", msg.u.channel.channel_key); new_channel = consumer_allocate_channel(msg.u.channel.channel_key, -1, -1, @@ -143,6 +160,13 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *new_stream; int alloc_ret = 0; + /* First send a status message before receiving the fds. */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { rcu_read_unlock(); @@ -157,6 +181,17 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, return ret; } + /* + * Send status code to session daemon only if the recv works. If the + * above recv() failed, the session daemon is notified through the + * error socket and the teardown is eventually done. + */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + new_stream = consumer_allocate_stream(msg.u.stream.channel_key, msg.u.stream.stream_key, fd, fd, @@ -263,7 +298,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, relayd = consumer_find_relayd(index); if (relayd == NULL) { ERR("Unable to find relayd %" PRIu64, index); - goto end_nosignal; + ret_code = LTTNG_ERR_NO_CONSUMER; } /* @@ -276,7 +311,15 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, * * The destroy can happen either here or when a stream fd hangs up. */ - consumer_flag_relayd_for_destroy(relayd); + if (relayd) { + consumer_flag_relayd_for_destroy(relayd); + } + + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } goto end_nosignal; } @@ -294,6 +337,11 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (ret < 0) { PERROR("send data pending ret code"); } + + /* + * No need to send back a status message since the data pending + * returned value is the response. + */ break; } default: diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h index 011a1290a..4dd573122 100644 --- a/src/common/sessiond-comm/sessiond-comm.h +++ b/src/common/sessiond-comm/sessiond-comm.h @@ -291,6 +291,13 @@ struct lttcomm_consumer_msg { } u; }; +/* + * Status message returned to the sessiond after a received command. + */ +struct lttcomm_consumer_status_msg { + enum lttng_error_code ret_code; +}; + #ifdef HAVE_LIBLTTNG_UST_CTL #include diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 5a716859a..c0999b613 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -101,6 +101,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll) { ssize_t ret; + enum lttng_error_code ret_code = LTTNG_OK; struct lttcomm_consumer_msg msg; ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); @@ -111,6 +112,14 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, return ret; } if (msg.cmd_type == LTTNG_CONSUMER_STOP) { + /* + * Notify the session daemon that the command is completed. + * + * On transport layer error, the function call will print an error + * message so handling the returned code is a bit useless since we + * return an error code anyway. + */ + (void) consumer_send_status_msg(sock, ret_code); return -ENOENT; } @@ -120,6 +129,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, switch (msg.cmd_type) { case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: { + /* Session daemon status message are handled in the following call. */ ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index, msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll, &msg.u.relayd_sock.sock); @@ -133,6 +143,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, DBG("UST Consumer adding channel"); + /* First send a status message before receiving the fds. */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { rcu_read_unlock(); @@ -145,6 +162,17 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, return ret; } + /* + * Send status code to session daemon only if the recv works. If the + * above recv() failed, the session daemon is notified through the + * error socket and the teardown is eventually done. + */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + DBG("consumer_add_channel %d", msg.u.channel.channel_key); new_channel = consumer_allocate_channel(msg.u.channel.channel_key, @@ -178,6 +206,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, DBG("UST Consumer adding stream"); + /* First send a status message before receiving the fds. */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { rcu_read_unlock(); @@ -190,6 +225,17 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, return ret; } + /* + * Send status code to session daemon only if the recv works. If the + * above recv() failed, the session daemon is notified through the + * error socket and the teardown is eventually done. + */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + DBG("Consumer command ADD_STREAM chan %d stream %d", msg.u.stream.channel_key, msg.u.stream.stream_key); @@ -288,7 +334,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, relayd = consumer_find_relayd(index); if (relayd == NULL) { ERR("Unable to find relayd %" PRIu64, index); - goto end_nosignal; + ret_code = LTTNG_ERR_NO_CONSUMER; } /* @@ -301,7 +347,15 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, * * The destroy can happen either here or when a stream fd hangs up. */ - consumer_flag_relayd_for_destroy(relayd); + if (relayd) { + consumer_flag_relayd_for_destroy(relayd); + } + + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } goto end_nosignal; } @@ -324,6 +378,11 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (ret < 0) { PERROR("send data pending ret code"); } + + /* + * No need to send back a status message since the data pending + * returned value is the response. + */ break; } default: -- 2.34.1