- ret = consumer_handle_stream_before_relayd(stream, len);
- if (ret >= 0) {
- outfd = ret;
-
- /* Write metadata stream id before payload */
- if (stream->metadata_flag) {
- metadata_id = htobe64(stream->relayd_stream_id);
- do {
- ret = write(outfd, (void *) &metadata_id,
- sizeof(stream->relayd_stream_id));
- } while (ret < 0 && errno == EINTR);
- if (ret < 0) {
- PERROR("write metadata stream id");
- written = ret;
- goto end;
- }
- DBG("Metadata stream id %zu written before data",
- stream->relayd_stream_id);
- }
+ stream->chan = channel;
+
+error:
+ if (_alloc_ret) {
+ *_alloc_ret = alloc_ret;
+ }
+ return stream;
+}
+
+/*
+ * Send the given stream pointer to the corresponding thread.
+ *
+ * Returns 0 on success else a negative value.
+ */
+static int send_stream_to_thread(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret, stream_pipe;
+
+ /* Get the right pipe where the stream will be sent. */
+ if (stream->metadata_flag) {
+ stream_pipe = ctx->consumer_metadata_pipe[1];
+ } else {
+ stream_pipe = ctx->consumer_data_pipe[1];
+ }
+
+ do {
+ ret = write(stream_pipe, &stream, sizeof(stream));
+ } while (ret < 0 && errno == EINTR);
+ if (ret < 0) {
+ PERROR("Consumer write %s stream to pipe %d",
+ stream->metadata_flag ? "metadata" : "data", stream_pipe);
+ }
+
+ return ret;
+}
+
+/*
+ * Search for a relayd object related to the stream. If found, send the stream
+ * to the relayd.
+ *
+ * On success, returns 0 else a negative value.
+ */
+static int send_stream_to_relayd(struct lttng_consumer_stream *stream)
+{
+ int ret = 0;
+ struct consumer_relayd_sock_pair *relayd;
+
+ assert(stream);
+
+ relayd = consumer_find_relayd(stream->net_seq_idx);
+ if (relayd != NULL) {
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ /* Add stream on the relayd */
+ ret = relayd_add_stream(&relayd->control_sock, stream->name,
+ stream->chan->pathname, &stream->relayd_stream_id);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ goto error;