Cleanup: Remove unused label
diff --git a/src/common/consumer.c b/src/common/consumer.c
index 2f20ffb5d1d97fb585803d1721bdfeb7e494ffb9..b0b926bb01a1f65c58c9d25da47bc980b4125e83 100644
--- a/src/common/consumer.c
+++ b/src/common/consumer.c
@@ -18,6 +18,7 @@
  */
 
 #define _GNU_SOURCE
+#define _LGPL_SOURCE
 #include <assert.h>
 #include <poll.h>
 #include <pthread.h>
 #include <inttypes.h>
 #include <signal.h>
 
+#include <bin/lttng-consumerd/health-consumerd.h>
 #include <common/common.h>
 #include <common/utils.h>
 #include <common/compat/poll.h>
+#include <common/compat/endian.h>
 #include <common/index/index.h>
 #include <common/kernel-ctl/kernel-ctl.h>
 #include <common/sessiond-comm/relayd.h>
@@ -44,7 +47,7 @@
 
 #include "consumer.h"
 #include "consumer-stream.h"
-#include "../bin/lttng-consumerd/health-consumerd.h"
+#include "consumer-testpoint.h"
 
 struct lttng_consumer_global_data consumer_data = {
        .stream_count = 0,
@@ -94,22 +97,33 @@ static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
        (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
 }
 
+static void notify_health_quit_pipe(int *pipe)
+{
+       ssize_t ret;
+
+       ret = lttng_write(pipe[1], "4", 1);
+       if (ret < 1) {
+               PERROR("write consumer health quit");
+       }
+}
+
 static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_channel *chan,
                uint64_t key,
                enum consumer_channel_action action)
 {
        struct consumer_channel_msg msg;
-       int ret;
+       ssize_t ret;
 
        memset(&msg, 0, sizeof(msg));
 
        msg.action = action;
        msg.chan = chan;
        msg.key = key;
-       do {
-               ret = write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
-       } while (ret < 0 && errno == EINTR);
+       ret = lttng_write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
+       if (ret < sizeof(msg)) {
+               PERROR("notify_channel_pipe write error");
+       }
 }
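
Throughout this patch, raw write() loops that only retried on EINTR are replaced by lttng_write(), and the result is compared against the full requested size. Below is a minimal sketch of a full-write helper with the semantics these call sites rely on (retry on EINTR, resume after short writes, return the number of bytes written); it is an illustration under that assumption, not the actual lttng_write() implementation.

#include <errno.h>
#include <unistd.h>
#include <sys/types.h>

static ssize_t write_all(int fd, const void *buf, size_t count)
{
	const char *p = buf;
	size_t left = count;

	while (left > 0) {
		ssize_t ret = write(fd, p, left);

		if (ret < 0) {
			if (errno == EINTR) {
				continue;	/* Interrupted before writing, retry. */
			}
			/* Return what was written so far, or the error itself. */
			return (count - left) ? (ssize_t) (count - left) : -1;
		}
		p += ret;
		left -= ret;
	}
	return (ssize_t) count;
}
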
 
 void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
@@ -124,17 +138,43 @@ static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
                enum consumer_channel_action *action)
 {
        struct consumer_channel_msg msg;
-       int ret;
+       ssize_t ret;
 
-       do {
-               ret = read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
-       } while (ret < 0 && errno == EINTR);
-       if (ret > 0) {
-               *action = msg.action;
-               *chan = msg.chan;
-               *key = msg.key;
+       ret = lttng_read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
+       if (ret < sizeof(msg)) {
+               ret = -1;
+               goto error;
+       }
+       *action = msg.action;
+       *chan = msg.chan;
+       *key = msg.key;
+error:
+       return (int) ret;
+}
+
+/*
+ * Clean up the stream list of a channel. Those streams are not yet globally
+ * visible.
+ */
+static void clean_channel_stream_list(struct lttng_consumer_channel *channel)
+{
+       struct lttng_consumer_stream *stream, *stmp;
+
+       assert(channel);
+
+       /* Delete streams that might have been left in the stream list. */
+       cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
+                       send_node) {
+               cds_list_del(&stream->send_node);
+               /*
+                * Once a stream is added to this list, its buffers were created, so we
+                * have a guarantee that this call will succeed. Set the monitor mode
+                * to 0 so we neither lock nor try to delete the stream from the
+                * global hash table.
+                */
+               stream->monitor = 0;
+               consumer_stream_destroy(stream, NULL);
        }
-       return ret;
 }
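
clean_channel_stream_list() unlinks and destroys entries while walking the list, which is what the _safe iteration variant from <urcu/list.h> is for: the next pointer is saved in stmp before the current entry is torn down. The same pattern, reduced to a standalone sketch with a hypothetical struct (illustration only):

#include <stdlib.h>
#include <urcu/list.h>

struct item {
	int value;
	struct cds_list_head node;
};

static void drain(struct cds_list_head *head)
{
	struct item *it, *tmp;

	cds_list_for_each_entry_safe(it, tmp, head, node) {
		cds_list_del(&it->node);	/* Unlink before freeing. */
		free(it);
	}
}
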
 
 /*
@@ -212,14 +252,30 @@ struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
        return channel;
 }
 
-static void free_stream_rcu(struct rcu_head *head)
+/*
+ * The consumer may not have enough time, between the close of a channel on
+ * the session daemon side and its cleanup in here, before an add command
+ * arrives with the same key. In that case, we know for sure that the existing
+ * channel will eventually get cleaned up once all of its streams are closed.
+ *
+ * This function just invalidates the already existing channel key.
+ */
+static void steal_channel_key(uint64_t key)
 {
-       struct lttng_ht_node_u64 *node =
-               caa_container_of(head, struct lttng_ht_node_u64, head);
-       struct lttng_consumer_stream *stream =
-               caa_container_of(node, struct lttng_consumer_stream, node);
+       struct lttng_consumer_channel *channel;
 
-       free(stream);
+       rcu_read_lock();
+       channel = consumer_find_channel(key);
+       if (channel) {
+               channel->key = (uint64_t) -1ULL;
+               /*
+                * We don't want the lookup to match, but we still need to iterate on
+                * this channel when iterating over the hash table. Just change the
+                * node key.
+                */
+               channel->node.key = (uint64_t) -1ULL;
+       }
+       rcu_read_unlock();
 }
 
 static void free_channel_rcu(struct rcu_head *head)
@@ -289,23 +345,14 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
 {
        int ret;
        struct lttng_ht_iter iter;
-       struct lttng_consumer_stream *stream, *stmp;
 
        DBG("Consumer delete channel key %" PRIu64, channel->key);
 
        pthread_mutex_lock(&consumer_data.lock);
        pthread_mutex_lock(&channel->lock);
 
-       /* Delete streams that might have been left in the stream list. */
-       cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
-                       send_node) {
-               cds_list_del(&stream->send_node);
-               /*
-                * Once a stream is added to this list, the buffers were created so
-                * we have a guarantee that this call will succeed.
-                */
-               consumer_stream_destroy(stream, NULL);
-       }
+       /* Destroy streams that might have been left in the stream list. */
+       clean_channel_stream_list(channel);
 
        if (channel->live_timer_enabled == 1) {
                consumer_timer_live_stop(channel);
@@ -756,6 +803,44 @@ end:
        return ret;
 }
 
+/*
+ * Find a relayd and send it the streams_sent message.
+ *
+ * Returns 0 on success, < 0 on error.
+ */
+int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
+{
+       int ret = 0;
+       struct consumer_relayd_sock_pair *relayd;
+
+       assert(net_seq_idx != -1ULL);
+
+       /* The stream is not metadata. Get relayd reference if exists. */
+       rcu_read_lock();
+       relayd = consumer_find_relayd(net_seq_idx);
+       if (relayd != NULL) {
+               /* Add stream on the relayd */
+               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               ret = relayd_streams_sent(&relayd->control_sock);
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+               if (ret < 0) {
+                       goto end;
+               }
+       } else {
+               ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.",
+                               net_seq_idx);
+               ret = -1;
+               goto end;
+       }
+
+       ret = 0;
+       DBG("All streams sent relayd id %" PRIu64, net_seq_idx);
+
+end:
+       rcu_read_unlock();
+       return ret;
+}
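
A hypothetical call site, for illustration only; the real callers live in the per-domain command handlers, outside this file. The (uint64_t) -1ULL guard mirrors the assert() above, since local sessions have no relayd to notify.

static int notify_streams_sent(uint64_t relayd_id)
{
	int ret;

	if (relayd_id == (uint64_t) -1ULL) {
		/* Local session: no relayd involved, nothing to send. */
		return 0;
	}

	ret = consumer_send_relayd_streams_sent(relayd_id);
	if (ret < 0) {
		ERR("Failed to notify relayd %" PRIu64 " that streams were sent",
				relayd_id);
	}
	return ret;
}
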
+
 /*
  * Find a relayd and close the stream
  */
@@ -867,7 +952,6 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        channel->uid = uid;
        channel->gid = gid;
        channel->relayd_id = relayd_id;
-       channel->output = output;
        channel->tracefile_size = tracefile_size;
        channel->tracefile_count = tracefile_count;
        channel->monitor = monitor;
@@ -875,6 +959,20 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        pthread_mutex_init(&channel->lock, NULL);
        pthread_mutex_init(&channel->timer_lock, NULL);
 
+       switch (output) {
+       case LTTNG_EVENT_SPLICE:
+               channel->output = CONSUMER_CHANNEL_SPLICE;
+               break;
+       case LTTNG_EVENT_MMAP:
+               channel->output = CONSUMER_CHANNEL_MMAP;
+               break;
+       default:
+               assert(0);
+               free(channel);
+               channel = NULL;
+               goto end;
+       }
+
        /*
         * In monitor mode, the streams associated with the channel will be put in
         * a special list ONLY owned by this channel. So, the refcount is set to 1
@@ -909,43 +1007,35 @@ end:
 /*
  * Add a channel to the global list protected by a mutex.
  *
- * On success 0 is returned else a negative value.
+ * Always returns 0, indicating success.
  */
 int consumer_add_channel(struct lttng_consumer_channel *channel,
                struct lttng_consumer_local_data *ctx)
 {
-       int ret = 0;
-       struct lttng_ht_node_u64 *node;
-       struct lttng_ht_iter iter;
-
        pthread_mutex_lock(&consumer_data.lock);
        pthread_mutex_lock(&channel->lock);
        pthread_mutex_lock(&channel->timer_lock);
-       rcu_read_lock();
 
-       lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter);
-       node = lttng_ht_iter_get_node_u64(&iter);
-       if (node != NULL) {
-               /* Channel already exist. Ignore the insertion */
-               ERR("Consumer add channel key %" PRIu64 " already exists!",
-                       channel->key);
-               ret = -EEXIST;
-               goto end;
-       }
+       /*
+        * This gives us a guarantee that the channel we are about to add to the
+        * channel hash table will be unique. See the steal_channel_key() comment
+        * on why we need to steal the channel key at this stage.
+        */
+       steal_channel_key(channel->key);
 
+       rcu_read_lock();
        lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node);
-
-end:
        rcu_read_unlock();
+
        pthread_mutex_unlock(&channel->timer_lock);
        pthread_mutex_unlock(&channel->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
-       if (!ret && channel->wait_fd != -1 &&
-                       channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
+       if (channel->wait_fd != -1 && channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
                notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
        }
-       return ret;
+
+       return 0;
 }
 
 /*
@@ -1003,12 +1093,15 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
         */
        (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
        (*pollfd)[i].events = POLLIN | POLLPRI;
+
+       (*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe);
+       (*pollfd)[i + 1].events = POLLIN | POLLPRI;
        return i;
 }
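
For readers of the data thread changes further down, the resulting poll set layout can be summarized as follows (descriptive comment only, not part of the patch):

/*
 * Layout of the data thread's poll set after this change:
 *
 *   pollfd[0 .. nb_fd - 1]   one entry per monitored stream wait_fd
 *   pollfd[nb_fd]            consumer_data_pipe   (new streams to pick up)
 *   pollfd[nb_fd + 1]        consumer_wakeup_pipe (data-pending wakeup)
 *
 * This is why the allocations and the poll() call in
 * consumer_thread_data_poll() now use stream_count + 2 and nb_fd + 2.
 */
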
 
 /*
- * Poll on the should_quit pipe and the command socket return -1 on error and
- * should exit, 0 if data is available on the command socket
+ * Poll on the should_quit pipe and the command socket. Return -1 on error,
+ * 1 if the caller should exit, 0 if data is available on the command socket.
  */
 int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
 {
@@ -1024,16 +1117,13 @@ restart:
                        goto restart;
                }
                PERROR("Poll error");
-               goto exit;
+               return -1;
        }
        if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
                DBG("consumer_should_quit wake up");
-               goto exit;
+               return 1;
        }
        return 0;
-
-exit:
-       return -1;
 }
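
The callers updated later in this patch (see consumer_thread_sessiond_poll) all consume the new return convention the same way; a condensed sketch, assuming the err/end convention of those threads:

static int handle_poll_result(struct pollfd *consumer_sockpoll, int *err)
{
	int ret;

	ret = lttng_consumer_poll_socket(consumer_sockpoll);
	if (ret < 0) {
		return -1;		/* Poll error. */
	}
	if (ret > 0) {
		*err = 0;		/* Asked to quit: clean shutdown, not an error. */
		return 1;
	}
	return 0;			/* Data is available on the command socket. */
}
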
 
 /*
@@ -1105,12 +1195,11 @@ void lttng_consumer_cleanup(void)
  */
 void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
 {
-       int ret;
+       ssize_t ret;
+
        consumer_quit = 1;
-       do {
-               ret = write(ctx->consumer_should_quit[1], "4", 1);
-       } while (ret < 0 && errno == EINTR);
-       if (ret < 0 || ret != 1) {
+       ret = lttng_write(ctx->consumer_should_quit[1], "4", 1);
+       if (ret < 1) {
                PERROR("write consumer quit");
        }
 
@@ -1203,18 +1292,17 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                goto error_poll_pipe;
        }
 
+       ctx->consumer_wakeup_pipe = lttng_pipe_open(0);
+       if (!ctx->consumer_wakeup_pipe) {
+               goto error_wakeup_pipe;
+       }
+
        ret = pipe(ctx->consumer_should_quit);
        if (ret < 0) {
                PERROR("Error creating recv pipe");
                goto error_quit_pipe;
        }
 
-       ret = pipe(ctx->consumer_thread_pipe);
-       if (ret < 0) {
-               PERROR("Error creating thread pipe");
-               goto error_thread_pipe;
-       }
-
        ret = pipe(ctx->consumer_channel_pipe);
        if (ret < 0) {
                PERROR("Error creating channel pipe");
@@ -1226,22 +1314,15 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                goto error_metadata_pipe;
        }
 
-       ret = utils_create_pipe(ctx->consumer_splice_metadata_pipe);
-       if (ret < 0) {
-               goto error_splice_pipe;
-       }
-
        return ctx;
 
-error_splice_pipe:
-       lttng_pipe_destroy(ctx->consumer_metadata_pipe);
 error_metadata_pipe:
        utils_close_pipe(ctx->consumer_channel_pipe);
 error_channel_pipe:
-       utils_close_pipe(ctx->consumer_thread_pipe);
-error_thread_pipe:
        utils_close_pipe(ctx->consumer_should_quit);
 error_quit_pipe:
+       lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
+error_wakeup_pipe:
        lttng_pipe_destroy(ctx->consumer_data_pipe);
 error_poll_pipe:
        free(ctx);
@@ -1249,6 +1330,57 @@ error:
        return NULL;
 }
 
+/*
+ * Iterate over all streams of the hashtable and free them properly.
+ */
+static void destroy_data_stream_ht(struct lttng_ht *ht)
+{
+       struct lttng_ht_iter iter;
+       struct lttng_consumer_stream *stream;
+
+       if (ht == NULL) {
+               return;
+       }
+
+       rcu_read_lock();
+       cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
+               /*
+                * Ignore return value since we are currently cleaning up so any error
+                * can't be handled.
+                */
+               (void) consumer_del_stream(stream, ht);
+       }
+       rcu_read_unlock();
+
+       lttng_ht_destroy(ht);
+}
+
+/*
+ * Iterate over all streams of the metadata hashtable and free them
+ * properly.
+ */
+static void destroy_metadata_stream_ht(struct lttng_ht *ht)
+{
+       struct lttng_ht_iter iter;
+       struct lttng_consumer_stream *stream;
+
+       if (ht == NULL) {
+               return;
+       }
+
+       rcu_read_lock();
+       cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
+               /*
+                * Ignore return value since we are currently cleaning up so any error
+                * can't be handled.
+                */
+               (void) consumer_del_metadata_stream(stream, ht);
+       }
+       rcu_read_unlock();
+
+       lttng_ht_destroy(ht);
+}
+
 /*
  * Close all fds associated with the instance and free the context.
  */
@@ -1258,6 +1390,13 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
 
        DBG("Consumer destroying it. Closing everything.");
 
+       if (!ctx) {
+               return;
+       }
+
+       destroy_data_stream_ht(data_ht);
+       destroy_metadata_stream_ht(metadata_ht);
+
        ret = close(ctx->consumer_error_socket);
        if (ret) {
                PERROR("close");
@@ -1266,12 +1405,11 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
        if (ret) {
                PERROR("close");
        }
-       utils_close_pipe(ctx->consumer_thread_pipe);
        utils_close_pipe(ctx->consumer_channel_pipe);
        lttng_pipe_destroy(ctx->consumer_data_pipe);
        lttng_pipe_destroy(ctx->consumer_metadata_pipe);
+       lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
        utils_close_pipe(ctx->consumer_should_quit);
-       utils_close_pipe(ctx->consumer_splice_metadata_pipe);
 
        unlink(ctx->consumer_command_sock_path);
        free(ctx);
@@ -1284,17 +1422,15 @@ static int write_relayd_metadata_id(int fd,
                struct lttng_consumer_stream *stream,
                struct consumer_relayd_sock_pair *relayd, unsigned long padding)
 {
-       int ret;
+       ssize_t ret;
        struct lttcomm_relayd_metadata_payload hdr;
 
        hdr.stream_id = htobe64(stream->relayd_stream_id);
        hdr.padding_size = htobe32(padding);
-       do {
-               ret = write(fd, (void *) &hdr, sizeof(hdr));
-       } while (ret < 0 && errno == EINTR);
-       if (ret < 0 || ret != sizeof(hdr)) {
+       ret = lttng_write(fd, (void *) &hdr, sizeof(hdr));
+       if (ret < sizeof(hdr)) {
                /*
-                * This error means that the fd's end is closed so ignore the perror
+                * This error means that the fd's end is closed so ignore the PERROR
                 * not to clobber the error output since this can happen in a normal
                 * code path.
                 */
@@ -1314,7 +1450,7 @@ static int write_relayd_metadata_id(int fd,
                        stream->relayd_stream_id, padding);
 
 end:
-       return ret;
+       return (int) ret;
 }
 
 /*
@@ -1332,11 +1468,11 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len,
                unsigned long padding,
-               struct lttng_packet_index *index)
+               struct ctf_packet_index *index)
 {
        unsigned long mmap_offset;
        void *mmap_base;
-       ssize_t ret = 0, written = 0;
+       ssize_t ret = 0;
        off_t orig_offset = stream->out_fd_offset;
        /* Default is on the disk */
        int outfd = stream->out_fd;
@@ -1360,9 +1496,9 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        case LTTNG_CONSUMER_KERNEL:
                mmap_base = stream->mmap_base;
                ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
-               if (ret != 0) {
+               if (ret < 0) {
+                       ret = -errno;
                        PERROR("tracer ctl get_mmap_read_offset");
-                       written = -errno;
                        goto end;
                }
                break;
@@ -1371,13 +1507,13 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                mmap_base = lttng_ustctl_get_mmap_base(stream);
                if (!mmap_base) {
                        ERR("read mmap get mmap base for stream %s", stream->name);
-                       written = -EPERM;
+                       ret = -EPERM;
                        goto end;
                }
                ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset);
                if (ret != 0) {
                        PERROR("tracer ctl get_mmap_read_offset");
-                       written = ret;
+                       ret = -EINVAL;
                        goto end;
                }
                break;
@@ -1401,30 +1537,20 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                }
 
                ret = write_relayd_stream_header(stream, netlen, padding, relayd);
-               if (ret >= 0) {
-                       /* Use the returned socket. */
-                       outfd = ret;
+               if (ret < 0) {
+                       relayd_hang_up = 1;
+                       goto write_error;
+               }
+               /* Use the returned socket. */
+               outfd = ret;
 
-                       /* Write metadata stream id before payload */
-                       if (stream->metadata_flag) {
-                               ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
-                               if (ret < 0) {
-                                       written = ret;
-                                       /* Socket operation failed. We consider the relayd dead */
-                                       if (ret == -EPIPE || ret == -EINVAL) {
-                                               relayd_hang_up = 1;
-                                               goto write_error;
-                                       }
-                                       goto end;
-                               }
-                       }
-               } else {
-                       /* Socket operation failed. We consider the relayd dead */
-                       if (ret == -EPIPE || ret == -EINVAL) {
+               /* Write metadata stream id before payload */
+               if (stream->metadata_flag) {
+                       ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
+                       if (ret < 0) {
                                relayd_hang_up = 1;
                                goto write_error;
                        }
-                       /* Else, use the default set before which is the filesystem. */
                }
        } else {
                /* No streaming, we have to set the len with the full padding */
@@ -1469,46 +1595,45 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                }
        }
 
-       while (len > 0) {
-               do {
-                       ret = write(outfd, mmap_base + mmap_offset, len);
-               } while (ret < 0 && errno == EINTR);
-               DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
+       /*
+        * This call guarantees that len or less is returned. It's impossible to
+        * receive a ret value that is bigger than len.
+        */
+       ret = lttng_write(outfd, mmap_base + mmap_offset, len);
+       DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
+       if (ret < 0 || ((size_t) ret != len)) {
+               /*
+                * Report error to caller if nothing was written else at least send the
+                * Report the error to the caller if nothing was written; otherwise,
+                * report at least the amount written.
                if (ret < 0) {
+                       ret = -errno;
+               }
+               relayd_hang_up = 1;
+
+               /* Socket operation failed. We consider the relayd dead */
+               if (errno == EPIPE || errno == EINVAL || errno == EBADF) {
                        /*
-                        * This is possible if the fd is closed on the other side (outfd)
-                        * or any write problem. It can be verbose a bit for a normal
-                        * execution if for instance the relayd is stopped abruptly. This
-                        * can happen so set this to a DBG statement.
+                        * This is possible if the fd is closed on the other side
+                        * (outfd) or any write problem. It can be a bit verbose for a
+                        * normal execution if, for instance, the relayd is stopped
+                        * abruptly. This can happen, so set this to a DBG statement.
                         */
-                       DBG("Error in file write mmap");
-                       if (written == 0) {
-                               written = -errno;
-                       }
-                       /* Socket operation failed. We consider the relayd dead */
-                       if (errno == EPIPE || errno == EINVAL) {
-                               relayd_hang_up = 1;
-                               goto write_error;
-                       }
-                       goto end;
-               } else if (ret > len) {
-                       PERROR("Error in file write (ret %zd > len %lu)", ret, len);
-                       written += ret;
-                       goto end;
+                       DBG("Consumer mmap write detected relayd hang up");
                } else {
-                       len -= ret;
-                       mmap_offset += ret;
+                       /* Unhandled error, print it and stop function right now. */
+                       PERROR("Error in write mmap (ret %zd != len %lu)", ret, len);
                }
+               goto write_error;
+       }
+       stream->output_written += ret;
 
-               /* This call is useless on a socket so better save a syscall. */
-               if (!relayd) {
-                       /* This won't block, but will start writeout asynchronously */
-                       lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
-                                       SYNC_FILE_RANGE_WRITE);
-                       stream->out_fd_offset += ret;
-               }
-               stream->output_written += ret;
-               written += ret;
+       /* This call is useless on a socket so better save a syscall. */
+       if (!relayd) {
+               /* This won't block, but will start writeout asynchronously */
+               lttng_sync_file_range(outfd, stream->out_fd_offset, len,
+                               SYNC_FILE_RANGE_WRITE);
+               stream->out_fd_offset += len;
        }
        lttng_consumer_sync_trace_file(stream, orig_offset);
 
@@ -1528,7 +1653,7 @@ end:
        }
 
        rcu_read_unlock();
-       return written;
+       return ret;
 }
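
The rewritten error path above treats every short write as a relayd hang-up and only uses errno to decide between a quiet DBG and a PERROR. A condensed view of that classification, for reference (illustration only; EBADF is the value this patch adds):

#include <errno.h>
#include <stdbool.h>

static bool errno_means_relayd_gone(int err)
{
	/* Typical errno values when the peer socket is closed or dying. */
	return err == EPIPE || err == EINVAL || err == EBADF;
}
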
 
 /*
@@ -1542,7 +1667,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len,
                unsigned long padding,
-               struct lttng_packet_index *index)
+               struct ctf_packet_index *index)
 {
        ssize_t ret = 0, written = 0, ret_splice = 0;
        loff_t offset = 0;
@@ -1573,25 +1698,15 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
        if (stream->net_seq_idx != (uint64_t) -1ULL) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
                if (relayd == NULL) {
-                       ret = -EPIPE;
+                       written = -ret;
                        goto end;
                }
        }
-
-       /*
-        * Choose right pipe for splice. Metadata and trace data are handled by
-        * different threads hence the use of two pipes in order not to race or
-        * corrupt the written data.
-        */
-       if (stream->metadata_flag) {
-               splice_pipe = ctx->consumer_splice_metadata_pipe;
-       } else {
-               splice_pipe = ctx->consumer_thread_pipe;
-       }
+       splice_pipe = stream->splice_pipe;
 
        /* Write metadata stream id before payload */
        if (relayd) {
-               int total_len = len;
+               unsigned long total_len = len;
 
                if (stream->metadata_flag) {
                        /*
@@ -1604,31 +1719,21 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                                        padding);
                        if (ret < 0) {
                                written = ret;
-                               /* Socket operation failed. We consider the relayd dead */
-                               if (ret == -EBADF) {
-                                       WARN("Remote relayd disconnected. Stopping");
-                                       relayd_hang_up = 1;
-                                       goto write_error;
-                               }
-                               goto end;
+                               relayd_hang_up = 1;
+                               goto write_error;
                        }
 
                        total_len += sizeof(struct lttcomm_relayd_metadata_payload);
                }
 
                ret = write_relayd_stream_header(stream, total_len, padding, relayd);
-               if (ret >= 0) {
-                       /* Use the returned socket. */
-                       outfd = ret;
-               } else {
-                       /* Socket operation failed. We consider the relayd dead */
-                       if (ret == -EBADF) {
-                               WARN("Remote relayd disconnected. Stopping");
-                               relayd_hang_up = 1;
-                               goto write_error;
-                       }
-                       goto end;
+               if (ret < 0) {
+                       written = ret;
+                       relayd_hang_up = 1;
+                       goto write_error;
                }
+               /* Use the returned socket. */
+               outfd = ret;
        } else {
                /* No streaming, we have to set the len with the full padding */
                len += padding;
@@ -1645,6 +1750,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                                        stream->out_fd, &(stream->tracefile_count_current),
                                        &stream->out_fd);
                        if (ret < 0) {
+                               written = ret;
                                ERR("Rotating output file");
                                goto end;
                        }
@@ -1656,6 +1762,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                                                stream->chan->tracefile_size,
                                                stream->tracefile_count_current);
                                if (ret < 0) {
+                                       written = ret;
                                        goto end;
                                }
                                stream->index_fd = ret;
@@ -1677,57 +1784,51 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                                SPLICE_F_MOVE | SPLICE_F_MORE);
                DBG("splice chan to pipe, ret %zd", ret_splice);
                if (ret_splice < 0) {
-                       PERROR("Error in relay splice");
-                       if (written == 0) {
-                               written = ret_splice;
-                       }
                        ret = errno;
+                       written = -ret;
+                       PERROR("Error in relay splice");
                        goto splice_error;
                }
 
                /* Handle stream on the relayd if the output is on the network */
-               if (relayd) {
-                       if (stream->metadata_flag) {
-                               size_t metadata_payload_size =
-                                       sizeof(struct lttcomm_relayd_metadata_payload);
+               if (relayd && stream->metadata_flag) {
+                       size_t metadata_payload_size =
+                               sizeof(struct lttcomm_relayd_metadata_payload);
 
-                               /* Update counter to fit the spliced data */
-                               ret_splice += metadata_payload_size;
-                               len += metadata_payload_size;
-                               /*
-                                * We do this so the return value can match the len passed as
-                                * argument to this function.
-                                */
-                               written -= metadata_payload_size;
-                       }
+                       /* Update counter to fit the spliced data */
+                       ret_splice += metadata_payload_size;
+                       len += metadata_payload_size;
+                       /*
+                        * We do this so the return value can match the len passed as
+                        * argument to this function.
+                        */
+                       written -= metadata_payload_size;
                }
 
                /* Splice data out */
                ret_splice = splice(splice_pipe[0], NULL, outfd, NULL,
                                ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
-               DBG("Consumer splice pipe to file, ret %zd", ret_splice);
+               DBG("Consumer splice pipe to file (out_fd: %d), ret %zd",
+                               outfd, ret_splice);
                if (ret_splice < 0) {
-                       PERROR("Error in file splice");
-                       if (written == 0) {
-                               written = ret_splice;
-                       }
-                       /* Socket operation failed. We consider the relayd dead */
-                       if (errno == EBADF || errno == EPIPE) {
-                               WARN("Remote relayd disconnected. Stopping");
-                               relayd_hang_up = 1;
-                               goto write_error;
-                       }
                        ret = errno;
-                       goto splice_error;
+                       written = -ret;
+                       relayd_hang_up = 1;
+                       goto write_error;
                } else if (ret_splice > len) {
-                       errno = EINVAL;
-                       PERROR("Wrote more data than requested %zd (len: %lu)",
-                                       ret_splice, len);
-                       written += ret_splice;
+                       /*
+                        * We don't expect this code path to be executed, but you never
+                        * know, so this is an extra protection against a buggy splice().
+                        */
                        ret = errno;
+                       written += ret_splice;
+                       PERROR("Wrote more data than requested %zd (len: %lu)", ret_splice,
+                                       len);
                        goto splice_error;
+               } else {
+                       /* All good, update current len and continue. */
+                       len -= ret_splice;
                }
-               len -= ret_splice;
 
                /* This call is useless on a socket so better save a syscall. */
                if (!relayd) {
@@ -1740,9 +1841,6 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                written += ret_splice;
        }
        lttng_consumer_sync_trace_file(stream, orig_offset);
-
-       ret = ret_splice;
-
        goto end;
 
 write_error:
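
Stripped of the relayd header and tracefile-rotation handling, the loop above is the classic two-stage zero-copy path: data moves from the subbuffer fd into a pipe, then from the pipe to the output fd, without being copied into user space. A reduced sketch, for illustration only:

#define _GNU_SOURCE
#include <fcntl.h>
#include <unistd.h>

static ssize_t splice_through_pipe(int in_fd, int pipe_fd[2], int out_fd,
		size_t len)
{
	ssize_t moved;

	/* Stage 1: move data from the input fd into the pipe. */
	moved = splice(in_fd, NULL, pipe_fd[1], NULL, len,
			SPLICE_F_MOVE | SPLICE_F_MORE);
	if (moved <= 0) {
		return moved;
	}

	/* Stage 2: move it from the pipe to the output fd (file or socket). */
	return splice(pipe_fd[0], NULL, out_fd, NULL, moved,
			SPLICE_F_MOVE | SPLICE_F_MORE);
}
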
@@ -1836,61 +1934,7 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        }
 }
 
-/*
- * Iterate over all streams of the hashtable and free them properly.
- *
- * WARNING: *MUST* be used with data stream only.
- */
-static void destroy_data_stream_ht(struct lttng_ht *ht)
-{
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_stream *stream;
-
-       if (ht == NULL) {
-               return;
-       }
-
-       rcu_read_lock();
-       cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
-               /*
-                * Ignore return value since we are currently cleaning up so any error
-                * can't be handled.
-                */
-               (void) consumer_del_stream(stream, ht);
-       }
-       rcu_read_unlock();
-
-       lttng_ht_destroy(ht);
-}
-
-/*
- * Iterate over all streams of the hashtable and free them properly.
- *
- * XXX: Should not be only for metadata stream or else use an other name.
- */
-static void destroy_stream_ht(struct lttng_ht *ht)
-{
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_stream *stream;
-
-       if (ht == NULL) {
-               return;
-       }
-
-       rcu_read_lock();
-       cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
-               /*
-                * Ignore return value since we are currently cleaning up so any error
-                * can't be handled.
-                */
-               (void) consumer_del_metadata_stream(stream, ht);
-       }
-       rcu_read_unlock();
-
-       lttng_ht_destroy(ht);
-}
-
-void lttng_consumer_close_metadata(void)
+void lttng_consumer_close_all_metadata(void)
 {
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -1908,7 +1952,7 @@ void lttng_consumer_close_metadata(void)
                 * because at this point we are sure that the metadata producer is
                 * either dead or blocked.
                 */
-               lttng_ustconsumer_close_metadata(metadata_ht);
+               lttng_ustconsumer_close_all_metadata(metadata_ht);
                break;
        default:
                ERR("Unknown consumer_data type");
@@ -1922,10 +1966,7 @@ void lttng_consumer_close_metadata(void)
 void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
                struct lttng_ht *ht)
 {
-       int ret;
-       struct lttng_ht_iter iter;
        struct lttng_consumer_channel *free_chan = NULL;
-       struct consumer_relayd_sock_pair *relayd;
 
        assert(stream);
        /*
@@ -1936,96 +1977,17 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
 
        DBG3("Consumer delete metadata stream %d", stream->wait_fd);
 
-       if (ht == NULL) {
-               /* Means the stream was allocated but not successfully added */
-               goto free_stream_rcu;
-       }
-
        pthread_mutex_lock(&consumer_data.lock);
        pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->lock);
 
-       switch (consumer_data.type) {
-       case LTTNG_CONSUMER_KERNEL:
-               if (stream->mmap_base != NULL) {
-                       ret = munmap(stream->mmap_base, stream->mmap_len);
-                       if (ret != 0) {
-                               PERROR("munmap metadata stream");
-                       }
-               }
-               if (stream->wait_fd >= 0) {
-                       ret = close(stream->wait_fd);
-                       if (ret < 0) {
-                               PERROR("close kernel metadata wait_fd");
-                       }
-               }
-               break;
-       case LTTNG_CONSUMER32_UST:
-       case LTTNG_CONSUMER64_UST:
-               if (stream->monitor) {
-                       /* close the write-side in close_metadata */
-                       ret = close(stream->ust_metadata_poll_pipe[0]);
-                       if (ret < 0) {
-                               PERROR("Close UST metadata read-side poll pipe");
-                       }
-               }
-               lttng_ustconsumer_del_stream(stream);
-               break;
-       default:
-               ERR("Unknown consumer_data type");
-               assert(0);
-               goto end;
-       }
-
-       rcu_read_lock();
-       iter.iter.node = &stream->node.node;
-       ret = lttng_ht_del(ht, &iter);
-       assert(!ret);
-
-       iter.iter.node = &stream->node_channel_id.node;
-       ret = lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter);
-       assert(!ret);
-
-       iter.iter.node = &stream->node_session_id.node;
-       ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
-       assert(!ret);
-       rcu_read_unlock();
-
-       if (stream->out_fd >= 0) {
-               ret = close(stream->out_fd);
-               if (ret) {
-                       PERROR("close");
-               }
-       }
-
-       /* Check and cleanup relayd */
-       rcu_read_lock();
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd != NULL) {
-               uatomic_dec(&relayd->refcount);
-               assert(uatomic_read(&relayd->refcount) >= 0);
-
-               /* Closing streams requires to lock the control socket. */
-               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-               ret = relayd_send_close_stream(&relayd->control_sock,
-                               stream->relayd_stream_id, stream->next_net_seq_num - 1);
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-               if (ret < 0) {
-                       DBG("Unable to close stream on the relayd. Continuing");
-                       /*
-                        * Continue here. There is nothing we can do for the relayd.
-                        * Chances are that the relayd has closed the socket so we just
-                        * continue cleaning up.
-                        */
-               }
+       /* Remove any reference to that stream. */
+       consumer_stream_delete(stream, ht);
 
-               /* Both conditions are met, we destroy the relayd. */
-               if (uatomic_read(&relayd->refcount) == 0 &&
-                               uatomic_read(&relayd->destroy_flag)) {
-                       consumer_destroy_relayd(relayd);
-               }
-       }
-       rcu_read_unlock();
+       /* Close down everything including the relayd if one. */
+       consumer_stream_close(stream);
+       /* Destroy tracer buffers of the stream. */
+       consumer_stream_destroy_buffers(stream);
 
        /* Atomically decrement channel refcount since other threads can use it. */
        if (!uatomic_sub_return(&stream->chan->refcount, 1)
@@ -2034,11 +1996,10 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
                free_chan = stream->chan;
        }
 
-end:
        /*
         * Nullify the stream reference so it is not used after deletion. The
-        * channel lock MUST be acquired before being able to check for
-        * a NULL pointer value.
+        * channel lock MUST be acquired before being able to check for a NULL
+        * pointer value.
         */
        stream->chan->metadata_stream = NULL;
 
@@ -2050,8 +2011,7 @@ end:
                consumer_del_channel(free_chan);
        }
 
-free_stream_rcu:
-       call_rcu(&stream->node.head, free_stream_rcu);
+       consumer_stream_free(stream);
 }
 
 /*
@@ -2196,14 +2156,12 @@ void *consumer_thread_metadata_poll(void *data)
 
        health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);
 
-       health_code_update();
-
-       metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
-       if (!metadata_ht) {
-               /* ENOMEM at this point. Better to bail out. */
-               goto end_ht;
+       if (testpoint(consumerd_thread_metadata)) {
+               goto error_testpoint;
        }
 
+       health_code_update();
+
        DBG("Thread metadata poll started");
 
        /* Size is set to 1 for the consumer_metadata pipe */
@@ -2223,18 +2181,13 @@ void *consumer_thread_metadata_poll(void *data)
        DBG("Metadata main loop started");
 
        while (1) {
-               health_code_update();
-
-               /* Only the metadata pipe is set */
-               if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
-                       err = 0;        /* All is OK */
-                       goto end;
-               }
-
 restart:
-               DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
+               health_code_update();
                health_poll_entry();
+               DBG("Metadata poll wait");
                ret = lttng_poll_wait(&events, -1);
+               DBG("Metadata poll return from wait with %d fd(s)",
+                               LTTNG_POLL_GETNB(&events));
                health_poll_exit();
                DBG("Metadata event catched in thread");
                if (ret < 0) {
@@ -2242,7 +2195,10 @@ restart:
                                ERR("Poll EINTR catched");
                                goto restart;
                        }
-                       goto error;
+                       if (LTTNG_POLL_GETNB(&events) == 0) {
+                               err = 0;        /* All is OK */
+                       }
+                       goto end;
                }
 
                nb_fd = ret;
@@ -2254,6 +2210,11 @@ restart:
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
+                       if (!revents) {
+                               /* No activity for this FD (poll implementation). */
+                               continue;
+                       }
+
                        if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
                                if (revents & (LPOLLERR | LPOLLHUP )) {
                                        DBG("Metadata thread pipe hung up");
@@ -2270,8 +2231,8 @@ restart:
 
                                        pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
                                                        &stream, sizeof(stream));
-                                       if (pipe_len < 0) {
-                                               ERR("read metadata stream, ret: %zd", pipe_len);
+                                       if (pipe_len < sizeof(stream)) {
+                                               PERROR("read metadata stream");
                                                /*
                                                 * Continue here to handle the rest of the streams.
                                                 */
@@ -2290,7 +2251,7 @@ restart:
 
                                        /* Add metadata stream to the global poll events list */
                                        lttng_poll_add(&events, stream->wait_fd,
-                                                       LPOLLIN | LPOLLPRI);
+                                                       LPOLLIN | LPOLLPRI | LPOLLHUP);
                                }
 
                                /* Handle other stream */
@@ -2370,14 +2331,12 @@ restart:
 
        /* All is OK */
        err = 0;
-error:
 end:
        DBG("Metadata poll thread exiting");
 
        lttng_poll_clean(&events);
 end_poll:
-       destroy_stream_ht(metadata_ht);
-end_ht:
+error_testpoint:
        if (err) {
                health_error();
                ERR("Health error occurred in %s", __func__);
@@ -2406,14 +2365,12 @@ void *consumer_thread_data_poll(void *data)
 
        health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);
 
-       health_code_update();
-
-       data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
-       if (data_ht == NULL) {
-               /* ENOMEM at this point. Better to bail out. */
-               goto end;
+       if (testpoint(consumerd_thread_data)) {
+               goto error_testpoint;
        }
 
+       health_code_update();
+
        local_stream = zmalloc(sizeof(struct lttng_consumer_stream *));
        if (local_stream == NULL) {
                PERROR("local_stream malloc");
@@ -2438,16 +2395,18 @@ void *consumer_thread_data_poll(void *data)
                        free(local_stream);
                        local_stream = NULL;
 
-                       /* allocate for all fds + 1 for the consumer_data_pipe */
-                       pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
+                       /*
+                        * Allocate for all fds +1 for the consumer_data_pipe and +1 for
+                        * the wakeup pipe.
+                        */
+                       pollfd = zmalloc((consumer_data.stream_count + 2) * sizeof(struct pollfd));
                        if (pollfd == NULL) {
                                PERROR("pollfd malloc");
                                pthread_mutex_unlock(&consumer_data.lock);
                                goto end;
                        }
 
-                       /* allocate for all fds + 1 for the consumer_data_pipe */
-                       local_stream = zmalloc((consumer_data.stream_count + 1) *
+                       local_stream = zmalloc((consumer_data.stream_count + 2) *
                                        sizeof(struct lttng_consumer_stream *));
                        if (local_stream == NULL) {
                                PERROR("local_stream malloc");
@@ -2474,9 +2433,9 @@ void *consumer_thread_data_poll(void *data)
                }
                /* poll on the array of fds */
        restart:
-               DBG("polling on %d fd", nb_fd + 1);
+               DBG("polling on %d fd", nb_fd + 2);
                health_poll_entry();
-               num_rdy = poll(pollfd, nb_fd + 1, -1);
+               num_rdy = poll(pollfd, nb_fd + 2, -1);
                health_poll_exit();
                DBG("poll num_rdy : %d", num_rdy);
                if (num_rdy == -1) {
@@ -2505,8 +2464,8 @@ void *consumer_thread_data_poll(void *data)
                        DBG("consumer_data_pipe wake up");
                        pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
                                        &new_stream, sizeof(new_stream));
-                       if (pipe_readlen < 0) {
-                               ERR("Consumer data pipe ret %zd", pipe_readlen);
+                       if (pipe_readlen < sizeof(new_stream)) {
+                               PERROR("Consumer data pipe");
                                /* Continue so we can at least handle the current stream(s). */
                                continue;
                        }
@@ -2525,6 +2484,20 @@ void *consumer_thread_data_poll(void *data)
                        continue;
                }
 
+               /* Handle wakeup pipe. */
+               if (pollfd[nb_fd + 1].revents & (POLLIN | POLLPRI)) {
+                       char dummy;
+                       ssize_t pipe_readlen;
+
+                       pipe_readlen = lttng_pipe_read(ctx->consumer_wakeup_pipe, &dummy,
+                                       sizeof(dummy));
+                       if (pipe_readlen < 0) {
+                               PERROR("Consumer data wakeup pipe");
+                       }
+                       /* We've been awakened to handle stream(s). */
+                       ctx->has_wakeup = 0;
+               }
+
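
The producer side of this wakeup is not part of this hunk; presumably the per-domain read paths do something along these lines when data becomes available, setting the per-stream has_data flag and poking the pipe. A sketch under that assumption, not the actual implementation:

static void wake_data_thread(struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_stream *stream)
{
	char dummy = 'w';

	/* Let the poll loop read this stream even without POLLIN. */
	stream->has_data = 1;

	if (!ctx->has_wakeup) {
		ctx->has_wakeup = 1;
		(void) lttng_pipe_write(ctx->consumer_wakeup_pipe, &dummy,
				sizeof(dummy));
	}
}
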
                /* Take care of high priority channels first. */
                for (i = 0; i < nb_fd; i++) {
                        health_code_update();
@@ -2563,7 +2536,8 @@ void *consumer_thread_data_poll(void *data)
                                continue;
                        }
                        if ((pollfd[i].revents & POLLIN) ||
-                                       local_stream[i]->hangup_flush_done) {
+                                       local_stream[i]->hangup_flush_done ||
+                                       local_stream[i]->has_data) {
                                DBG("Normal read on fd %d", pollfd[i].fd);
                                len = ctx->on_buffer_ready(local_stream[i], ctx);
                                /* it's ok to have an unavailable sub-buffer */
@@ -2643,8 +2617,7 @@ end:
         */
        (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);
 
-       destroy_data_stream_ht(data_ht);
-
+error_testpoint:
        if (err) {
                health_error();
                ERR("Health error occurred in %s", __func__);
@@ -2686,12 +2659,17 @@ void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
                        break;
                case LTTNG_CONSUMER32_UST:
                case LTTNG_CONSUMER64_UST:
-                       /*
-                        * Note: a mutex is taken internally within
-                        * liblttng-ust-ctl to protect timer wakeup_fd
-                        * use from concurrent close.
-                        */
-                       lttng_ustconsumer_close_stream_wakeup(stream);
+                       if (stream->metadata_flag) {
+                               /* Safe and protected by the stream lock. */
+                               lttng_ustconsumer_close_metadata(stream->chan);
+                       } else {
+                               /*
+                                * Note: a mutex is taken internally within
+                                * liblttng-ust-ctl to protect timer wakeup_fd
+                                * use from concurrent close.
+                                */
+                               lttng_ustconsumer_close_stream_wakeup(stream);
+                       }
                        break;
                default:
                        ERR("Unknown consumer_data type");
@@ -2745,6 +2723,10 @@ void *consumer_thread_channel_poll(void *data)
 
        health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);
 
+       if (testpoint(consumerd_thread_channel)) {
+               goto error_testpoint;
+       }
+
        health_code_update();
 
        channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
@@ -2771,18 +2753,13 @@ void *consumer_thread_channel_poll(void *data)
        DBG("Channel main loop started");
 
        while (1) {
-               health_code_update();
-
-               /* Only the channel pipe is set */
-               if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
-                       err = 0;        /* All is OK */
-                       goto end;
-               }
-
 restart:
-               DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
+               health_code_update();
+               DBG("Channel poll wait");
                health_poll_entry();
                ret = lttng_poll_wait(&events, -1);
+               DBG("Channel poll return from wait with %d fd(s)",
+                               LTTNG_POLL_GETNB(&events));
                health_poll_exit();
                DBG("Channel event catched in thread");
                if (ret < 0) {
@@ -2790,6 +2767,9 @@ restart:
                                ERR("Poll EINTR catched");
                                goto restart;
                        }
+                       if (LTTNG_POLL_GETNB(&events) == 0) {
+                               err = 0;        /* All is OK */
+                       }
                        goto end;
                }
 
@@ -2802,10 +2782,11 @@ restart:
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
-                       /* Just don't waste time if no returned events for the fd */
                        if (!revents) {
+                               /* No activity for this FD (poll implementation). */
                                continue;
                        }
+
                        if (pollfd == ctx->consumer_channel_pipe[0]) {
                                if (revents & (LPOLLERR | LPOLLHUP)) {
                                        DBG("Channel thread pipe hung up");
@@ -2842,7 +2823,14 @@ restart:
                                                break;
                                        case CONSUMER_CHANNEL_DEL:
                                        {
-                                               struct lttng_consumer_stream *stream, *stmp;
+                                               /*
+                                                * This command should never be called if the channel
+                                                * has streams monitored by either the data or metadata
+                                                * thread. The consumer only notifies this thread with
+                                                * a channel del. command if it receives a destroy
+                                                * channel command from the session daemon, which sends
+                                                * it when a command prior to GET_CHANNEL failed.
+                                                */
 
                                                rcu_read_lock();
                                                chan = consumer_find_channel(key);
@@ -2855,24 +2843,15 @@ restart:
                                                iter.iter.node = &chan->wait_fd_node.node;
                                                ret = lttng_ht_del(channel_ht, &iter);
                                                assert(ret == 0);
-                                               consumer_close_channel_streams(chan);
 
                                                switch (consumer_data.type) {
                                                case LTTNG_CONSUMER_KERNEL:
                                                        break;
                                                case LTTNG_CONSUMER32_UST:
                                                case LTTNG_CONSUMER64_UST:
-                                                       /* Delete streams that might have been left in the stream list. */
-                                                       cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head,
-                                                                       send_node) {
-                                                               health_code_update();
-
-                                                               cds_list_del(&stream->send_node);
-                                                               lttng_ustconsumer_del_stream(stream);
-                                                               uatomic_sub(&stream->chan->refcount, 1);
-                                                               assert(&chan->refcount);
-                                                               free(stream);
-                                                       }
+                                                       health_code_update();
+                                                       /* Destroy streams that might have been left in the stream list. */
+                                                       clean_channel_stream_list(chan);
                                                        break;
                                                default:
                                                        ERR("Unknown consumer_data type");
@@ -2925,6 +2904,12 @@ restart:
                                lttng_poll_del(&events, chan->wait_fd);
                                ret = lttng_ht_del(channel_ht, &iter);
                                assert(ret == 0);
+
+                               /*
+                                * This closes the wait fd for each stream associated with
+                                * this channel AND monitored by the data/metadata thread,
+                                * so each stream gets cleaned up by the right thread.
+                                */
                                consumer_close_channel_streams(chan);
 
                                /* Release our own refcount */
@@ -2946,6 +2931,7 @@ end:
 end_poll:
        destroy_channel_ht(channel_ht);
 end_ht:
+error_testpoint:
        DBG("Channel poll thread exiting");
        if (err) {
                health_error();
@@ -2964,8 +2950,8 @@ static int set_metadata_socket(struct lttng_consumer_local_data *ctx,
        assert(ctx);
        assert(sockpoll);
 
-       if (lttng_consumer_poll_socket(sockpoll) < 0) {
-               ret = -1;
+       ret = lttng_consumer_poll_socket(sockpoll);
+       if (ret) {
                goto error;
        }
        DBG("Metadata connection on client_socket");
@@ -3001,6 +2987,10 @@ void *consumer_thread_sessiond_poll(void *data)
 
        health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);
 
+       if (testpoint(consumerd_thread_sessiond)) {
+               goto error_testpoint;
+       }
+
        health_code_update();
 
        DBG("Creating command socket %s", ctx->consumer_command_sock_path);
@@ -3030,7 +3020,12 @@ void *consumer_thread_sessiond_poll(void *data)
        consumer_sockpoll[1].fd = client_socket;
        consumer_sockpoll[1].events = POLLIN | POLLPRI;
 
-       if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+       ret = lttng_consumer_poll_socket(consumer_sockpoll);
+       if (ret) {
+               if (ret > 0) {
+                       /* should exit */
+                       err = 0;
+               }
                goto end;
        }
        DBG("Connection on client_socket");
@@ -3047,7 +3042,11 @@ void *consumer_thread_sessiond_poll(void *data)
         * command unix socket.
         */
        ret = set_metadata_socket(ctx, consumer_sockpoll, client_socket);
-       if (ret < 0) {
+       if (ret) {
+               if (ret > 0) {
+                       /* should exit */
+                       err = 0;
+               }
                goto end;
        }
 
@@ -3068,15 +3067,15 @@ void *consumer_thread_sessiond_poll(void *data)
                health_poll_entry();
                ret = lttng_consumer_poll_socket(consumer_sockpoll);
                health_poll_exit();
-               if (ret < 0) {
+               if (ret) {
+                       if (ret > 0) {
+                               /* should exit */
+                               err = 0;
+                       }
                        goto end;
                }
                DBG("Incoming command on sock");
                ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
-               if (ret == -ENOENT) {
-                       DBG("Received STOP command");
-                       goto end;
-               }
                if (ret <= 0) {
                        /*
                         * This could simply be a session daemon quitting. Don't output
@@ -3105,7 +3104,7 @@ end:
         *
         * NOTE: for now, this only applies to the UST tracer.
         */
-       lttng_consumer_close_metadata();
+       lttng_consumer_close_all_metadata();
 
        /*
         * when all fds have hung up, the polling thread
@@ -3121,6 +3120,8 @@ end:
 
        notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);
 
+       notify_health_quit_pipe(health_quit_pipe);
+
        /* Cleaning up possibly open sockets. */
        if (sock >= 0) {
                ret = close(sock);
@@ -3135,6 +3136,7 @@ end:
                }
        }
 
+error_testpoint:
        if (err) {
                health_error();
                ERR("Health error occurred in %s", __func__);
@@ -3196,12 +3198,42 @@ int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
 /*
  * Allocate and set consumer data hash tables.
  */
-void lttng_consumer_init(void)
+int lttng_consumer_init(void)
 {
        consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       if (!consumer_data.channel_ht) {
+               goto error;
+       }
+
        consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       if (!consumer_data.relayd_ht) {
+               goto error;
+       }
+
        consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       if (!consumer_data.stream_list_ht) {
+               goto error;
+       }
+
        consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       if (!consumer_data.stream_per_chan_id_ht) {
+               goto error;
+       }
+
+       data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       if (!data_ht) {
+               goto error;
+       }
+
+       metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       if (!metadata_ht) {
+               goto error;
+       }
+
+       return 0;
+
+error:
+       return -1;
 }
 
 /*
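
Since lttng_consumer_init() can now fail when any of its hash tables cannot be allocated, its caller has to check the return value. A minimal, illustrative call site (the real one presumably lives in the consumerd binary, not in this file):

    #include <stdio.h>
    #include <stdlib.h>

    /* Prototype as changed by this patch. */
    int lttng_consumer_init(void);

    int main(void)
    {
            /* Abort startup if the consumer data hash tables cannot be allocated. */
            if (lttng_consumer_init() < 0) {
                    fprintf(stderr, "Failed to allocate consumer data hash tables\n");
                    return EXIT_FAILURE;
            }
            /* ... continue with consumer daemon setup ... */
            return EXIT_SUCCESS;
    }
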
@@ -3217,7 +3249,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                uint64_t relayd_session_id)
 {
        int fd = -1, ret = -1, relayd_created = 0;
-       enum lttng_error_code ret_code = LTTNG_OK;
+       enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        struct consumer_relayd_sock_pair *relayd = NULL;
 
        assert(ctx);
@@ -3253,7 +3285,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
        }
 
        /* First send a status message before receiving the fds. */
-       ret = consumer_send_status_msg(sock, LTTNG_OK);
+       ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
        if (ret < 0) {
                /* Somehow, the session daemon is not responding anymore. */
                lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
@@ -3261,7 +3293,9 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
        }
 
        /* Poll on consumer socket. */
-       if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+       ret = lttng_consumer_poll_socket(consumer_sockpoll);
+       if (ret) {
+               /* Having to exit in the middle of a command is an error. */
                lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
                ret = -EINTR;
                goto error_nosignal;
@@ -3515,15 +3549,6 @@ int consumer_data_pending(uint64_t id)
                 */
                ret = cds_lfht_is_node_deleted(&stream->node.node);
                if (!ret) {
-                       /*
-                        * An empty output file is not valid. We need at least one packet
-                        * generated per stream, even if it contains no event, so it
-                        * contains at least one packet header.
-                        */
-                       if (stream->output_written == 0) {
-                               pthread_mutex_unlock(&stream->lock);
-                               goto data_pending;
-                       }
                        /* Check the stream if there is data in the buffers. */
                        ret = data_pending(stream);
                        if (ret == 1) {
@@ -3596,6 +3621,7 @@ int consumer_send_status_msg(int sock, int ret_code)
 {
        struct lttcomm_consumer_status_msg msg;
 
+       memset(&msg, 0, sizeof(msg));
        msg.ret_code = ret_code;
 
        return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
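
The memset() added above zeroes the status message before the return code is set, presumably so that struct padding and any unused fields are sent with a deterministic value rather than leftover stack bytes. The same pattern sketched generically; the struct below is an illustrative stand-in, not the real lttcomm layout:

    #include <stdint.h>
    #include <string.h>
    #include <unistd.h>

    /* Illustrative wire message; the compiler may insert padding after
     * ret_code, which memset() also clears. */
    struct status_msg {
            int32_t ret_code;
            uint64_t key;
    };

    static ssize_t send_status(int sock, int32_t ret_code, uint64_t key)
    {
            struct status_msg msg;

            /* Zero the whole object, padding included, before filling it. */
            memset(&msg, 0, sizeof(msg));
            msg.ret_code = ret_code;
            msg.key = key;

            return write(sock, &msg, sizeof(msg));
    }
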
@@ -3613,10 +3639,11 @@ int consumer_send_status_channel(int sock,
 
        assert(sock >= 0);
 
+       memset(&msg, 0, sizeof(msg));
        if (!channel) {
-               msg.ret_code = -LTTNG_ERR_UST_CHAN_FAIL;
+               msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
        } else {
-               msg.ret_code = LTTNG_OK;
+               msg.ret_code = LTTCOMM_CONSUMERD_SUCCESS;
                msg.key = channel->key;
                msg.stream_count = channel->streams.count;
        }