+ /*
+ * Do not flush a packet when rotating from a NULL trace
+ * chunk. The stream has no means to output data, and the prior
+ * rotation which rotated to NULL performed that side-effect
+ * already. No new data can be produced when a stream has no
+ * associated trace chunk (e.g. a stop followed by a rotate).
+ */
+ if (stream->trace_chunk) {
+ bool flush_active;
+
+ if (stream->metadata_flag) {
+ /*
+ * Don't produce an empty metadata packet,
+ * simply close the current one.
+ *
+ * Metadata is regenerated on every trace chunk
+ * switch; there is no concern that no data was
+ * produced.
+ */
+ flush_active = true;
+ } else {
+ /*
+ * Only flush an empty packet if the "packet
+ * open" could not be performed on transition
+ * to a new trace chunk and no packets were
+ * consumed within the chunk's lifetime.
+ */
+ if (stream->opened_packet_in_current_trace_chunk) {
+ flush_active = true;
+ } else {
+ /*
+ * Stream could have been full at the
+ * time of rotation, but then have had
+ * no activity at all.
+ *
+ * It is important to flush a packet
+ * to prevent 0-length files from being
+ * produced as most viewers choke on
+ * them.
+ *
+ * Unfortunately viewers will not be
+ * able to know that tracing was active
+ * for this stream during this trace
+ * chunk's lifetime.
+ */
+ ret = sample_stream_positions(stream, &produced_pos, &consumed_pos);
+ if (ret) {
+ goto end_unlock_stream;
+ }
+
+ /*
+ * Don't flush an empty packet if data
+ * was produced; it will be consumed
+ * before the rotation completes.
+ */
+ flush_active = produced_pos != consumed_pos;
+ if (!flush_active) {
+ enum lttng_trace_chunk_status chunk_status;
+ const char *trace_chunk_name;
+ uint64_t trace_chunk_id;
+
+ chunk_status = lttng_trace_chunk_get_name(
+ stream->trace_chunk,
+ &trace_chunk_name,
+ NULL);
+ if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NONE) {
+ trace_chunk_name = "none";
+ }
+
+ /*
+ * Consumer trace chunks are
+ * never anonymous.
+ */
+ chunk_status = lttng_trace_chunk_get_id(
+ stream->trace_chunk,
+ &trace_chunk_id);
+ assert(chunk_status ==
+ LTTNG_TRACE_CHUNK_STATUS_OK);
+
+ DBG("Unable to open packet for stream during trace chunk's lifetime. "
+ "Flushing an empty packet to prevent an empty file from being created: "
+ "stream id = %" PRIu64 ", trace chunk name = `%s`, trace chunk id = %" PRIu64,
+ stream->key, trace_chunk_name, trace_chunk_id);
+ }
+ }
+ }
+
+ /*
+ * Close the current packet before sampling the
+ * ring buffer positions.
+ */
+ ret = consumer_stream_flush_buffer(stream, flush_active);
+ if (ret < 0) {
+ ERR("Failed to flush stream %" PRIu64 " during channel rotation",
+ stream->key);
+ goto end_unlock_stream;
+ }
+ }
+
+ ret = lttng_consumer_take_snapshot(stream);
+ if (ret < 0 && ret != -ENODATA && ret != -EAGAIN) {
+ ERR("Failed to sample snapshot position during channel rotation");
+ goto end_unlock_stream;
+ }
+ if (!ret) {
+ ret = lttng_consumer_get_produced_snapshot(stream,
+ &produced_pos);
+ if (ret < 0) {
+ ERR("Failed to sample produced position during channel rotation");
+ goto end_unlock_stream;
+ }
+
+ ret = lttng_consumer_get_consumed_snapshot(stream,
+ &consumed_pos);
+ if (ret < 0) {
+ ERR("Failed to sample consumed position during channel rotation");
+ goto end_unlock_stream;
+ }
+ }
+ /*
+ * Align produced position on the start-of-packet boundary of the first
+ * packet going into the next trace chunk.
+ */
+ produced_pos = ALIGN_FLOOR(produced_pos, stream->max_sb_size);
+ if (consumed_pos == produced_pos) {
+ DBG("Set rotate ready for stream %" PRIu64 " produced = %lu consumed = %lu",
+ stream->key, produced_pos, consumed_pos);
+ stream->rotate_ready = true;
+ } else {
+ DBG("Different consumed and produced positions "
+ "for stream %" PRIu64 " produced = %lu consumed = %lu",
+ stream->key, produced_pos, consumed_pos);
+ }
+ /*
+ * The rotation position is based on the packet_seq_num of the
+ * packet following the last packet that was consumed for this
+ * stream, incremented by the offset between produced and
+ * consumed positions. This rotation position is a lower bound
+ * (inclusive) at which the next trace chunk starts. Since it
+ * is a lower bound, it is OK if the packet_seq_num does not
+ * correspond exactly to the same packet identified by the
+ * consumed_pos, which can happen in overwrite mode.
+ */
+ if (stream->sequence_number_unavailable) {
+ /*
+ * Rotation should never be performed on a session which
+ * interacts with a pre-2.8 lttng-modules, which does
+ * not implement packet sequence number.
+ */
+ ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable",
+ stream->key);
+ ret = -1;
+ goto end_unlock_stream;
+ }
+ stream->rotate_position = stream->last_sequence_number + 1 +
+ ((produced_pos - consumed_pos) / stream->max_sb_size);
+ DBG("Set rotation position for stream %" PRIu64 " at position %" PRIu64,
+ stream->key, stream->rotate_position);
+
+ if (!is_local_trace) {
+ /*
+ * The relay daemon control protocol expects a rotation
+ * position as "the sequence number of the first packet
+ * _after_ the current trace chunk".
+ */
+ const struct relayd_stream_rotation_position position = {
+ .stream_id = stream->relayd_stream_id,
+ .rotate_at_seq_num = stream->rotate_position,
+ };
+
+ ret = lttng_dynamic_array_add_element(
+ &stream_rotation_positions,
+ &position);
+ if (ret) {
+ ERR("Failed to allocate stream rotation position");
+ goto end_unlock_stream;
+ }
+ stream_count++;
+ }
+
+ stream->opened_packet_in_current_trace_chunk = false;
+
+ if (rotating_to_new_chunk && !stream->metadata_flag) {
+ /*
+ * Attempt to flush an empty packet as close to the
+ * rotation point as possible. In the event where a
+ * stream remains inactive after the rotation point,
+ * this ensures that the new trace chunk has a
+ * beginning timestamp set at the begining of the
+ * trace chunk instead of only creating an empty
+ * packet when the trace chunk is stopped.
+ *
+ * This indicates to the viewers that the stream
+ * was being recorded, but more importantly it
+ * allows viewers to determine a useable trace
+ * intersection.
+ *
+ * This presents a problem in the case where the
+ * ring-buffer is completely full.
+ *
+ * Consider the following scenario:
+ * - The consumption of data is slow (slow network,
+ * for instance),
+ * - The ring buffer is full,
+ * - A rotation is initiated,
+ * - The flush below does nothing (no space left to
+ * open a new packet),
+ * - The other streams rotate very soon, and new
+ * data is produced in the new chunk,
+ * - This stream completes its rotation long after the
+ * rotation was initiated
+ * - The session is stopped before any event can be
+ * produced in this stream's buffers.
+ *
+ * The resulting trace chunk will have a single packet
+ * temporaly at the end of the trace chunk for this
+ * stream making the stream intersection more narrow
+ * than it should be.
+ *
+ * To work-around this, an empty flush is performed
+ * after the first consumption of a packet during a
+ * rotation if open_packet fails. The idea is that
+ * consuming a packet frees enough space to switch
+ * packets in this scenario and allows the tracer to
+ * "stamp" the beginning of the new trace chunk at the
+ * earliest possible point.
+ *
+ * The packet open is performed after the channel
+ * rotation to ensure that no attempt to open a packet
+ * is performed in a stream that has no active trace
+ * chunk.
+ */
+ ret = lttng_dynamic_pointer_array_add_pointer(
+ &streams_packet_to_open, stream);
+ if (ret) {
+ PERROR("Failed to add a stream pointer to array of streams in which to open a packet");
+ ret = -1;
+ goto end_unlock_stream;
+ }
+ }