+end:
+ if (ret) {
+ if (stream->stream_fd) {
+ stream_fd_put(stream->stream_fd);
+ stream->stream_fd = NULL;
+ }
+ stream_put(stream);
+ stream = NULL;
+ }
+ if (acquired_reference) {
+ lttng_trace_chunk_put(current_trace_chunk);
+ }
+ return stream;
+
+error_no_alloc:
+ /*
+ * path_name and channel_name need to be freed explicitly here
+ * because we cannot rely on stream_put().
+ */
+ free(path_name);
+ free(channel_name);
+ return NULL;
+}
+
+/*
+ * Called with the session lock held.
+ */
+void stream_publish(struct relay_stream *stream)
+{
+ struct relay_session *session;
+
+ pthread_mutex_lock(&stream->lock);
+ /* Publishing is idempotent; nothing to do if already published. */
+ if (stream->published) {
+ goto unlock;
+ }
+
+ session = stream->trace->session;
+
+ /*
+ * Remove the stream from the session's "pending reception" list,
+ * if it is still on it, before making it publicly visible.
+ */
+ pthread_mutex_lock(&session->recv_list_lock);
+ if (stream->in_recv_list) {
+ cds_list_del_rcu(&stream->recv_node);
+ stream->in_recv_list = false;
+ }
+ pthread_mutex_unlock(&session->recv_list_lock);
+
+ /* Make the stream visible on its trace's stream list. */
+ pthread_mutex_lock(&stream->trace->stream_list_lock);
+ cds_list_add_rcu(&stream->stream_node, &stream->trace->stream_list);
+ pthread_mutex_unlock(&stream->trace->stream_list_lock);
+
+ stream->published = true;
+unlock:
+ pthread_mutex_unlock(&stream->lock);
+}
+
+/*
+ * Stream must be protected by holding the stream lock or by virtue of being
+ * called from stream_destroy.
+ */
+static void stream_unpublish(struct relay_stream *stream)
+{
+ /* Withdraw the stream from the global relay stream hash table. */
+ if (stream->in_stream_ht) {
+ struct lttng_ht_iter iter;
+ int ret;
+
+ iter.iter.node = &stream->node.node;
+ ret = lttng_ht_del(relay_streams_ht, &iter);
+ /* Deletion cannot fail since in_stream_ht guarantees presence. */
+ assert(!ret);
+ stream->in_stream_ht = false;
+ }
+ /* Withdraw the stream from its trace's stream list (reverse of stream_publish). */
+ if (stream->published) {
+ pthread_mutex_lock(&stream->trace->stream_list_lock);
+ cds_list_del_rcu(&stream->stream_node);
+ pthread_mutex_unlock(&stream->trace->stream_list_lock);
+ stream->published = false;
+ }
+}
+
+/*
+ * Effective destructor: frees the stream's remaining owned resources
+ * and the stream itself. Runs from the call_rcu worker thread (via
+ * stream_destroy_rcu) once no RCU reader may still access the stream.
+ */
+static void stream_destroy(struct relay_stream *stream)
+{
+ if (stream->indexes_ht) {
+ /*
+ * Calling lttng_ht_destroy in call_rcu worker thread so
+ * we don't hold the RCU read-side lock while calling
+ * it.
+ */
+ lttng_ht_destroy(stream->indexes_ht);
+ }
+ if (stream->tfa) {
+ tracefile_array_destroy(stream->tfa);
+ }
+ free(stream->path_name);
+ free(stream->channel_name);
+ free(stream);
+}
+
+/*
+ * call_rcu callback: recover the stream from its embedded rcu_head and
+ * perform the actual destruction after the grace period has elapsed.
+ */
+static void stream_destroy_rcu(struct rcu_head *rcu_head)
+{
+ struct relay_stream *stream =
+ caa_container_of(rcu_head, struct relay_stream, rcu_node);
+
+ stream_destroy(stream);
+}
+
+/*
+ * No need to take stream->lock since this is only called on the final
+ * stream_put which ensures that a single thread may act on the stream.
+ */
+static void stream_release(struct urcu_ref *ref)
+{
+ struct relay_stream *stream =
+ caa_container_of(ref, struct relay_stream, ref);
+ struct relay_session *session;
+
+ session = stream->trace->session;
+
+ DBG("Releasing stream id %" PRIu64, stream->stream_handle);
+
+ /* Drop the session's accounting and pending-reception linkage. */
+ pthread_mutex_lock(&session->recv_list_lock);
+ session->stream_count--;
+ if (stream->in_recv_list) {
+ cds_list_del_rcu(&stream->recv_node);
+ stream->in_recv_list = false;
+ }
+ pthread_mutex_unlock(&session->recv_list_lock);
+
+ /* Remove the stream from all public lookup structures. */
+ stream_unpublish(stream);
+
+ /* Release owned resources; each pointer is cleared after its put. */
+ if (stream->stream_fd) {
+ stream_fd_put(stream->stream_fd);
+ stream->stream_fd = NULL;
+ }
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = NULL;
+ }
+ if (stream->trace) {
+ ctf_trace_put(stream->trace);
+ stream->trace = NULL;
+ }
+ /* Finish any rotation still in flight before dropping the chunk. */
+ stream_complete_rotation(stream);
+ lttng_trace_chunk_put(stream->trace_chunk);
+ stream->trace_chunk = NULL;
+
+ /* Defer freeing until all RCU readers are done with the stream. */
+ call_rcu(&stream->rcu_node, stream_destroy_rcu);
+}
+
+/*
+ * Drop one reference on the stream; the last put triggers
+ * stream_release. Taking the RCU read-side lock around the put keeps
+ * the stream object valid for concurrent RCU readers until release.
+ */
+void stream_put(struct relay_stream *stream)
+{
+ rcu_read_lock();
+ assert(stream->ref.refcount != 0);
+ /*
+ * Wait until we have processed all the stream packets before
+ * actually putting our last stream reference.
+ */
+ urcu_ref_put(&stream->ref, stream_release);
+ rcu_read_unlock();
+}
+
+/*
+ * Mark a rotation as pending on the stream, to take effect at
+ * `rotation_sequence_number`, and immediately attempt to perform it.
+ *
+ * next_trace_chunk may be NULL; when non-NULL a reference is acquired
+ * on it for the duration of the rotation.
+ *
+ * Returns 0 on success, a negative value on error (including the
+ * protocol error of setting a rotation while one is already ongoing).
+ */
+int stream_set_pending_rotation(struct relay_stream *stream,
+ struct lttng_trace_chunk *next_trace_chunk,
+ uint64_t rotation_sequence_number)
+{
+ int ret = 0;
+ const struct relay_stream_rotation rotation = {
+ .data_rotated = false,
+ .index_rotated = false,
+ .packet_seq_num = rotation_sequence_number,
+ .prev_data_net_seq = -1ULL,
+ .next_trace_chunk = next_trace_chunk,
+ };
+
+ /* Only one rotation may be pending at a time. */
+ if (stream->ongoing_rotation.is_set) {
+ ERR("Attempted to set a pending rotation on a stream already being rotated (protocol error)");
+ ret = -1;
+ goto end;
+ }
+
+ if (next_trace_chunk) {
+ const bool reference_acquired =
+ lttng_trace_chunk_get(next_trace_chunk);
+
+ /* The caller's reference guarantees the chunk is alive. */
+ assert(reference_acquired);
+ }
+ LTTNG_OPTIONAL_SET(&stream->ongoing_rotation, rotation);
+
+ DBG("Setting pending rotation: stream_id = %" PRIu64
+ ", rotate_at_packet_seq_num = %" PRIu64,
+ stream->stream_handle, rotation_sequence_number);
+ if (stream->is_metadata) {
+ /*
+ * A metadata stream has no index; consider it already rotated.
+ */
+ stream->ongoing_rotation.value.index_rotated = true;
+ ret = stream_rotate_data_file(stream);
+ } else {
+ /* Rotate the index first, then the data file. */
+ ret = try_rotate_stream_index(stream);
+ if (ret < 0) {
+ goto end;
+ }
+
+ ret = try_rotate_stream_data(stream);
+ if (ret < 0) {
+ goto end;
+ }
+ }
+end:
+ return ret;
+}
+
+void try_stream_close(struct relay_stream *stream)
+{
+ bool session_aborted;
+ struct relay_session *session = stream->trace->session;
+
+ DBG("Trying to close stream %" PRIu64, stream->stream_handle);
+
+ pthread_mutex_lock(&session->lock);
+ session_aborted = session->aborted;
+ pthread_mutex_unlock(&session->lock);
+
+ pthread_mutex_lock(&stream->lock);
+ /*
+ * Can be called concurently by connection close and reception of last
+ * pending data.
+ */
+ if (stream->closed) {
+ pthread_mutex_unlock(&stream->lock);
+ DBG("closing stream %" PRIu64 " aborted since it is already marked as closed", stream->stream_handle);
+ return;
+ }
+
+ stream->close_requested = true;
+
+ if (stream->last_net_seq_num == -1ULL) {
+ /*
+ * Handle connection close without explicit stream close
+ * command.
+ *
+ * We can be clever about indexes partially received in
+ * cases where we received the data socket part, but not
+ * the control socket part: since we're currently closing
+ * the stream on behalf of the control socket, we *know*
+ * there won't be any more control information for this
+ * socket. Therefore, we can destroy all indexes for
+ * which we have received only the file descriptor (from
+ * data socket). This takes care of consumerd crashes
+ * between sending the data and control information for
+ * a packet. Since those are sent in that order, we take
+ * care of consumerd crashes.
+ */
+ DBG("relay_index_close_partial_fd");
+ relay_index_close_partial_fd(stream);
+ /*
+ * Use the highest net_seq_num we currently have pending
+ * As end of stream indicator. Leave last_net_seq_num
+ * at -1ULL if we cannot find any index.
+ */
+ stream->last_net_seq_num = relay_index_find_last(stream);
+ DBG("Updating stream->last_net_seq_num to %" PRIu64, stream->last_net_seq_num);
+ /* Fall-through into the next check. */
+ }
+
+ if (stream->last_net_seq_num != -1ULL &&
+ ((int64_t) (stream->prev_data_seq - stream->last_net_seq_num)) < 0
+ && !session_aborted) {
+ /*
+ * Don't close since we still have data pending. This
+ * handles cases where an explicit close command has
+ * been received for this stream, and cases where the
+ * connection has been closed, and we are awaiting for
+ * index information from the data socket. It is
+ * therefore expected that all the index fd information
+ * we need has already been received on the control
+ * socket. Matching index information from data socket
+ * should be Expected Soon(TM).
+ *
+ * TODO: We should implement a timer to garbage collect
+ * streams after a timeout to be resilient against a
+ * consumerd implementation that would not match this
+ * expected behavior.
+ */
+ pthread_mutex_unlock(&stream->lock);
+ DBG("closing stream %" PRIu64 " aborted since it still has data pending", stream->stream_handle);
+ return;
+ }
+ /*
+ * We received all the indexes we can expect.
+ */
+ stream_unpublish(stream);
+ stream->closed = true;
+ /* Relay indexes are only used by the "consumer/sessiond" end. */
+ relay_index_close_all(stream);
+
+ /*
+ * If we are closed by an application exiting (per-pid buffers),
+ * we need to put our reference on the stream trace chunk right
+ * away, because otherwise still holding the reference on the
+ * trace chunk could allow a viewer stream (which holds a reference
+ * to the stream) to postpone destroy waiting for the chunk to cease
+ * to exist endlessly until the viewer is detached.
+ */
+
+ /* Put stream fd before put chunk. */
+ if (stream->stream_fd) {
+ stream_fd_put(stream->stream_fd);
+ stream->stream_fd = NULL;
+ }
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = NULL;
+ }
+ lttng_trace_chunk_put(stream->trace_chunk);
+ stream->trace_chunk = NULL;