+ assert(relayd);
+
+ DBG("Cleaning up relayd sockets");
+
+ /* Save the net sequence index before destroying the object */
+ netidx = relayd->net_seq_idx;
+
+ /*
+ * Delete the relayd from the relayd hash table, close the sockets and free
+ * the object in a RCU call.
+ */
+ consumer_destroy_relayd(relayd);
+
+ /* Set inactive endpoint to all streams */
+ update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);
+
+ /*
+ * With a local data context, notify the threads that the streams' state
+ * have changed. The write() action on the pipe acts as an "implicit"
+ * memory barrier ordering the updates of the end point status from the
+ * read of this status which happens AFTER receiving this notify.
+ */
+ if (ctx) {
+ notify_thread_lttng_pipe(ctx->consumer_data_pipe);
+ notify_thread_lttng_pipe(ctx->consumer_metadata_pipe);
+ }
+}
+
+/*
+ * Flag a relayd socket pair for destruction. Destroy it if the refcount
+ * reaches zero.
+ *
+ * RCU read side lock MUST be aquired before calling this function.
+ */
+void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
+{
+ assert(relayd);
+
+ /* Set destroy flag for this object */
+ uatomic_set(&relayd->destroy_flag, 1);
+
+ /* Destroy the relayd if refcount is 0 */
+ if (uatomic_read(&relayd->refcount) == 0) {
+ consumer_destroy_relayd(relayd);
+ }
+}
+
+/*
+ * Completly destroy stream from every visiable data structure and the given
+ * hash table if one.
+ *
+ * One this call returns, the stream object is not longer usable nor visible.
+ */
+void consumer_del_stream(struct lttng_consumer_stream *stream,
+ struct lttng_ht *ht)
+{
+ consumer_stream_destroy(stream, ht);
+}
+
+/*
+ * XXX naming of del vs destroy is all mixed up.
+ */
+void consumer_del_stream_for_data(struct lttng_consumer_stream *stream)
+{
+ consumer_stream_destroy(stream, data_ht);
+}
+
+void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
+{
+ consumer_stream_destroy(stream, metadata_ht);
+}
+
+struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
+ uint64_t stream_key,
+ enum lttng_consumer_stream_state state,
+ const char *channel_name,
+ uid_t uid,
+ gid_t gid,
+ uint64_t relayd_id,
+ uint64_t session_id,
+ int cpu,
+ int *alloc_ret,
+ enum consumer_channel_type type,
+ unsigned int monitor)
+{
+ int ret;
+ struct lttng_consumer_stream *stream;
+
+ stream = zmalloc(sizeof(*stream));
+ if (stream == NULL) {
+ PERROR("malloc struct lttng_consumer_stream");
+ ret = -ENOMEM;
+ goto end;
+ }
+
+ rcu_read_lock();
+
+ stream->key = stream_key;
+ stream->out_fd = -1;
+ stream->out_fd_offset = 0;
+ stream->output_written = 0;
+ stream->state = state;
+ stream->uid = uid;
+ stream->gid = gid;
+ stream->net_seq_idx = relayd_id;
+ stream->session_id = session_id;
+ stream->monitor = monitor;
+ stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
+ stream->index_fd = -1;
+ pthread_mutex_init(&stream->lock, NULL);
+
+ /* If channel is the metadata, flag this stream as metadata. */
+ if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
+ stream->metadata_flag = 1;
+ /* Metadata is flat out. */
+ strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
+ /* Live rendez-vous point. */
+ pthread_cond_init(&stream->metadata_rdv, NULL);
+ pthread_mutex_init(&stream->metadata_rdv_lock, NULL);
+ } else {
+ /* Format stream name to <channel_name>_<cpu_number> */
+ ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
+ channel_name, cpu);
+ if (ret < 0) {
+ PERROR("snprintf stream name");
+ goto error;
+ }
+ }
+
+ /* Key is always the wait_fd for streams. */
+ lttng_ht_node_init_u64(&stream->node, stream->key);
+
+ /* Init node per channel id key */
+ lttng_ht_node_init_u64(&stream->node_channel_id, channel_key);
+
+ /* Init session id node with the stream session id */
+ lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id);
+
+ DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64
+ " relayd_id %" PRIu64 ", session_id %" PRIu64,
+ stream->name, stream->key, channel_key,
+ stream->net_seq_idx, stream->session_id);
+
+ rcu_read_unlock();
+ return stream;
+
+error:
+ rcu_read_unlock();
+ free(stream);
+end:
+ if (alloc_ret) {
+ *alloc_ret = ret;
+ }
+ return NULL;
+}
+
+/*
+ * Add a stream to the global list protected by a mutex.
+ */
+int consumer_add_data_stream(struct lttng_consumer_stream *stream)
+{
+ struct lttng_ht *ht = data_ht;
+ int ret = 0;
+
+ assert(stream);
+ assert(ht);
+
+ DBG3("Adding consumer stream %" PRIu64, stream->key);
+
+ pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&stream->chan->lock);
+ pthread_mutex_lock(&stream->chan->timer_lock);
+ pthread_mutex_lock(&stream->lock);
+ rcu_read_lock();
+
+ /* Steal stream identifier to avoid having streams with the same key */
+ steal_stream_key(stream->key, ht);
+
+ lttng_ht_add_unique_u64(ht, &stream->node);
+
+ lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht,
+ &stream->node_channel_id);
+
+ /*
+ * Add stream to the stream_list_ht of the consumer data. No need to steal
+ * the key since the HT does not use it and we allow to add redundant keys
+ * into this table.
+ */
+ lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);
+
+ /*
+ * When nb_init_stream_left reaches 0, we don't need to trigger any action
+ * in terms of destroying the associated channel, because the action that
+ * causes the count to become 0 also causes a stream to be added. The
+ * channel deletion will thus be triggered by the following removal of this
+ * stream.
+ */
+ if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
+ /* Increment refcount before decrementing nb_init_stream_left */
+ cmm_smp_wmb();
+ uatomic_dec(&stream->chan->nb_init_stream_left);
+ }
+
+ /* Update consumer data once the node is inserted. */
+ consumer_data.stream_count++;
+ consumer_data.need_update = 1;
+
+ rcu_read_unlock();
+ pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&stream->chan->timer_lock);
+ pthread_mutex_unlock(&stream->chan->lock);
+ pthread_mutex_unlock(&consumer_data.lock);
+
+ return ret;
+}
+
+void consumer_del_data_stream(struct lttng_consumer_stream *stream)
+{
+ consumer_del_stream(stream, data_ht);
+}
+
+/*
+ * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
+ * be acquired before calling this.
+ */
+static int add_relayd(struct consumer_relayd_sock_pair *relayd)
+{
+ int ret = 0;
+ struct lttng_ht_node_u64 *node;
+ struct lttng_ht_iter iter;
+
+ assert(relayd);
+
+ lttng_ht_lookup(consumer_data.relayd_ht,
+ &relayd->net_seq_idx, &iter);
+ node = lttng_ht_iter_get_node_u64(&iter);
+ if (node != NULL) {
+ goto end;
+ }
+ lttng_ht_add_unique_u64(consumer_data.relayd_ht, &relayd->node);
+
+end:
+ return ret;
+}
+
+/*
+ * Allocate and return a consumer relayd socket.
+ */
+struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
+ uint64_t net_seq_idx)
+{
+ struct consumer_relayd_sock_pair *obj = NULL;
+
+ /* net sequence index of -1 is a failure */
+ if (net_seq_idx == (uint64_t) -1ULL) {
+ goto error;
+ }
+
+ obj = zmalloc(sizeof(struct consumer_relayd_sock_pair));
+ if (obj == NULL) {
+ PERROR("zmalloc relayd sock");
+ goto error;
+ }
+
+ obj->net_seq_idx = net_seq_idx;
+ obj->refcount = 0;
+ obj->destroy_flag = 0;
+ obj->control_sock.sock.fd = -1;
+ obj->data_sock.sock.fd = -1;
+ lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx);
+ pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
+
+error:
+ return obj;
+}
+
+/*
+ * Find a relayd socket pair in the global consumer data.
+ *
+ * Return the object if found else NULL.
+ * RCU read-side lock must be held across this call and while using the
+ * returned object.
+ */
+struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
+{
+ struct lttng_ht_iter iter;
+ struct lttng_ht_node_u64 *node;
+ struct consumer_relayd_sock_pair *relayd = NULL;
+
+ /* Negative keys are lookup failures */
+ if (key == (uint64_t) -1ULL) {
+ goto error;
+ }
+
+ lttng_ht_lookup(consumer_data.relayd_ht, &key,
+ &iter);
+ node = lttng_ht_iter_get_node_u64(&iter);
+ if (node != NULL) {
+ relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
+ }
+
+error:
+ return relayd;
+}
+
+/*
+ * Find a relayd and send the stream
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
+ char *path)
+{
+ int ret = 0;
+ struct consumer_relayd_sock_pair *relayd;
+
+ assert(stream);
+ assert(stream->net_seq_idx != -1ULL);
+ assert(path);
+
+ /* The stream is not metadata. Get relayd reference if exists. */
+ rcu_read_lock();
+ relayd = consumer_find_relayd(stream->net_seq_idx);
+ if (relayd != NULL) {
+ /* Add stream on the relayd */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_add_stream(&relayd->control_sock, stream->name,
+ path, &stream->relayd_stream_id,
+ stream->chan->tracefile_size, stream->chan->tracefile_count);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ goto end;
+ }
+
+ uatomic_inc(&relayd->refcount);
+ stream->sent_to_relayd = 1;
+ } else {
+ ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
+ stream->key, stream->net_seq_idx);
+ ret = -1;
+ goto end;
+ }
+
+ DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
+ stream->name, stream->key, stream->net_seq_idx);
+
+end:
+ rcu_read_unlock();
+ return ret;
+}
+
+/*
+ * Find a relayd and close the stream
+ */
+void close_relayd_stream(struct lttng_consumer_stream *stream)
+{
+ struct consumer_relayd_sock_pair *relayd;
+
+ /* The stream is not metadata. Get relayd reference if exists. */
+ rcu_read_lock();
+ relayd = consumer_find_relayd(stream->net_seq_idx);
+ if (relayd) {
+ consumer_stream_relayd_close(stream, relayd);
+ }
+ rcu_read_unlock();
+}
+
+/*
+ * Handle stream for relayd transmission if the stream applies for network
+ * streaming where the net sequence index is set.
+ *
+ * Return destination file descriptor or negative value on error.
+ */
+static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
+ size_t data_size, unsigned long padding,
+ struct consumer_relayd_sock_pair *relayd)
+{
+ int outfd = -1, ret;
+ struct lttcomm_relayd_data_hdr data_hdr;
+
+ /* Safety net */
+ assert(stream);
+ assert(relayd);
+
+ /* Reset data header */
+ memset(&data_hdr, 0, sizeof(data_hdr));
+
+ if (stream->metadata_flag) {
+ /* Caller MUST acquire the relayd control socket lock */
+ ret = relayd_send_metadata(&relayd->control_sock, data_size);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Metadata are always sent on the control socket. */
+ outfd = relayd->control_sock.sock.fd;
+ } else {
+ /* Set header with stream information */
+ data_hdr.stream_id = htobe64(stream->relayd_stream_id);
+ data_hdr.data_size = htobe32(data_size);
+ data_hdr.padding_size = htobe32(padding);
+ /*
+ * Note that net_seq_num below is assigned with the *current* value of
+ * next_net_seq_num and only after that the next_net_seq_num will be
+ * increment. This is why when issuing a command on the relayd using
+ * this next value, 1 should always be substracted in order to compare
+ * the last seen sequence number on the relayd side to the last sent.
+ */
+ data_hdr.net_seq_num = htobe64(stream->next_net_seq_num);
+ /* Other fields are zeroed previously */
+
+ ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
+ sizeof(data_hdr));
+ if (ret < 0) {
+ goto error;
+ }
+
+ ++stream->next_net_seq_num;
+
+ /* Set to go on data socket */
+ outfd = relayd->data_sock.sock.fd;
+ }
+
+error:
+ return outfd;
+}
+
+/*
+ * Allocate and return a new lttng_consumer_channel object using the given key
+ * to initialize the hash table node.
+ *
+ * On error, return NULL.
+ */
+struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
+ uint64_t session_id,
+ const char *pathname,
+ const char *name,
+ uid_t uid,
+ gid_t gid,
+ uint64_t relayd_id,
+ enum lttng_event_output output,
+ uint64_t tracefile_size,
+ uint64_t tracefile_count,
+ uint64_t session_id_per_pid,
+ unsigned int monitor,
+ unsigned int live_timer_interval)
+{
+ struct lttng_consumer_channel *channel;
+
+ channel = zmalloc(sizeof(*channel));
+ if (channel == NULL) {
+ PERROR("malloc struct lttng_consumer_channel");
+ goto end;
+ }
+
+ channel->key = key;
+ channel->refcount = 0;
+ channel->session_id = session_id;
+ channel->session_id_per_pid = session_id_per_pid;
+ channel->uid = uid;
+ channel->gid = gid;
+ channel->relayd_id = relayd_id;
+ channel->output = output;
+ channel->tracefile_size = tracefile_size;
+ channel->tracefile_count = tracefile_count;
+ channel->monitor = monitor;
+ channel->live_timer_interval = live_timer_interval;
+ pthread_mutex_init(&channel->lock, NULL);
+ pthread_mutex_init(&channel->timer_lock, NULL);
+
+ /*
+ * In monitor mode, the streams associated with the channel will be put in
+ * a special list ONLY owned by this channel. So, the refcount is set to 1
+ * here meaning that the channel itself has streams that are referenced.
+ *
+ * On a channel deletion, once the channel is no longer visible, the
+ * refcount is decremented and checked for a zero value to delete it. With
+ * streams in no monitor mode, it will now be safe to destroy the channel.
+ */
+ if (!channel->monitor) {
+ channel->refcount = 1;
+ }
+
+ strncpy(channel->pathname, pathname, sizeof(channel->pathname));
+ channel->pathname[sizeof(channel->pathname) - 1] = '\0';
+
+ strncpy(channel->name, name, sizeof(channel->name));
+ channel->name[sizeof(channel->name) - 1] = '\0';
+
+ lttng_ht_node_init_u64(&channel->node, channel->key);
+
+ channel->wait_fd = -1;
+
+ CDS_INIT_LIST_HEAD(&channel->streams.head);
+
+ DBG("Allocated channel (key %" PRIu64 ")", channel->key)
+
+end:
+ return channel;
+}
+
+/*
+ * Add a channel to the global list protected by a mutex.
+ *
+ * On success 0 is returned else a negative value.
+ */
+int consumer_add_channel(struct lttng_consumer_channel *channel,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret = 0;
+ struct lttng_ht_node_u64 *node;
+ struct lttng_ht_iter iter;
+
+ pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&channel->lock);
+ pthread_mutex_lock(&channel->timer_lock);
+ rcu_read_lock();
+
+ lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter);
+ node = lttng_ht_iter_get_node_u64(&iter);
+ if (node != NULL) {
+ /* Channel already exist. Ignore the insertion */
+ ERR("Consumer add channel key %" PRIu64 " already exists!",
+ channel->key);
+ ret = -EEXIST;
+ goto end;
+ }
+
+ lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node);
+
+end:
+ rcu_read_unlock();
+ pthread_mutex_unlock(&channel->timer_lock);
+ pthread_mutex_unlock(&channel->lock);
+ pthread_mutex_unlock(&consumer_data.lock);
+
+ if (!ret && channel->wait_fd != -1 &&
+ channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
+ notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
+ }
+ return ret;
+}
+
+/*
+ * Allocate the pollfd structure and the local view of the out fds to avoid
+ * doing a lookup in the linked list and concurrency issues when writing is
+ * needed. Called with consumer_data.lock held.
+ *
+ * Returns the number of fds in the structures.
+ */
+static int update_poll_array(struct lttng_consumer_local_data *ctx,
+ struct pollfd **pollfd, struct lttng_consumer_stream **local_stream,
+ struct lttng_ht *ht)
+{
+ int i = 0;
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream;
+
+ assert(ctx);
+ assert(ht);
+ assert(pollfd);
+ assert(local_stream);
+
+ DBG("Updating poll fd array");
+ rcu_read_lock();
+ cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
+ /*
+ * Only active streams with an active end point can be added to the
+ * poll set and local stream storage of the thread.
+ *
+ * There is a potential race here for endpoint_status to be updated
+ * just after the check. However, this is OK since the stream(s) will
+ * be deleted once the thread is notified that the end point state has
+ * changed where this function will be called back again.
+ */
+ if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
+ stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
+ continue;
+ }
+ /*
+ * This clobbers way too much the debug output. Uncomment that if you
+ * need it for debugging purposes.
+ *
+ * DBG("Active FD %d", stream->wait_fd);
+ */
+ (*pollfd)[i].fd = stream->wait_fd;
+ (*pollfd)[i].events = POLLIN | POLLPRI;
+ local_stream[i] = stream;
+ i++;
+ }
+ rcu_read_unlock();
+
+ /*
+ * Insert the consumer_data_pipe at the end of the array and don't
+ * increment i so nb_fd is the number of real FD.
+ */
+ (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
+ (*pollfd)[i].events = POLLIN | POLLPRI;
+ return i;
+}
+
+/*
+ * Poll on the should_quit pipe and the command socket return -1 on error and
+ * should exit, 0 if data is available on the command socket
+ */
+int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
+{
+ int num_rdy;
+
+restart:
+ num_rdy = poll(consumer_sockpoll, 2, -1);
+ if (num_rdy == -1) {
+ /*
+ * Restart interrupted system call.
+ */
+ if (errno == EINTR) {
+ goto restart;
+ }
+ PERROR("Poll error");
+ goto exit;
+ }
+ if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
+ DBG("consumer_should_quit wake up");
+ goto exit;
+ }
+ return 0;
+
+exit:
+ return -1;
+}
+
+/*
+ * Set the error socket.
+ */
+void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx,
+ int sock)
+{
+ ctx->consumer_error_socket = sock;
+}
+
+/*
+ * Set the command socket path.
+ */
+void lttng_consumer_set_command_sock_path(
+ struct lttng_consumer_local_data *ctx, char *sock)
+{
+ ctx->consumer_command_sock_path = sock;
+}
+
+/*
+ * Send return code to the session daemon.
+ * If the socket is not defined, we return 0, it is not a fatal error
+ */
+int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
+{
+ if (ctx->consumer_error_socket > 0) {
+ return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
+ sizeof(enum lttcomm_sessiond_command));
+ }
+
+ return 0;
+}
+
+/*
+ * Close all the tracefiles and stream fds and MUST be called when all
+ * instances are destroyed i.e. when all threads were joined and are ended.
+ */
+void lttng_consumer_cleanup(void)
+{
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_channel *channel;
+
+ rcu_read_lock();
+
+ cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, channel,
+ node.node) {
+ consumer_del_channel(channel);
+ }
+
+ rcu_read_unlock();
+
+ lttng_ht_destroy(consumer_data.channel_ht);
+
+ cleanup_relayd_ht();
+
+ lttng_ht_destroy(consumer_data.stream_per_chan_id_ht);
+
+ /*
+ * This HT contains streams that are freed by either the metadata thread or
+ * the data thread so we do *nothing* on the hash table and simply destroy
+ * it.
+ */
+ lttng_ht_destroy(consumer_data.stream_list_ht);
+}
+
+/*
+ * Called from signal handler.
+ */
+void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
+{
+ int ret;
+ consumer_quit = 1;
+ do {
+ ret = write(ctx->consumer_should_quit[1], "4", 1);
+ } while (ret < 0 && errno == EINTR);
+ if (ret < 0 || ret != 1) {
+ PERROR("write consumer quit");
+ }
+
+ DBG("Consumer flag that it should quit");
+}
+
+void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
+ off_t orig_offset)
+{
+ int outfd = stream->out_fd;
+
+ /*
+ * This does a blocking write-and-wait on any page that belongs to the
+ * subbuffer prior to the one we just wrote.
+ * Don't care about error values, as these are just hints and ways to
+ * limit the amount of page cache used.
+ */
+ if (orig_offset < stream->max_sb_size) {
+ return;
+ }
+ lttng_sync_file_range(outfd, orig_offset - stream->max_sb_size,
+ stream->max_sb_size,
+ SYNC_FILE_RANGE_WAIT_BEFORE
+ | SYNC_FILE_RANGE_WRITE
+ | SYNC_FILE_RANGE_WAIT_AFTER);
+ /*
+ * Give hints to the kernel about how we access the file:
+ * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
+ * we write it.
+ *
+ * We need to call fadvise again after the file grows because the
+ * kernel does not seem to apply fadvise to non-existing parts of the
+ * file.
+ *
+ * Call fadvise _after_ having waited for the page writeback to
+ * complete because the dirty page writeback semantic is not well
+ * defined. So it can be expected to lead to lower throughput in
+ * streaming.
+ */
+ posix_fadvise(outfd, orig_offset - stream->max_sb_size,
+ stream->max_sb_size, POSIX_FADV_DONTNEED);
+}
+
+/*
+ * Initialise the necessary environnement :
+ * - create a new context
+ * - create the poll_pipe
+ * - create the should_quit pipe (for signal handler)
+ * - create the thread pipe (for splice)
+ *
+ * Takes a function pointer as argument, this function is called when data is
+ * available on a buffer. This function is responsible to do the
+ * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
+ * buffer configuration and then kernctl_put_next_subbuf at the end.
+ *
+ * Returns a pointer to the new context or NULL on error.
+ */
+struct lttng_consumer_local_data *lttng_consumer_create(
+ enum lttng_consumer_type type,
+ ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx),
+ int (*recv_channel)(struct lttng_consumer_channel *channel),
+ int (*recv_stream)(struct lttng_consumer_stream *stream),
+ int (*update_stream)(uint64_t stream_key, uint32_t state))
+{
+ int ret;
+ struct lttng_consumer_local_data *ctx;
+
+ assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
+ consumer_data.type == type);
+ consumer_data.type = type;
+
+ ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
+ if (ctx == NULL) {
+ PERROR("allocating context");
+ goto error;
+ }
+
+ ctx->consumer_error_socket = -1;
+ ctx->consumer_metadata_socket = -1;
+ pthread_mutex_init(&ctx->metadata_socket_lock, NULL);
+ /* assign the callbacks */
+ ctx->on_buffer_ready = buffer_ready;
+ ctx->on_recv_channel = recv_channel;
+ ctx->on_recv_stream = recv_stream;
+ ctx->on_update_stream = update_stream;
+
+ ctx->consumer_data_pipe = lttng_pipe_open(0);
+ if (!ctx->consumer_data_pipe) {
+ goto error_poll_pipe;