+int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx,
+ uint64_t session_id)
+{
+ int ret;
+ struct lttng_consumer_stream *stream = NULL;
+ struct lttng_ht_iter iter;
+ struct lttng_ht *ht;
+
+ assert(ctx);
+
+ /* Ease our life a bit. */
+ ht = consumer_data.stream_list_ht;
+
+ rcu_read_lock();
+
+ /* Search the metadata associated with the session id of the given stream. */
+
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&session_id, lttng_ht_seed), ht->match_fct,
+ &session_id, &iter.iter, stream, node_session_id.node) {
+ if (!stream->metadata_flag) {
+ continue;
+ }
+
+ ret = do_sync_metadata(stream, ctx);
+ if (ret < 0) {
+ goto end;
+ }
+ }
+
+ /*
+ * Force return code to 0 (success) since ret might be ENODATA for instance
+ * which is not an error but rather that we should come back.
+ */
+ ret = 0;
+
+end:
+ rcu_read_unlock();
+ return ret;
+}
+
+static int consumer_stream_sync_metadata_index(
+ struct lttng_consumer_stream *stream,
+ const struct stream_subbuffer *subbuffer,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret;
+
+ /* Block until all the metadata is sent. */
+ pthread_mutex_lock(&stream->metadata_timer_lock);
+ assert(!stream->missed_metadata_flush);
+ stream->waiting_on_metadata = true;
+ pthread_mutex_unlock(&stream->metadata_timer_lock);
+
+ ret = consumer_stream_sync_metadata(ctx, stream->session_id);
+
+ pthread_mutex_lock(&stream->metadata_timer_lock);
+ stream->waiting_on_metadata = false;
+ if (stream->missed_metadata_flush) {
+ stream->missed_metadata_flush = false;
+ pthread_mutex_unlock(&stream->metadata_timer_lock);
+ (void) stream->read_subbuffer_ops.send_live_beacon(stream);
+ } else {
+ pthread_mutex_unlock(&stream->metadata_timer_lock);
+ }
+ if (ret < 0) {
+ goto end;
+ }
+
+ ret = consumer_stream_send_index(stream, subbuffer, ctx);
+end:
+ return ret;
+}
+
+/*
+ * Check if the local version of the metadata stream matches with the version
+ * of the metadata stream in the kernel. If it was updated, set the reset flag
+ * on the stream.
+ */
+static
+int metadata_stream_check_version(struct lttng_consumer_stream *stream,
+ const struct stream_subbuffer *subbuffer)
+{
+ if (stream->metadata_version == subbuffer->info.metadata.version) {
+ goto end;
+ }
+
+ DBG("New metadata version detected");
+ consumer_stream_metadata_set_version(stream,
+ subbuffer->info.metadata.version);
+
+ if (stream->read_subbuffer_ops.reset_metadata) {
+ stream->read_subbuffer_ops.reset_metadata(stream);
+ }
+
+end:
+ return 0;
+}
+
+static
+bool stream_is_rotating_to_null_chunk(
+ const struct lttng_consumer_stream *stream)
+{
+ bool rotating_to_null_chunk = false;
+
+ if (stream->rotate_position == -1ULL) {
+ /* No rotation ongoing. */
+ goto end;
+ }
+
+ if (stream->trace_chunk == stream->chan->trace_chunk ||
+ !stream->chan->trace_chunk) {
+ rotating_to_null_chunk = true;
+ }
+end:
+ return rotating_to_null_chunk;
+}
+
+enum consumer_stream_open_packet_status consumer_stream_open_packet(
+ struct lttng_consumer_stream *stream)
+{
+ int ret;
+ enum consumer_stream_open_packet_status status;
+ unsigned long produced_pos_before, produced_pos_after;
+
+ ret = lttng_consumer_sample_snapshot_positions(stream);
+ if (ret < 0) {
+ ERR("Failed to snapshot positions before post-rotation empty packet flush: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR;
+ goto end;
+ }
+
+ ret = lttng_consumer_get_produced_snapshot(
+ stream, &produced_pos_before);
+ if (ret < 0) {
+ ERR("Failed to read produced position before post-rotation empty packet flush: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR;
+ goto end;
+ }
+
+ ret = consumer_stream_flush_buffer(stream, 0);
+ if (ret) {
+ ERR("Failed to flush an empty packet at rotation point: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR;
+ goto end;
+ }
+
+ ret = lttng_consumer_sample_snapshot_positions(stream);
+ if (ret < 0) {
+ ERR("Failed to snapshot positions after post-rotation empty packet flush: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR;
+ goto end;
+ }
+
+ ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos_after);
+ if (ret < 0) {
+ ERR("Failed to read produced position after post-rotation empty packet flush: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR;
+ goto end;
+ }
+
+ /*
+ * Determine if the flush had an effect by comparing the produced
+ * positons before and after the flush.
+ */
+ status = produced_pos_before != produced_pos_after ?
+ CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED :
+ CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE;
+ if (status == CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED) {
+ stream->opened_packet_in_current_trace_chunk = true;
+ }
+
+end:
+ return status;
+}
+
+/*
+ * An attempt to open a new packet is performed after a rotation completes to
+ * get a begin timestamp as close as possible to the rotation point.
+ *
+ * However, that initial attempt at opening a packet can fail due to a full
+ * ring-buffer. In that case, a second attempt is performed after consuming
+ * a packet since that will have freed enough space in the ring-buffer.
+ */
+static
+int post_consume_open_new_packet(struct lttng_consumer_stream *stream,
+ const struct stream_subbuffer *subbuffer,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret = 0;
+
+ if (!stream->opened_packet_in_current_trace_chunk &&
+ stream->trace_chunk &&
+ !stream_is_rotating_to_null_chunk(stream)) {
+ const enum consumer_stream_open_packet_status status =
+ consumer_stream_open_packet(stream);
+
+ switch (status) {
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
+ DBG("Opened a packet after consuming a packet rotation: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ stream->opened_packet_in_current_trace_chunk = true;
+ break;
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
+ /*
+ * Can't open a packet as there is no space left.
+ * This means that new events were produced, resulting
+ * in a packet being opened, which is what we want
+ * anyhow.
+ */
+ DBG("No space left to open a packet after consuming a packet: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ stream->opened_packet_in_current_trace_chunk = true;
+ break;
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
+ /* Logged by callee. */
+ ret = -1;
+ goto end;
+ default:
+ abort();
+ }
+
+ stream->opened_packet_in_current_trace_chunk = true;
+ }
+
+end:
+ return ret;
+}
+
+struct lttng_consumer_stream *consumer_stream_create(
+ struct lttng_consumer_channel *channel,
+ uint64_t channel_key,
+ uint64_t stream_key,
+ const char *channel_name,
+ uint64_t relayd_id,
+ uint64_t session_id,
+ struct lttng_trace_chunk *trace_chunk,
+ int cpu,
+ int *alloc_ret,
+ enum consumer_channel_type type,
+ unsigned int monitor)
+{
+ int ret;
+ struct lttng_consumer_stream *stream;
+
+ stream = zmalloc(sizeof(*stream));
+ if (stream == NULL) {
+ PERROR("malloc struct lttng_consumer_stream");
+ ret = -ENOMEM;
+ goto end;
+ }
+
+ if (trace_chunk && !lttng_trace_chunk_get(trace_chunk)) {
+ ERR("Failed to acquire trace chunk reference during the creation of a stream");
+ ret = -1;
+ goto error;
+ }
+
+ rcu_read_lock();
+ stream->chan = channel;
+ stream->key = stream_key;
+ stream->trace_chunk = trace_chunk;
+ stream->out_fd = -1;
+ stream->out_fd_offset = 0;
+ stream->output_written = 0;
+ stream->net_seq_idx = relayd_id;
+ stream->session_id = session_id;
+ stream->monitor = monitor;
+ stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
+ stream->index_file = NULL;
+ stream->last_sequence_number = -1ULL;
+ stream->rotate_position = -1ULL;
+ /* Buffer is created with an open packet. */
+ stream->opened_packet_in_current_trace_chunk = true;
+ pthread_mutex_init(&stream->lock, NULL);
+ pthread_mutex_init(&stream->metadata_timer_lock, NULL);
+
+ /* If channel is the metadata, flag this stream as metadata. */
+ if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
+ stream->metadata_flag = 1;
+ /* Metadata is flat out. */
+ strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
+ /* Live rendez-vous point. */
+ pthread_cond_init(&stream->metadata_rdv, NULL);
+ pthread_mutex_init(&stream->metadata_rdv_lock, NULL);
+ } else {
+ /* Format stream name to <channel_name>_<cpu_number> */
+ ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
+ channel_name, cpu);
+ if (ret < 0) {
+ PERROR("snprintf stream name");
+ goto error;
+ }
+ }
+
+ switch (channel->output) {
+ case CONSUMER_CHANNEL_SPLICE:
+ stream->output = LTTNG_EVENT_SPLICE;
+ ret = utils_create_pipe(stream->splice_pipe);
+ if (ret < 0) {
+ goto error;
+ }
+ break;
+ case CONSUMER_CHANNEL_MMAP:
+ stream->output = LTTNG_EVENT_MMAP;
+ break;
+ default:
+ abort();
+ }
+
+ /* Key is always the wait_fd for streams. */
+ lttng_ht_node_init_u64(&stream->node, stream->key);
+
+ /* Init node per channel id key */
+ lttng_ht_node_init_u64(&stream->node_channel_id, channel_key);
+
+ /* Init session id node with the stream session id */
+ lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id);
+
+ DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64
+ " relayd_id %" PRIu64 ", session_id %" PRIu64,
+ stream->name, stream->key, channel_key,
+ stream->net_seq_idx, stream->session_id);
+
+ rcu_read_unlock();
+
+ lttng_dynamic_array_init(&stream->read_subbuffer_ops.post_consume_cbs,
+ sizeof(post_consume_cb), NULL);
+
+ if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
+ stream->read_subbuffer_ops.lock =
+ consumer_stream_metadata_lock_all;
+ stream->read_subbuffer_ops.unlock =
+ consumer_stream_metadata_unlock_all;
+ stream->read_subbuffer_ops.pre_consume_subbuffer =
+ metadata_stream_check_version;
+ } else {
+ const post_consume_cb post_consume_index_op = channel->is_live ?
+ consumer_stream_sync_metadata_index :
+ consumer_stream_send_index;
+
+ ret = lttng_dynamic_array_add_element(
+ &stream->read_subbuffer_ops.post_consume_cbs,
+ &post_consume_index_op);
+ if (ret) {
+ PERROR("Failed to add `send index` callback to stream's post consumption callbacks");
+ goto error;
+ }