+ * Add channel to internal consumer state.
+ *
+ * Returns 0 on success or else a negative value.
+ */
+static int add_channel(struct lttng_consumer_channel *channel,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret = 0;
+
+ assert(channel);
+ assert(ctx);
+
+ if (ctx->on_recv_channel != NULL) {
+ ret = ctx->on_recv_channel(channel);
+ if (ret == 0) {
+ ret = consumer_add_channel(channel, ctx);
+ } else if (ret < 0) {
+ /* Most likely an ENOMEM. */
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
+ goto error;
+ }
+ } else {
+ ret = consumer_add_channel(channel, ctx);
+ }
+
+ DBG("UST consumer channel added (key: %" PRIu64 ")", channel->key);
+
+error:
+ return ret;
+}
+
+/*
+ * Allocate and return a consumer channel object.
+ */
+static struct lttng_consumer_channel *allocate_channel(uint64_t session_id,
+ const char *pathname, const char *name, uid_t uid, gid_t gid,
+ uint64_t relayd_id, uint64_t key, enum lttng_event_output output,
+ uint64_t tracefile_size, uint64_t tracefile_count,
+ uint64_t session_id_per_pid, unsigned int monitor)
+{
+ assert(pathname);
+ assert(name);
+
+ return consumer_allocate_channel(key, session_id, pathname, name, uid,
+ gid, relayd_id, output, tracefile_size,
+ tracefile_count, session_id_per_pid, monitor);
+}
+
+/*
+ * Allocate and return a consumer stream object. If _alloc_ret is not NULL, the
+ * error value if applicable is set in it else it is kept untouched.
+ *
+ * Return NULL on error else the newly allocated stream object.
+ */
+static struct lttng_consumer_stream *allocate_stream(int cpu, int key,
+ struct lttng_consumer_channel *channel,
+ struct lttng_consumer_local_data *ctx, int *_alloc_ret)
+{
+ int alloc_ret;
+ struct lttng_consumer_stream *stream = NULL;
+
+ assert(channel);
+ assert(ctx);
+
+ stream = consumer_allocate_stream(channel->key,
+ key,
+ LTTNG_CONSUMER_ACTIVE_STREAM,
+ channel->name,
+ channel->uid,
+ channel->gid,
+ channel->relayd_id,
+ channel->session_id,
+ cpu,
+ &alloc_ret,
+ channel->type,
+ channel->monitor);
+ if (stream == NULL) {
+ switch (alloc_ret) {
+ case -ENOENT:
+ /*
+ * We could not find the channel. Can happen if cpu hotplug
+ * happens while tearing down.
+ */
+ DBG3("Could not find channel");
+ break;
+ case -ENOMEM:
+ case -EINVAL:
+ default:
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
+ break;
+ }
+ goto error;
+ }
+
+ stream->chan = channel;
+
+error:
+ if (_alloc_ret) {
+ *_alloc_ret = alloc_ret;
+ }
+ return stream;
+}
+
+/*
+ * Send the given stream pointer to the corresponding thread.
+ *
+ * Returns 0 on success else a negative value.
+ */
+static int send_stream_to_thread(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret;
+ struct lttng_pipe *stream_pipe;
+
+ /* Get the right pipe where the stream will be sent. */
+ if (stream->metadata_flag) {
+ ret = consumer_add_metadata_stream(stream);
+ if (ret) {
+ ERR("Consumer add metadata stream %" PRIu64 " failed.",
+ stream->key);
+ goto error;
+ }
+ stream_pipe = ctx->consumer_metadata_pipe;
+ } else {
+ ret = consumer_add_data_stream(stream);
+ if (ret) {
+ ERR("Consumer add stream %" PRIu64 " failed.",
+ stream->key);
+ goto error;
+ }
+ stream_pipe = ctx->consumer_data_pipe;
+ }
+
+ /*
+ * From this point on, the stream's ownership has been moved away from
+ * the channel and becomes globally visible.
+ */
+ stream->globally_visible = 1;
+
+ ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
+ if (ret < 0) {
+ ERR("Consumer write %s stream to pipe %d",
+ stream->metadata_flag ? "metadata" : "data",
+ lttng_pipe_get_writefd(stream_pipe));
+ if (stream->metadata_flag) {
+ consumer_del_stream_for_metadata(stream);
+ } else {
+ consumer_del_stream_for_data(stream);
+ }
+ }
+error:
+ return ret;
+}
+
+/*
+ * Create streams for the given channel using liblttng-ust-ctl.
+ *
+ * Return 0 on success else a negative value.
+ */
+static int create_ust_streams(struct lttng_consumer_channel *channel,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret, cpu = 0;
+ struct ustctl_consumer_stream *ustream;
+ struct lttng_consumer_stream *stream;
+
+ assert(channel);
+ assert(ctx);
+
+ /*
+ * While a stream is available from ustctl. When NULL is returned, we've
+ * reached the end of the possible stream for the channel.
+ */
+ while ((ustream = ustctl_create_stream(channel->uchan, cpu))) {
+ int wait_fd;
+ int ust_metadata_pipe[2];
+
+ if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA && channel->monitor) {
+ ret = utils_create_pipe_cloexec_nonblock(ust_metadata_pipe);
+ if (ret < 0) {
+ ERR("Create ust metadata poll pipe");
+ goto error;
+ }
+ wait_fd = ust_metadata_pipe[0];
+ } else {
+ wait_fd = ustctl_stream_get_wait_fd(ustream);
+ }
+
+ /* Allocate consumer stream object. */
+ stream = allocate_stream(cpu, wait_fd, channel, ctx, &ret);
+ if (!stream) {
+ goto error_alloc;
+ }
+ stream->ustream = ustream;
+ /*
+ * Store it so we can save multiple function calls afterwards since
+ * this value is used heavily in the stream threads. This is UST
+ * specific so this is why it's done after allocation.
+ */
+ stream->wait_fd = wait_fd;
+
+ /*
+ * Increment channel refcount since the channel reference has now been
+ * assigned in the allocation process above.
+ */
+ if (stream->chan->monitor) {
+ uatomic_inc(&stream->chan->refcount);
+ }
+
+ /*
+ * Order is important this is why a list is used. On error, the caller
+ * should clean this list.
+ */
+ cds_list_add_tail(&stream->send_node, &channel->streams.head);
+
+ ret = ustctl_get_max_subbuf_size(stream->ustream,
+ &stream->max_sb_size);
+ if (ret < 0) {
+ ERR("ustctl_get_max_subbuf_size failed for stream %s",
+ stream->name);
+ goto error;
+ }
+
+ /* Do actions once stream has been received. */
+ if (ctx->on_recv_stream) {
+ ret = ctx->on_recv_stream(stream);
+ if (ret < 0) {
+ goto error;
+ }
+ }
+
+ DBG("UST consumer add stream %s (key: %" PRIu64 ") with relayd id %" PRIu64,
+ stream->name, stream->key, stream->relayd_stream_id);
+
+ /* Set next CPU stream. */
+ channel->streams.count = ++cpu;
+
+ /* Keep stream reference when creating metadata. */
+ if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
+ channel->metadata_stream = stream;
+ stream->ust_metadata_poll_pipe[0] = ust_metadata_pipe[0];
+ stream->ust_metadata_poll_pipe[1] = ust_metadata_pipe[1];
+ }
+ }
+
+ return 0;
+
+error:
+error_alloc:
+ return ret;
+}
+
+/*
+ * Create an UST channel with the given attributes and send it to the session
+ * daemon using the ust ctl API.
+ *
+ * Return 0 on success or else a negative value.
+ */
+static int create_ust_channel(struct ustctl_consumer_channel_attr *attr,
+ struct ustctl_consumer_channel **chanp)
+{
+ int ret;
+ struct ustctl_consumer_channel *channel;
+
+ assert(attr);
+ assert(chanp);
+
+ DBG3("Creating channel to ustctl with attr: [overwrite: %d, "
+ "subbuf_size: %" PRIu64 ", num_subbuf: %" PRIu64 ", "
+ "switch_timer_interval: %u, read_timer_interval: %u, "
+ "output: %d, type: %d", attr->overwrite, attr->subbuf_size,
+ attr->num_subbuf, attr->switch_timer_interval,
+ attr->read_timer_interval, attr->output, attr->type);
+
+ channel = ustctl_create_channel(attr);
+ if (!channel) {
+ ret = -1;
+ goto error_create;
+ }
+
+ *chanp = channel;
+
+ return 0;
+
+error_create:
+ return ret;
+}
+
+/*
+ * Send a single given stream to the session daemon using the sock.
+ *
+ * Return 0 on success else a negative value.
+ */
+static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ assert(stream);
+ assert(sock >= 0);
+
+ DBG("UST consumer sending stream %" PRIu64 " to sessiond", stream->key);
+
+ /* Send stream to session daemon. */
+ ret = ustctl_send_stream_to_sessiond(sock, stream->ustream);
+ if (ret < 0) {
+ goto error;
+ }
+
+error:
+ return ret;
+}
+
+/*
+ * Send channel to sessiond.
+ *
+ * Return 0 on success or else a negative value.
+ */
+static int send_sessiond_channel(int sock,
+ struct lttng_consumer_channel *channel,
+ struct lttng_consumer_local_data *ctx, int *relayd_error)
+{
+ int ret, ret_code = LTTNG_OK;
+ struct lttng_consumer_stream *stream;
+
+ assert(channel);
+ assert(ctx);
+ assert(sock >= 0);
+
+ DBG("UST consumer sending channel %s to sessiond", channel->name);
+
+ if (channel->relayd_id != (uint64_t) -1ULL) {
+ cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+ /* Try to send the stream to the relayd if one is available. */
+ ret = consumer_send_relayd_stream(stream, stream->chan->pathname);
+ if (ret < 0) {
+ /*
+ * Flag that the relayd was the problem here probably due to a
+ * communicaton error on the socket.
+ */
+ if (relayd_error) {
+ *relayd_error = 1;
+ }
+ ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
+ }
+ }
+ }
+
+ /* Inform sessiond that we are about to send channel and streams. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0 || ret_code != LTTNG_OK) {
+ /*
+ * Either the session daemon is not responding or the relayd died so we
+ * stop now.
+ */
+ goto error;
+ }
+
+ /* Send channel to sessiond. */
+ ret = ustctl_send_channel_to_sessiond(sock, channel->uchan);
+ if (ret < 0) {
+ goto error;
+ }
+
+ ret = ustctl_channel_close_wakeup_fd(channel->uchan);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* The channel was sent successfully to the sessiond at this point. */
+ cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+ /* Send stream to session daemon. */
+ ret = send_sessiond_stream(sock, stream);
+ if (ret < 0) {
+ goto error;
+ }
+ }
+
+ /* Tell sessiond there is no more stream. */
+ ret = ustctl_send_stream_to_sessiond(sock, NULL);
+ if (ret < 0) {
+ goto error;
+ }
+
+ DBG("UST consumer NULL stream sent to sessiond");
+
+ return 0;
+
+error:
+ if (ret_code != LTTNG_OK) {
+ ret = -1;
+ }
+ return ret;
+}
+
+/*
+ * Creates a channel and streams and add the channel it to the channel internal
+ * state. The created stream must ONLY be sent once the GET_CHANNEL command is
+ * received.
+ *
+ * Return 0 on success or else, a negative value is returned and the channel
+ * MUST be destroyed by consumer_del_channel().
+ */
+static int ask_channel(struct lttng_consumer_local_data *ctx, int sock,
+ struct lttng_consumer_channel *channel,
+ struct ustctl_consumer_channel_attr *attr)
+{
+ int ret;
+
+ assert(ctx);
+ assert(channel);
+ assert(attr);
+
+ /*
+ * This value is still used by the kernel consumer since for the kernel,
+ * the stream ownership is not IN the consumer so we need to have the
+ * number of left stream that needs to be initialized so we can know when
+ * to delete the channel (see consumer.c).
+ *
+ * As for the user space tracer now, the consumer creates and sends the
+ * stream to the session daemon which only sends them to the application
+ * once every stream of a channel is received making this value useless
+ * because we they will be added to the poll thread before the application
+ * receives them. This ensures that a stream can not hang up during
+ * initilization of a channel.
+ */
+ channel->nb_init_stream_left = 0;
+
+ /* The reply msg status is handled in the following call. */
+ ret = create_ust_channel(attr, &channel->uchan);
+ if (ret < 0) {
+ goto end;
+ }
+
+ channel->wait_fd = ustctl_channel_get_wait_fd(channel->uchan);
+
+ /*
+ * For the snapshots (no monitor), we create the metadata streams
+ * on demand, not during the channel creation.
+ */
+ if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA && !channel->monitor) {
+ ret = 0;
+ goto end;
+ }
+
+ /* Open all streams for this channel. */
+ ret = create_ust_streams(channel, ctx);
+ if (ret < 0) {
+ goto end;
+ }
+
+end:
+ return ret;
+}
+
+/*
+ * Send all stream of a channel to the right thread handling it.
+ *
+ * On error, return a negative value else 0 on success.
+ */
+static int send_streams_to_thread(struct lttng_consumer_channel *channel,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret = 0;
+ struct lttng_consumer_stream *stream, *stmp;
+
+ assert(channel);
+ assert(ctx);
+
+ /* Send streams to the corresponding thread. */
+ cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
+ send_node) {
+ /* Sending the stream to the thread. */
+ ret = send_stream_to_thread(stream, ctx);
+ if (ret < 0) {
+ /*
+ * If we are unable to send the stream to the thread, there is
+ * a big problem so just stop everything.
+ */
+ /* Remove node from the channel stream list. */
+ cds_list_del(&stream->send_node);
+ goto error;
+ }
+
+ /* Remove node from the channel stream list. */
+ cds_list_del(&stream->send_node);
+
+ }
+
+error:
+ return ret;
+}
+
+/*
+ * Flush channel's streams using the given key to retrieve the channel.
+ *
+ * Return 0 on success else an LTTng error code.
+ */
+static int flush_channel(uint64_t chan_key)
+{
+ int ret = 0;
+ struct lttng_consumer_channel *channel;
+ struct lttng_consumer_stream *stream;
+ struct lttng_ht *ht;
+ struct lttng_ht_iter iter;
+
+ DBG("UST consumer flush channel key %" PRIu64, chan_key);
+
+ rcu_read_lock();
+ channel = consumer_find_channel(chan_key);
+ if (!channel) {
+ ERR("UST consumer flush channel %" PRIu64 " not found", chan_key);
+ ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+ goto error;
+ }
+
+ ht = consumer_data.stream_per_chan_id_ht;
+
+ /* For each stream of the channel id, flush it. */
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
+ &channel->key, &iter.iter, stream, node_channel_id.node) {
+ ustctl_flush_buffer(stream->ustream, 1);
+ }
+error:
+ rcu_read_unlock();
+ return ret;
+}
+
+/*
+ * Close metadata stream wakeup_fd using the given key to retrieve the channel.
+ * RCU read side lock MUST be acquired before calling this function.
+ *
+ * Return 0 on success else an LTTng error code.
+ */
+static int close_metadata(uint64_t chan_key)
+{
+ int ret = 0;
+ struct lttng_consumer_channel *channel;
+
+ DBG("UST consumer close metadata key %" PRIu64, chan_key);
+
+ channel = consumer_find_channel(chan_key);
+ if (!channel) {
+ /*
+ * This is possible if the metadata thread has issue a delete because
+ * the endpoint point of the stream hung up. There is no way the
+ * session daemon can know about it thus use a DBG instead of an actual
+ * error.
+ */
+ DBG("UST consumer close metadata %" PRIu64 " not found", chan_key);
+ ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+ goto error;
+ }
+
+ pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&channel->lock);
+
+ if (cds_lfht_is_node_deleted(&channel->node.node)) {
+ goto error_unlock;
+ }
+
+ if (channel->switch_timer_enabled == 1) {
+ DBG("Deleting timer on metadata channel");
+ consumer_timer_switch_stop(channel);
+ }
+
+ if (channel->metadata_stream) {
+ ret = ustctl_stream_close_wakeup_fd(channel->metadata_stream->ustream);
+ if (ret < 0) {
+ ERR("UST consumer unable to close fd of metadata (ret: %d)", ret);
+ ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
+ goto error_unlock;
+ }
+ if (channel->monitor) {
+ /* close the read-side in consumer_del_metadata_stream */
+ ret = close(channel->metadata_stream->ust_metadata_poll_pipe[1]);
+ if (ret < 0) {
+ PERROR("Close UST metadata write-side poll pipe");
+ }
+ }
+ }
+
+error_unlock:
+ pthread_mutex_unlock(&channel->lock);
+ pthread_mutex_unlock(&consumer_data.lock);
+error:
+ return ret;
+}
+
+/*
+ * RCU read side lock MUST be acquired before calling this function.
+ *
+ * Return 0 on success else an LTTng error code.
+ */
+static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
+{
+ int ret;
+ struct lttng_consumer_channel *metadata;
+
+ DBG("UST consumer setup metadata key %" PRIu64, key);
+
+ metadata = consumer_find_channel(key);
+ if (!metadata) {
+ ERR("UST consumer push metadata %" PRIu64 " not found", key);
+ ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+ goto end;
+ }
+
+ /*
+ * In no monitor mode, the metadata channel has no stream(s) so skip the
+ * ownership transfer to the metadata thread.
+ */
+ if (!metadata->monitor) {
+ DBG("Metadata channel in no monitor");
+ ret = 0;
+ goto end;
+ }
+
+ /*
+ * Send metadata stream to relayd if one available. Availability is
+ * known if the stream is still in the list of the channel.
+ */
+ if (cds_list_empty(&metadata->streams.head)) {
+ ERR("Metadata channel key %" PRIu64 ", no stream available.", key);
+ ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
+ goto error_no_stream;
+ }
+
+ /* Send metadata stream to relayd if needed. */
+ if (metadata->metadata_stream->net_seq_idx != (uint64_t) -1ULL) {
+ ret = consumer_send_relayd_stream(metadata->metadata_stream,
+ metadata->pathname);
+ if (ret < 0) {
+ ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
+ goto error;
+ }
+ }
+
+ ret = send_streams_to_thread(metadata, ctx);
+ if (ret < 0) {
+ /*
+ * If we are unable to send the stream to the thread, there is
+ * a big problem so just stop everything.
+ */
+ ret = LTTCOMM_CONSUMERD_FATAL;
+ goto error;
+ }
+ /* List MUST be empty after or else it could be reused. */
+ assert(cds_list_empty(&metadata->streams.head));
+
+ ret = 0;
+ goto end;
+
+error:
+ /*
+ * Delete metadata channel on error. At this point, the metadata stream can
+ * NOT be monitored by the metadata thread thus having the guarantee that
+ * the stream is still in the local stream list of the channel. This call
+ * will make sure to clean that list.
+ */
+ cds_list_del(&metadata->metadata_stream->send_node);
+ consumer_stream_destroy(metadata->metadata_stream, NULL);
+error_no_stream:
+end:
+ return ret;
+}
+
+/*
+ * Snapshot the whole metadata.