+ /*
+ * Lock stream because we are about to change its state.
+ */
+ pthread_mutex_lock(&stream->lock);
+
+ if (stream->trace_chunk == stream->chan->trace_chunk) {
+ rotating_to_new_chunk = false;
+ }
+
+ /*
+ * Do not flush an empty packet when rotating from a NULL trace
+ * chunk. The stream has no means to output data, and the prior
+ * rotation which rotated to NULL performed that side-effect already.
+ */
+ if (stream->trace_chunk) {
+ /*
+ * For metadata stream, do an active flush, which does not
+ * produce empty packets. For data streams, empty-flush;
+ * ensures we have at least one packet in each stream per trace
+ * chunk, even if no data was produced.
+ */
+ ret = consumer_flush_buffer(stream, stream->metadata_flag ? 1 : 0);
+ if (ret < 0) {
+ ERR("Failed to flush stream %" PRIu64 " during channel rotation",
+ stream->key);
+ goto end_unlock_stream;
+ }
+ }
+
+ ret = lttng_consumer_take_snapshot(stream);
+ if (ret < 0 && ret != -ENODATA && ret != -EAGAIN) {
+ ERR("Failed to sample snapshot position during channel rotation");
+ goto end_unlock_stream;
+ }
+ if (!ret) {
+ ret = lttng_consumer_get_produced_snapshot(stream,
+ &produced_pos);
+ if (ret < 0) {
+ ERR("Failed to sample produced position during channel rotation");
+ goto end_unlock_stream;
+ }
+
+ ret = lttng_consumer_get_consumed_snapshot(stream,
+ &consumed_pos);
+ if (ret < 0) {
+ ERR("Failed to sample consumed position during channel rotation");
+ goto end_unlock_stream;
+ }
+ }
+ /*
+ * Align produced position on the start-of-packet boundary of the first
+ * packet going into the next trace chunk.
+ */
+ produced_pos = ALIGN_FLOOR(produced_pos, stream->max_sb_size);
+ if (consumed_pos == produced_pos) {
+ DBG("Set rotate ready for stream %" PRIu64 " produced = %lu consumed = %lu",
+ stream->key, produced_pos, consumed_pos);
+ stream->rotate_ready = true;
+ } else {
+ DBG("Different consumed and produced positions "
+ "for stream %" PRIu64 " produced = %lu consumed = %lu",
+ stream->key, produced_pos, consumed_pos);
+ }
+ /*
+ * The rotation position is based on the packet_seq_num of the
+ * packet following the last packet that was consumed for this
+ * stream, incremented by the offset between produced and
+ * consumed positions. This rotation position is a lower bound
+ * (inclusive) at which the next trace chunk starts. Since it
+ * is a lower bound, it is OK if the packet_seq_num does not
+ * correspond exactly to the same packet identified by the
+ * consumed_pos, which can happen in overwrite mode.
+ */
+ if (stream->sequence_number_unavailable) {
+ /*
+ * Rotation should never be performed on a session which
+ * interacts with a pre-2.8 lttng-modules, which does
+ * not implement packet sequence number.
+ */
+ ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable",
+ stream->key);
+ ret = -1;
+ goto end_unlock_stream;
+ }
+ stream->rotate_position = stream->last_sequence_number + 1 +
+ ((produced_pos - consumed_pos) / stream->max_sb_size);
+ DBG("Set rotation position for stream %" PRIu64 " at position %" PRIu64,
+ stream->key, stream->rotate_position);
+
+ if (!is_local_trace) {
+ /*
+ * The relay daemon control protocol expects a rotation
+ * position as "the sequence number of the first packet
+ * _after_ the current trace chunk".
+ */
+ const struct relayd_stream_rotation_position position = {
+ .stream_id = stream->relayd_stream_id,
+ .rotate_at_seq_num = stream->rotate_position,
+ };
+
+ ret = lttng_dynamic_array_add_element(
+ &stream_rotation_positions,
+ &position);
+ if (ret) {
+ ERR("Failed to allocate stream rotation position");
+ goto end_unlock_stream;
+ }
+ stream_count++;
+ }
+ pthread_mutex_unlock(&stream->lock);
+ }
+ stream = NULL;
+ pthread_mutex_unlock(&channel->lock);
+
+ if (is_local_trace) {
+ ret = 0;
+ goto end;
+ }
+
+ relayd = consumer_find_relayd(relayd_id);
+ if (!relayd) {
+ ERR("Failed to find relayd %" PRIu64, relayd_id);
+ ret = -1;
+ goto end;
+ }
+
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_rotate_streams(&relayd->control_sock, stream_count,
+ rotating_to_new_chunk ? &next_chunk_id : NULL,
+ (const struct relayd_stream_rotation_position *)
+ stream_rotation_positions.buffer.data);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);