#include <common/common.h>
#include <common/utils.h>
#include <common/compat/poll.h>
+#include <common/compat/endian.h>
#include <common/index/index.h>
#include <common/kernel-ctl/kernel-ctl.h>
#include <common/sessiond-comm/relayd.h>
return channel;
}
+/*
+ * There is a possibility that the consumer does not have enough time between
+ * the close of the channel on the session daemon and the cleanup in here thus
+ * once we have a channel add with an existing key, we know for sure that this
+ * channel will eventually get cleaned up by all streams being closed.
+ *
+ * This function just nullifies the already existing channel key.
+ */
+static void steal_channel_key(uint64_t key)
+{
+ struct lttng_consumer_channel *channel;
+
+ rcu_read_lock();
+ channel = consumer_find_channel(key);
+ if (channel) {
+ channel->key = (uint64_t) -1ULL;
+ /*
+ * We don't want the lookup to match, but we still need to iterate on
+ * this channel when iterating over the hash table. Just change the
+ * node key.
+ */
+ channel->node.key = (uint64_t) -1ULL;
+ }
+ rcu_read_unlock();
+}
+
static void free_channel_rcu(struct rcu_head *head)
{
struct lttng_ht_node_u64 *node =
/*
* Add a channel to the global list protected by a mutex.
*
- * On success 0 is returned else a negative value.
+ * Always return 0 indicating success.
*/
int consumer_add_channel(struct lttng_consumer_channel *channel,
struct lttng_consumer_local_data *ctx)
{
- int ret = 0;
- struct lttng_ht_node_u64 *node;
- struct lttng_ht_iter iter;
-
pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&channel->lock);
pthread_mutex_lock(&channel->timer_lock);
- rcu_read_lock();
- lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter);
- node = lttng_ht_iter_get_node_u64(&iter);
- if (node != NULL) {
- /* Channel already exist. Ignore the insertion */
- ERR("Consumer add channel key %" PRIu64 " already exists!",
- channel->key);
- ret = -EEXIST;
- goto end;
- }
+ /*
+ * This gives us a guarantee that the channel we are about to add to the
+ * channel hash table will be unique. See this function comment on the why
+ * we need to steel the channel key at this stage.
+ */
+ steal_channel_key(channel->key);
+ rcu_read_lock();
lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node);
-
-end:
rcu_read_unlock();
+
pthread_mutex_unlock(&channel->timer_lock);
pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
- if (!ret && channel->wait_fd != -1 &&
- channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
+ if (channel->wait_fd != -1 && channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
}
- return ret;
+
+ return 0;
}
/*
*/
(*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
(*pollfd)[i].events = POLLIN | POLLPRI;
+
+ (*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe);
+ (*pollfd)[i + 1].events = POLLIN | POLLPRI;
return i;
}
/*
- * Poll on the should_quit pipe and the command socket return -1 on error and
- * should exit, 0 if data is available on the command socket
+ * Poll on the should_quit pipe and the command socket return -1 on
+ * error, 1 if should exit, 0 if data is available on the command socket
*/
int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
{
goto restart;
}
PERROR("Poll error");
- goto exit;
+ return -1;
}
if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
DBG("consumer_should_quit wake up");
- goto exit;
+ return 1;
}
return 0;
-
-exit:
- return -1;
}
/*
goto error_poll_pipe;
}
+ ctx->consumer_wakeup_pipe = lttng_pipe_open(0);
+ if (!ctx->consumer_wakeup_pipe) {
+ goto error_wakeup_pipe;
+ }
+
ret = pipe(ctx->consumer_should_quit);
if (ret < 0) {
PERROR("Error creating recv pipe");
error_thread_pipe:
utils_close_pipe(ctx->consumer_should_quit);
error_quit_pipe:
+ lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
+error_wakeup_pipe:
lttng_pipe_destroy(ctx->consumer_data_pipe);
error_poll_pipe:
free(ctx);
utils_close_pipe(ctx->consumer_channel_pipe);
lttng_pipe_destroy(ctx->consumer_data_pipe);
lttng_pipe_destroy(ctx->consumer_metadata_pipe);
+ lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
utils_close_pipe(ctx->consumer_should_quit);
utils_close_pipe(ctx->consumer_splice_metadata_pipe);
if (stream->net_seq_idx != (uint64_t) -1ULL) {
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd == NULL) {
- ret = -EPIPE;
+ written = -ret;
goto end;
}
}
/* Write metadata stream id before payload */
if (relayd) {
- int total_len = len;
+ unsigned long total_len = len;
if (stream->metadata_flag) {
/*
padding);
if (ret < 0) {
written = ret;
- /* Socket operation failed. We consider the relayd dead */
- if (ret == -EBADF) {
- WARN("Remote relayd disconnected. Stopping");
- relayd_hang_up = 1;
- goto write_error;
- }
- goto end;
+ relayd_hang_up = 1;
+ goto write_error;
}
total_len += sizeof(struct lttcomm_relayd_metadata_payload);
}
ret = write_relayd_stream_header(stream, total_len, padding, relayd);
- if (ret >= 0) {
- /* Use the returned socket. */
- outfd = ret;
- } else {
- /* Socket operation failed. We consider the relayd dead */
- if (ret == -EBADF) {
- WARN("Remote relayd disconnected. Stopping");
- relayd_hang_up = 1;
- goto write_error;
- }
- goto end;
+ if (ret < 0) {
+ written = ret;
+ relayd_hang_up = 1;
+ goto write_error;
}
+ /* Use the returned socket. */
+ outfd = ret;
} else {
/* No streaming, we have to set the len with the full padding */
len += padding;
stream->out_fd, &(stream->tracefile_count_current),
&stream->out_fd);
if (ret < 0) {
+ written = ret;
ERR("Rotating output file");
goto end;
}
stream->chan->tracefile_size,
stream->tracefile_count_current);
if (ret < 0) {
+ written = ret;
goto end;
}
stream->index_fd = ret;
DBG("splice chan to pipe, ret %zd", ret_splice);
if (ret_splice < 0) {
ret = errno;
- if (written == 0) {
- written = ret_splice;
- }
+ written = -ret;
PERROR("Error in relay splice");
goto splice_error;
}
/* Handle stream on the relayd if the output is on the network */
- if (relayd) {
- if (stream->metadata_flag) {
- size_t metadata_payload_size =
- sizeof(struct lttcomm_relayd_metadata_payload);
+ if (relayd && stream->metadata_flag) {
+ size_t metadata_payload_size =
+ sizeof(struct lttcomm_relayd_metadata_payload);
- /* Update counter to fit the spliced data */
- ret_splice += metadata_payload_size;
- len += metadata_payload_size;
- /*
- * We do this so the return value can match the len passed as
- * argument to this function.
- */
- written -= metadata_payload_size;
- }
+ /* Update counter to fit the spliced data */
+ ret_splice += metadata_payload_size;
+ len += metadata_payload_size;
+ /*
+ * We do this so the return value can match the len passed as
+ * argument to this function.
+ */
+ written -= metadata_payload_size;
}
/* Splice data out */
DBG("Consumer splice pipe to file, ret %zd", ret_splice);
if (ret_splice < 0) {
ret = errno;
- if (written == 0) {
- written = ret_splice;
- }
- /* Socket operation failed. We consider the relayd dead */
- if (errno == EBADF || errno == EPIPE || errno == ESPIPE) {
- WARN("Remote relayd disconnected. Stopping");
- relayd_hang_up = 1;
- goto write_error;
- }
- PERROR("Error in file splice");
- goto splice_error;
+ written = -ret;
+ relayd_hang_up = 1;
+ goto write_error;
} else if (ret_splice > len) {
/*
* We don't expect this code path to be executed but you never know
* so this is an extra protection agains a buggy splice().
*/
- written += ret_splice;
ret = errno;
+ written += ret_splice;
PERROR("Wrote more data than requested %zd (len: %lu)", ret_splice,
len);
goto splice_error;
free(local_stream);
local_stream = NULL;
- /* allocate for all fds + 1 for the consumer_data_pipe */
- pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
+ /*
+ * Allocate for all fds +1 for the consumer_data_pipe and +1 for
+ * wake up pipe.
+ */
+ pollfd = zmalloc((consumer_data.stream_count + 2) * sizeof(struct pollfd));
if (pollfd == NULL) {
PERROR("pollfd malloc");
pthread_mutex_unlock(&consumer_data.lock);
goto end;
}
- /* allocate for all fds + 1 for the consumer_data_pipe */
- local_stream = zmalloc((consumer_data.stream_count + 1) *
+ local_stream = zmalloc((consumer_data.stream_count + 2) *
sizeof(struct lttng_consumer_stream *));
if (local_stream == NULL) {
PERROR("local_stream malloc");
}
/* poll on the array of fds */
restart:
- DBG("polling on %d fd", nb_fd + 1);
+ DBG("polling on %d fd", nb_fd + 2);
health_poll_entry();
- num_rdy = poll(pollfd, nb_fd + 1, -1);
+ num_rdy = poll(pollfd, nb_fd + 2, -1);
health_poll_exit();
DBG("poll num_rdy : %d", num_rdy);
if (num_rdy == -1) {
continue;
}
+ /* Handle wakeup pipe. */
+ if (pollfd[nb_fd + 1].revents & (POLLIN | POLLPRI)) {
+ char dummy;
+ ssize_t pipe_readlen;
+
+ pipe_readlen = lttng_pipe_read(ctx->consumer_wakeup_pipe, &dummy,
+ sizeof(dummy));
+ if (pipe_readlen < 0) {
+ PERROR("Consumer data wakeup pipe");
+ }
+ /* We've been awakened to handle stream(s). */
+ ctx->has_wakeup = 0;
+ }
+
/* Take care of high priority channels first. */
for (i = 0; i < nb_fd; i++) {
health_code_update();
continue;
}
if ((pollfd[i].revents & POLLIN) ||
- local_stream[i]->hangup_flush_done) {
+ local_stream[i]->hangup_flush_done ||
+ local_stream[i]->has_data) {
DBG("Normal read on fd %d", pollfd[i].fd);
len = ctx->on_buffer_ready(local_stream[i], ctx);
/* it's ok to have an unavailable sub-buffer */
assert(ctx);
assert(sockpoll);
- if (lttng_consumer_poll_socket(sockpoll) < 0) {
- ret = -1;
+ ret = lttng_consumer_poll_socket(sockpoll);
+ if (ret) {
goto error;
}
DBG("Metadata connection on client_socket");
consumer_sockpoll[1].fd = client_socket;
consumer_sockpoll[1].events = POLLIN | POLLPRI;
- if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ ret = lttng_consumer_poll_socket(consumer_sockpoll);
+ if (ret) {
+ if (ret > 0) {
+ /* should exit */
+ err = 0;
+ }
goto end;
}
DBG("Connection on client_socket");
* command unix socket.
*/
ret = set_metadata_socket(ctx, consumer_sockpoll, client_socket);
- if (ret < 0) {
+ if (ret) {
+ if (ret > 0) {
+ /* should exit */
+ err = 0;
+ }
goto end;
}
health_poll_entry();
ret = lttng_consumer_poll_socket(consumer_sockpoll);
health_poll_exit();
- if (ret < 0) {
+ if (ret) {
+ if (ret > 0) {
+ /* should exit */
+ err = 0;
+ }
goto end;
}
DBG("Incoming command on sock");
ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
- if (ret == -ENOENT) {
- DBG("Received STOP command");
- goto end;
- }
if (ret <= 0) {
/*
* This could simply be a session daemon quitting. Don't output
}
/* Poll on consumer socket. */
- if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ ret = lttng_consumer_poll_socket(consumer_sockpoll);
+ if (ret) {
+ /* Needing to exit in the middle of a command: error. */
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
ret = -EINTR;
goto error_nosignal;
{
struct lttcomm_consumer_status_msg msg;
+ memset(&msg, 0, sizeof(msg));
msg.ret_code = ret_code;
return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
assert(sock >= 0);
+ memset(&msg, 0, sizeof(msg));
if (!channel) {
msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
} else {