This is needed to avoid buffer bloating when throttling communication
between the consumer and the relayd. Considering a very low bandwith
limit between the relayd and consumerd, the session daemon would send a
high debit of commands to the consumer without ever
emptying the unix socket queue, which makes the UNIX socket reach buffer
full conditions, which is prone to trigger corner-cases behaviors in
blocking send/recv with MSG_WAITALL, which is likely the cause of hang
experienced when limiting relayd bandwidth.
Adding an ACK to each command makes sure that we acknowledge the session
daemon that we, the consumer, have emptied the unix socket buffer.
NOTE: In consumer_add_relayd_socket(), there might be a problem with the
error path and message status to the sessiond. A subsequent patch might
fix a possible issue but for now it is not at all critical since any
critical error on the consumer side will notify the sessiond through the
error socket.
Acked-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: David Goulet <dgoulet@efficios.com>
12 files changed:
assert(socket->fd >= 0);
pthread_mutex_lock(socket->lock);
assert(socket->fd >= 0);
pthread_mutex_lock(socket->lock);
- ret = kernel_consumer_send_session(socket->fd, session);
+ ret = kernel_consumer_send_session(socket, session);
pthread_mutex_unlock(socket->lock);
if (ret < 0) {
ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
pthread_mutex_unlock(socket->lock);
if (ret < 0) {
ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
*/
static int send_consumer_relayd_socket(int domain, struct ltt_session *session,
struct lttng_uri *relayd_uri, struct consumer_output *consumer,
*/
static int send_consumer_relayd_socket(int domain, struct ltt_session *session,
struct lttng_uri *relayd_uri, struct consumer_output *consumer,
+ struct consumer_socket *consumer_sock)
{
int ret;
struct lttcomm_sock *sock = NULL;
{
int ret;
struct lttcomm_sock *sock = NULL;
}
/* Send relayd socket to consumer. */
}
/* Send relayd socket to consumer. */
- ret = consumer_send_relayd_socket(consumer_fd, sock,
+ ret = consumer_send_relayd_socket(consumer_sock, sock,
consumer, relayd_uri->stype);
if (ret < 0) {
ret = LTTNG_ERR_ENABLE_CONSUMER_FAIL;
consumer, relayd_uri->stype);
if (ret < 0) {
ret = LTTNG_ERR_ENABLE_CONSUMER_FAIL;
* session.
*/
static int send_consumer_relayd_sockets(int domain,
* session.
*/
static int send_consumer_relayd_sockets(int domain,
- struct ltt_session *session, struct consumer_output *consumer, int fd)
+ struct ltt_session *session, struct consumer_output *consumer,
+ struct consumer_socket *sock)
/* Sending control relayd socket. */
if (!consumer->dst.net.control_sock_sent) {
ret = send_consumer_relayd_socket(domain, session,
/* Sending control relayd socket. */
if (!consumer->dst.net.control_sock_sent) {
ret = send_consumer_relayd_socket(domain, session,
- &consumer->dst.net.control, consumer, fd);
+ &consumer->dst.net.control, consumer, sock);
if (ret != LTTNG_OK) {
goto error;
}
if (ret != LTTNG_OK) {
goto error;
}
/* Sending data relayd socket. */
if (!consumer->dst.net.data_sock_sent) {
ret = send_consumer_relayd_socket(domain, session,
/* Sending data relayd socket. */
if (!consumer->dst.net.data_sock_sent) {
ret = send_consumer_relayd_socket(domain, session,
- &consumer->dst.net.data, consumer, fd);
+ &consumer->dst.net.data, consumer, sock);
if (ret != LTTNG_OK) {
goto error;
}
if (ret != LTTNG_OK) {
goto error;
}
pthread_mutex_lock(socket->lock);
ret = send_consumer_relayd_sockets(LTTNG_DOMAIN_UST, session,
pthread_mutex_lock(socket->lock);
ret = send_consumer_relayd_sockets(LTTNG_DOMAIN_UST, session,
- usess->consumer, socket->fd);
+ usess->consumer, socket);
pthread_mutex_unlock(socket->lock);
if (ret != LTTNG_OK) {
goto error;
pthread_mutex_unlock(socket->lock);
if (ret != LTTNG_OK) {
goto error;
pthread_mutex_lock(socket->lock);
ret = send_consumer_relayd_sockets(LTTNG_DOMAIN_KERNEL, session,
pthread_mutex_lock(socket->lock);
ret = send_consumer_relayd_sockets(LTTNG_DOMAIN_KERNEL, session,
- ksess->consumer, socket->fd);
+ ksess->consumer, socket);
pthread_mutex_unlock(socket->lock);
if (ret != LTTNG_OK) {
goto error;
pthread_mutex_unlock(socket->lock);
if (ret != LTTNG_OK) {
goto error;
pthread_mutex_lock(socket->lock);
ret = send_consumer_relayd_socket(domain, session, &uris[i],
pthread_mutex_lock(socket->lock);
ret = send_consumer_relayd_socket(domain, session, &uris[i],
pthread_mutex_unlock(socket->lock);
if (ret != LTTNG_OK) {
rcu_read_unlock();
pthread_mutex_unlock(socket->lock);
if (ret != LTTNG_OK) {
rcu_read_unlock();
+/*
+ * Receive a reply command status message from the consumer. Consumer socket
+ * lock MUST be acquired before calling this function.
+ *
+ * Return 0 on success, -1 on recv error or a negative lttng error code which
+ * was possibly returned by the consumer.
+ */
+int consumer_recv_status_reply(struct consumer_socket *sock)
+{
+ int ret;
+ struct lttcomm_consumer_status_msg reply;
+
+ assert(sock);
+
+ ret = lttcomm_recv_unix_sock(sock->fd, &reply, sizeof(reply));
+ if (ret < 0) {
+ PERROR("recv consumer status msg");
+ goto end;
+ }
+
+ if (reply.ret_code == LTTNG_OK) {
+ /* All good. */
+ ret = 0;
+ } else {
+ ret = -reply.ret_code;
+ ERR("Consumer ret code %d", reply.ret_code);
+ }
+
+end:
+ return ret;
+}
+
/*
* Send destroy relayd command to consumer.
*
/*
* Send destroy relayd command to consumer.
*
pthread_mutex_lock(sock->lock);
ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg));
pthread_mutex_lock(sock->lock);
ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg));
- pthread_mutex_unlock(sock->lock);
if (ret < 0) {
PERROR("send consumer destroy relayd command");
if (ret < 0) {
PERROR("send consumer destroy relayd command");
+ /* Don't check the return value. The caller will do it. */
+ ret = consumer_recv_status_reply(sock);
+
DBG2("Consumer send destroy relayd command done");
DBG2("Consumer send destroy relayd command done");
+error_send:
+ pthread_mutex_unlock(sock->lock);
/*
* Send file descriptor to consumer via sock.
*/
/*
* Send file descriptor to consumer via sock.
*/
-int consumer_send_fds(int sock, int *fds, size_t nb_fd)
+int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd)
- ret = lttcomm_send_fds_unix_sock(sock, fds, nb_fd);
+ ret = lttcomm_send_fds_unix_sock(sock->fd, fds, nb_fd);
if (ret < 0) {
PERROR("send consumer fds");
goto error;
}
if (ret < 0) {
PERROR("send consumer fds");
goto error;
}
+ ret = consumer_recv_status_reply(sock);
+
/*
* Consumer send channel communication message structure to consumer.
*/
/*
* Consumer send channel communication message structure to consumer.
*/
-int consumer_send_channel(int sock, struct lttcomm_consumer_msg *msg)
+int consumer_send_channel(struct consumer_socket *sock,
+ struct lttcomm_consumer_msg *msg)
+ assert(sock);
+ assert(sock->fd >= 0);
- ret = lttcomm_send_unix_sock(sock, msg,
+ ret = lttcomm_send_unix_sock(sock->fd, msg,
sizeof(struct lttcomm_consumer_msg));
if (ret < 0) {
PERROR("send consumer channel");
goto error;
}
sizeof(struct lttcomm_consumer_msg));
if (ret < 0) {
PERROR("send consumer channel");
goto error;
}
+ ret = consumer_recv_status_reply(sock);
+
/*
* Send stream communication structure to the consumer.
*/
/*
* Send stream communication structure to the consumer.
*/
-int consumer_send_stream(int sock, struct consumer_output *dst,
- struct lttcomm_consumer_msg *msg, int *fds, size_t nb_fd)
+int consumer_send_stream(struct consumer_socket *sock,
+ struct consumer_output *dst, struct lttcomm_consumer_msg *msg,
+ int *fds, size_t nb_fd)
{
int ret;
assert(msg);
assert(dst);
{
int ret;
assert(msg);
assert(dst);
switch (dst->type) {
case CONSUMER_DST_NET:
switch (dst->type) {
case CONSUMER_DST_NET:
- ret = lttcomm_send_unix_sock(sock, msg,
+ ret = lttcomm_send_unix_sock(sock->fd, msg,
sizeof(struct lttcomm_consumer_msg));
if (ret < 0) {
PERROR("send consumer stream");
goto error;
}
sizeof(struct lttcomm_consumer_msg));
if (ret < 0) {
PERROR("send consumer stream");
goto error;
}
+ ret = consumer_recv_status_reply(sock);
+ if (ret < 0) {
+ goto error;
+ }
+
ret = consumer_send_fds(sock, fds, nb_fd);
if (ret < 0) {
goto error;
ret = consumer_send_fds(sock, fds, nb_fd);
if (ret < 0) {
goto error;
*
* On success return positive value. On error, negative value.
*/
*
* On success return positive value. On error, negative value.
*/
-int consumer_send_relayd_socket(int consumer_sock,
+int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
struct lttcomm_sock *sock, struct consumer_output *consumer,
enum lttng_stream_type type)
{
struct lttcomm_sock *sock, struct consumer_output *consumer,
enum lttng_stream_type type)
{
/* Code flow error. Safety net. */
assert(sock);
assert(consumer);
/* Code flow error. Safety net. */
assert(sock);
assert(consumer);
/* Bail out if consumer is disabled */
if (!consumer->enabled) {
/* Bail out if consumer is disabled */
if (!consumer->enabled) {
msg.u.relayd_sock.type = type;
memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
msg.u.relayd_sock.type = type;
memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
- DBG3("Sending relayd sock info to consumer on %d", consumer_sock);
- ret = lttcomm_send_unix_sock(consumer_sock, &msg, sizeof(msg));
+ DBG3("Sending relayd sock info to consumer on %d", consumer_sock->fd);
+ ret = lttcomm_send_unix_sock(consumer_sock->fd, &msg, sizeof(msg));
if (ret < 0) {
PERROR("send consumer relayd socket info");
goto error;
}
if (ret < 0) {
PERROR("send consumer relayd socket info");
goto error;
}
+ ret = consumer_recv_status_reply(consumer_sock);
+ if (ret < 0) {
+ goto error;
+ }
+
DBG3("Sending relayd socket file descriptor to consumer");
ret = consumer_send_fds(consumer_sock, &sock->fd, 1);
if (ret < 0) {
DBG3("Sending relayd socket file descriptor to consumer");
ret = consumer_send_fds(consumer_sock, &sock->fd, 1);
if (ret < 0) {
+ /*
+ * No need for a recv reply status because the answer to the command is
+ * the reply status message.
+ */
+
ret = lttcomm_recv_unix_sock(socket->fd, &ret_code, sizeof(ret_code));
if (ret < 0) {
PERROR("recv consumer data pending status");
ret = lttcomm_recv_unix_sock(socket->fd, &ret_code, sizeof(ret_code));
if (ret < 0) {
PERROR("recv consumer data pending status");
void consumer_destroy_output(struct consumer_output *obj);
int consumer_set_network_uri(struct consumer_output *obj,
struct lttng_uri *uri);
void consumer_destroy_output(struct consumer_output *obj);
int consumer_set_network_uri(struct consumer_output *obj,
struct lttng_uri *uri);
-int consumer_send_fds(int sock, int *fds, size_t nb_fd);
-int consumer_send_stream(int sock, struct consumer_output *dst,
- struct lttcomm_consumer_msg *msg, int *fds, size_t nb_fd);
-int consumer_send_channel(int sock, struct lttcomm_consumer_msg *msg);
-int consumer_send_relayd_socket(int consumer_sock,
+int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd);
+int consumer_send_stream(struct consumer_socket *sock,
+ struct consumer_output *dst, struct lttcomm_consumer_msg *msg,
+ int *fds, size_t nb_fd);
+int consumer_send_channel(struct consumer_socket *sock,
+ struct lttcomm_consumer_msg *msg);
+int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
struct lttcomm_sock *sock, struct consumer_output *consumer,
enum lttng_stream_type type);
int consumer_send_destroy_relayd(struct consumer_socket *sock,
struct consumer_output *consumer);
struct lttcomm_sock *sock, struct consumer_output *consumer,
enum lttng_stream_type type);
int consumer_send_destroy_relayd(struct consumer_socket *sock,
struct consumer_output *consumer);
+int consumer_recv_status_reply(struct consumer_socket *sock);
void consumer_output_send_destroy_relayd(struct consumer_output *consumer);
int consumer_create_socket(struct consumer_data *data,
struct consumer_output *output);
void consumer_output_send_destroy_relayd(struct consumer_output *consumer);
int consumer_create_socket(struct consumer_data *data,
struct consumer_output *output);
/*
* Sending a single channel to the consumer with command ADD_CHANNEL.
*/
/*
* Sending a single channel to the consumer with command ADD_CHANNEL.
*/
-int kernel_consumer_add_channel(int sock, struct ltt_kernel_channel *channel)
+int kernel_consumer_add_channel(struct consumer_socket *sock,
+ struct ltt_kernel_channel *channel)
{
int ret;
struct lttcomm_consumer_msg lkm;
{
int ret;
struct lttcomm_consumer_msg lkm;
/*
* Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
*/
/*
* Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
*/
-int kernel_consumer_add_metadata(int sock, struct ltt_kernel_session *session)
+int kernel_consumer_add_metadata(struct consumer_socket *sock,
+ struct ltt_kernel_session *session)
{
int ret;
char tmp_path[PATH_MAX];
{
int ret;
char tmp_path[PATH_MAX];
/* Safety net */
assert(session);
assert(session->consumer);
/* Safety net */
assert(session);
assert(session->consumer);
DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd);
DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd);
/*
* Sending a single stream to the consumer with command ADD_STREAM.
*/
/*
* Sending a single stream to the consumer with command ADD_STREAM.
*/
-int kernel_consumer_add_stream(int sock, struct ltt_kernel_channel *channel,
- struct ltt_kernel_stream *stream, struct ltt_kernel_session *session)
+int kernel_consumer_add_stream(struct consumer_socket *sock,
+ struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream,
+ struct ltt_kernel_session *session)
{
int ret;
char tmp_path[PATH_MAX];
{
int ret;
char tmp_path[PATH_MAX];
assert(stream);
assert(session);
assert(session->consumer);
assert(stream);
assert(session);
assert(session->consumer);
DBG("Sending stream %d of channel %s to kernel consumer",
stream->fd, channel->channel->name);
DBG("Sending stream %d of channel %s to kernel consumer",
stream->fd, channel->channel->name);
/*
* Send all stream fds of kernel channel to the consumer.
*/
/*
* Send all stream fds of kernel channel to the consumer.
*/
-int kernel_consumer_send_channel_stream(int sock,
+int kernel_consumer_send_channel_stream(struct consumer_socket *sock,
struct ltt_kernel_channel *channel, struct ltt_kernel_session *session)
{
int ret;
struct ltt_kernel_channel *channel, struct ltt_kernel_session *session)
{
int ret;
assert(channel);
assert(session);
assert(session->consumer);
assert(channel);
assert(session);
assert(session->consumer);
/* Bail out if consumer is disabled */
if (!session->consumer->enabled) {
/* Bail out if consumer is disabled */
if (!session->consumer->enabled) {
/*
* Send all stream fds of the kernel session to the consumer.
*/
/*
* Send all stream fds of the kernel session to the consumer.
*/
-int kernel_consumer_send_session(int sock, struct ltt_kernel_session *session)
+int kernel_consumer_send_session(struct consumer_socket *sock,
+ struct ltt_kernel_session *session)
{
int ret;
struct ltt_kernel_channel *chan;
{
int ret;
struct ltt_kernel_channel *chan;
/* Safety net */
assert(session);
assert(session->consumer);
/* Safety net */
assert(session);
assert(session->consumer);
/* Bail out if consumer is disabled */
if (!session->consumer->enabled) {
/* Bail out if consumer is disabled */
if (!session->consumer->enabled) {
#include "trace-kernel.h"
#include "trace-kernel.h"
-int kernel_consumer_send_channel_stream(int sock,
+int kernel_consumer_send_channel_stream(struct consumer_socket *sock,
struct ltt_kernel_channel *channel, struct ltt_kernel_session *session);
struct ltt_kernel_channel *channel, struct ltt_kernel_session *session);
-int kernel_consumer_send_session(int sock, struct ltt_kernel_session *session);
+int kernel_consumer_send_session(struct consumer_socket *sock,
+ struct ltt_kernel_session *session);
-int kernel_consumer_add_stream(int sock, struct ltt_kernel_channel *channel,
- struct ltt_kernel_stream *stream, struct ltt_kernel_session *session);
+int kernel_consumer_add_stream(struct consumer_socket *sock,
+ struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream,
+ struct ltt_kernel_session *session);
-int kernel_consumer_add_metadata(int sock, struct ltt_kernel_session *session);
+int kernel_consumer_add_metadata(struct consumer_socket *sock,
+ struct ltt_kernel_session *session);
-int kernel_consumer_add_channel(int sock, struct ltt_kernel_channel *channel);
+int kernel_consumer_add_channel(struct consumer_socket *sock,
+ struct ltt_kernel_channel *channel);
assert(socket->fd >= 0);
pthread_mutex_lock(socket->lock);
assert(socket->fd >= 0);
pthread_mutex_lock(socket->lock);
- ret = kernel_consumer_send_channel_stream(socket->fd,
+ ret = kernel_consumer_send_channel_stream(socket,
channel, ksess);
pthread_mutex_unlock(socket->lock);
if (ret < 0) {
channel, ksess);
pthread_mutex_unlock(socket->lock);
if (ret < 0) {
/*
* Send a single channel to the consumer using command ADD_CHANNEL.
*/
/*
* Send a single channel to the consumer using command ADD_CHANNEL.
*/
-static int send_channel(int sock, struct ust_app_channel *uchan)
+static int send_channel(struct consumer_socket *sock,
+ struct ust_app_channel *uchan)
{
int ret, fd;
struct lttcomm_consumer_msg msg;
/* Safety net */
assert(uchan);
{
int ret, fd;
struct lttcomm_consumer_msg msg;
/* Safety net */
assert(uchan);
ret = -EINVAL;
goto error;
}
ret = -EINVAL;
goto error;
}
/*
* Send a single stream to the consumer using ADD_STREAM command.
*/
/*
* Send a single stream to the consumer using ADD_STREAM command.
*/
-static int send_channel_stream(int sock, struct ust_app_channel *uchan,
- struct ust_app_session *usess, struct ltt_ust_stream *stream,
- struct consumer_output *consumer, const char *pathname)
+static int send_channel_stream(struct consumer_socket *sock,
+ struct ust_app_channel *uchan, struct ust_app_session *usess,
+ struct ltt_ust_stream *stream, struct consumer_output *consumer,
+ const char *pathname)
{
int ret, fds[2];
struct lttcomm_consumer_msg msg;
{
int ret, fds[2];
struct lttcomm_consumer_msg msg;
assert(usess);
assert(stream);
assert(consumer);
assert(usess);
assert(stream);
assert(consumer);
DBG2("Sending stream %d of channel %s to kernel consumer",
stream->obj->shm_fd, uchan->name);
DBG2("Sending stream %d of channel %s to kernel consumer",
stream->obj->shm_fd, uchan->name);
/*
* Send all stream fds of UST channel to the consumer.
*/
/*
* Send all stream fds of UST channel to the consumer.
*/
-static int send_channel_streams(int sock,
+static int send_channel_streams(struct consumer_socket *sock,
struct ust_app_channel *uchan, struct ust_app_session *usess,
struct consumer_output *consumer)
{
struct ust_app_channel *uchan, struct ust_app_session *usess,
struct consumer_output *consumer)
{
const char *pathname;
struct ltt_ust_stream *stream, *tmp;
const char *pathname;
struct ltt_ust_stream *stream, *tmp;
DBG("Sending streams of channel %s to UST consumer", uchan->name);
ret = send_channel(sock, uchan);
DBG("Sending streams of channel %s to UST consumer", uchan->name);
ret = send_channel(sock, uchan);
/*
* Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
*/
/*
* Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
*/
-static int send_metadata(int sock, struct ust_app_session *usess,
- struct consumer_output *consumer)
+static int send_metadata(struct consumer_socket *sock,
+ struct ust_app_session *usess, struct consumer_output *consumer)
{
int ret, fd, fds[2];
char tmp_path[PATH_MAX];
{
int ret, fd, fds[2];
char tmp_path[PATH_MAX];
/* Safety net */
assert(usess);
assert(consumer);
/* Safety net */
assert(usess);
assert(consumer);
- if (sock < 0) {
- ERR("Consumer socket is negative (%d)", sock);
+ if (sock->fd < 0) {
+ ERR("Consumer socket is negative (%d)", sock->fd);
pthread_mutex_lock(sock->lock);
/* Sending metadata information to the consumer */
pthread_mutex_lock(sock->lock);
/* Sending metadata information to the consumer */
- ret = send_metadata(sock->fd, usess, consumer);
+ ret = send_metadata(sock, usess, consumer);
if (ret < 0) {
goto error;
}
if (ret < 0) {
goto error;
}
- ret = send_channel_streams(sock->fd, ua_chan, usess, consumer);
+ ret = send_channel_streams(sock, ua_chan, usess, consumer);
if (ret < 0) {
rcu_read_unlock();
goto error;
if (ret < 0) {
rcu_read_unlock();
goto error;
struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock)
{
int fd = -1, ret = -1;
struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock)
{
int fd = -1, ret = -1;
+ enum lttng_error_code ret_code = LTTNG_OK;
struct consumer_relayd_sock_pair *relayd;
DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
struct consumer_relayd_sock_pair *relayd;
DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
+ /* First send a status message before receiving the fds. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto error;
+ }
+
/* Get relayd reference if exists. */
relayd = consumer_find_relayd(net_seq_idx);
if (relayd == NULL) {
/* Get relayd reference if exists. */
relayd = consumer_find_relayd(net_seq_idx);
if (relayd == NULL) {
+ /* We have the fds without error. Send status back. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto error;
+ }
+
/* Copy socket information and received FD */
switch (sock_type) {
case LTTNG_STREAM_CONTROL:
/* Copy socket information and received FD */
switch (sock_type) {
case LTTNG_STREAM_CONTROL:
rcu_read_unlock();
return 1;
}
rcu_read_unlock();
return 1;
}
+
+/*
+ * Send a ret code status message to the sessiond daemon.
+ *
+ * Return the sendmsg() return value.
+ */
+int consumer_send_status_msg(int sock, int ret_code)
+{
+ struct lttcomm_consumer_status_msg msg;
+
+ msg.ret_code = ret_code;
+
+ return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
+}
void consumer_flag_relayd_for_destroy(
struct consumer_relayd_sock_pair *relayd);
int consumer_data_pending(uint64_t id);
void consumer_flag_relayd_for_destroy(
struct consumer_relayd_sock_pair *relayd);
int consumer_data_pending(uint64_t id);
+int consumer_send_status_msg(int sock, int ret_code);
#endif /* LIB_CONSUMER_H */
#endif /* LIB_CONSUMER_H */
int sock, struct pollfd *consumer_sockpoll)
{
ssize_t ret;
int sock, struct pollfd *consumer_sockpoll)
{
ssize_t ret;
+ enum lttng_error_code ret_code = LTTNG_OK;
struct lttcomm_consumer_msg msg;
ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
struct lttcomm_consumer_msg msg;
ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
return ret;
}
if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
return ret;
}
if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
+ /*
+ * Notify the session daemon that the command is completed.
+ *
+ * On transport layer error, the function call will print an error
+ * message so handling the returned code is a bit useless since we
+ * return an error code anyway.
+ */
+ (void) consumer_send_status_msg(sock, ret_code);
switch (msg.cmd_type) {
case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
{
switch (msg.cmd_type) {
case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
{
+ /* Session daemon status message are handled in the following call. */
ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
&msg.u.relayd_sock.sock);
ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
&msg.u.relayd_sock.sock);
{
struct lttng_consumer_channel *new_channel;
{
struct lttng_consumer_channel *new_channel;
+ /* First send a status message before receiving the fds. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+
DBG("consumer_add_channel %d", msg.u.channel.channel_key);
new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
-1, -1,
DBG("consumer_add_channel %d", msg.u.channel.channel_key);
new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
-1, -1,
struct lttng_consumer_stream *new_stream;
int alloc_ret = 0;
struct lttng_consumer_stream *new_stream;
int alloc_ret = 0;
+ /* First send a status message before receiving the fds. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+
/* block */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
rcu_read_unlock();
/* block */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
rcu_read_unlock();
+ /*
+ * Send status code to session daemon only if the recv works. If the
+ * above recv() failed, the session daemon is notified through the
+ * error socket and the teardown is eventually done.
+ */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+
new_stream = consumer_allocate_stream(msg.u.stream.channel_key,
msg.u.stream.stream_key,
fd, fd,
new_stream = consumer_allocate_stream(msg.u.stream.channel_key,
msg.u.stream.stream_key,
fd, fd,
relayd = consumer_find_relayd(index);
if (relayd == NULL) {
ERR("Unable to find relayd %" PRIu64, index);
relayd = consumer_find_relayd(index);
if (relayd == NULL) {
ERR("Unable to find relayd %" PRIu64, index);
+ ret_code = LTTNG_ERR_NO_CONSUMER;
*
* The destroy can happen either here or when a stream fd hangs up.
*/
*
* The destroy can happen either here or when a stream fd hangs up.
*/
- consumer_flag_relayd_for_destroy(relayd);
+ if (relayd) {
+ consumer_flag_relayd_for_destroy(relayd);
+ }
+
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
if (ret < 0) {
PERROR("send data pending ret code");
}
if (ret < 0) {
PERROR("send data pending ret code");
}
+
+ /*
+ * No need to send back a status message since the data pending
+ * returned value is the response.
+ */
+/*
+ * Status message returned to the sessiond after a received command.
+ */
+struct lttcomm_consumer_status_msg {
+ enum lttng_error_code ret_code;
+};
+
#ifdef HAVE_LIBLTTNG_UST_CTL
#include <lttng/ust-abi.h>
#ifdef HAVE_LIBLTTNG_UST_CTL
#include <lttng/ust-abi.h>
int sock, struct pollfd *consumer_sockpoll)
{
ssize_t ret;
int sock, struct pollfd *consumer_sockpoll)
{
ssize_t ret;
+ enum lttng_error_code ret_code = LTTNG_OK;
struct lttcomm_consumer_msg msg;
ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
struct lttcomm_consumer_msg msg;
ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
return ret;
}
if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
return ret;
}
if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
+ /*
+ * Notify the session daemon that the command is completed.
+ *
+ * On transport layer error, the function call will print an error
+ * message so handling the returned code is a bit useless since we
+ * return an error code anyway.
+ */
+ (void) consumer_send_status_msg(sock, ret_code);
switch (msg.cmd_type) {
case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
{
switch (msg.cmd_type) {
case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
{
+ /* Session daemon status message are handled in the following call. */
ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
&msg.u.relayd_sock.sock);
ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
&msg.u.relayd_sock.sock);
DBG("UST Consumer adding channel");
DBG("UST Consumer adding channel");
+ /* First send a status message before receiving the fds. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+
/* block */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
rcu_read_unlock();
/* block */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
rcu_read_unlock();
+ /*
+ * Send status code to session daemon only if the recv works. If the
+ * above recv() failed, the session daemon is notified through the
+ * error socket and the teardown is eventually done.
+ */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+
DBG("consumer_add_channel %d", msg.u.channel.channel_key);
new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
DBG("consumer_add_channel %d", msg.u.channel.channel_key);
new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
DBG("UST Consumer adding stream");
DBG("UST Consumer adding stream");
+ /* First send a status message before receiving the fds. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+
/* block */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
rcu_read_unlock();
/* block */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
rcu_read_unlock();
+ /*
+ * Send status code to session daemon only if the recv works. If the
+ * above recv() failed, the session daemon is notified through the
+ * error socket and the teardown is eventually done.
+ */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+
DBG("Consumer command ADD_STREAM chan %d stream %d",
msg.u.stream.channel_key, msg.u.stream.stream_key);
DBG("Consumer command ADD_STREAM chan %d stream %d",
msg.u.stream.channel_key, msg.u.stream.stream_key);
relayd = consumer_find_relayd(index);
if (relayd == NULL) {
ERR("Unable to find relayd %" PRIu64, index);
relayd = consumer_find_relayd(index);
if (relayd == NULL) {
ERR("Unable to find relayd %" PRIu64, index);
+ ret_code = LTTNG_ERR_NO_CONSUMER;
*
* The destroy can happen either here or when a stream fd hangs up.
*/
*
* The destroy can happen either here or when a stream fd hangs up.
*/
- consumer_flag_relayd_for_destroy(relayd);
+ if (relayd) {
+ consumer_flag_relayd_for_destroy(relayd);
+ }
+
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
if (ret < 0) {
PERROR("send data pending ret code");
}
if (ret < 0) {
PERROR("send data pending ret code");
}
+
+ /*
+ * No need to send back a status message since the data pending
+ * returned value is the response.
+ */