+static
+bool stream_is_rotating_to_null_chunk(
+ const struct lttng_consumer_stream *stream)
+{
+ bool rotating_to_null_chunk = false;
+
+ if (stream->rotate_position == -1ULL) {
+ /* No rotation ongoing. */
+ goto end;
+ }
+
+ if (stream->trace_chunk == stream->chan->trace_chunk ||
+ !stream->chan->trace_chunk) {
+ rotating_to_null_chunk = true;
+ }
+end:
+ return rotating_to_null_chunk;
+}
+
+enum consumer_stream_open_packet_status consumer_stream_open_packet(
+ struct lttng_consumer_stream *stream)
+{
+ int ret;
+ enum consumer_stream_open_packet_status status;
+ unsigned long produced_pos_before, produced_pos_after;
+
+ ret = lttng_consumer_sample_snapshot_positions(stream);
+ if (ret < 0) {
+ ERR("Failed to snapshot positions before post-rotation empty packet flush: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR;
+ goto end;
+ }
+
+ ret = lttng_consumer_get_produced_snapshot(
+ stream, &produced_pos_before);
+ if (ret < 0) {
+ ERR("Failed to read produced position before post-rotation empty packet flush: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR;
+ goto end;
+ }
+
+ ret = consumer_stream_flush_buffer(stream, 0);
+ if (ret) {
+ ERR("Failed to flush an empty packet at rotation point: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR;
+ goto end;
+ }
+
+ ret = lttng_consumer_sample_snapshot_positions(stream);
+ if (ret < 0) {
+ ERR("Failed to snapshot positions after post-rotation empty packet flush: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR;
+ goto end;
+ }
+
+ ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos_after);
+ if (ret < 0) {
+ ERR("Failed to read produced position after post-rotation empty packet flush: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR;
+ goto end;
+ }
+
+ /*
+ * Determine if the flush had an effect by comparing the produced
+ * positons before and after the flush.
+ */
+ status = produced_pos_before != produced_pos_after ?
+ CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED :
+ CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE;
+ if (status == CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED) {
+ stream->opened_packet_in_current_trace_chunk = true;
+ }
+
+end:
+ return status;
+}
+
+/*
+ * An attempt to open a new packet is performed after a rotation completes to
+ * get a begin timestamp as close as possible to the rotation point.
+ *
+ * However, that initial attempt at opening a packet can fail due to a full
+ * ring-buffer. In that case, a second attempt is performed after consuming
+ * a packet since that will have freed enough space in the ring-buffer.
+ */
+static
+int post_consume_open_new_packet(struct lttng_consumer_stream *stream,
+ const struct stream_subbuffer *subbuffer,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret = 0;
+
+ if (!stream->opened_packet_in_current_trace_chunk &&
+ stream->trace_chunk &&
+ !stream_is_rotating_to_null_chunk(stream)) {
+ const enum consumer_stream_open_packet_status status =
+ consumer_stream_open_packet(stream);
+
+ switch (status) {
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
+ DBG("Opened a packet after consuming a packet rotation: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ stream->opened_packet_in_current_trace_chunk = true;
+ break;
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
+ /*
+ * Can't open a packet as there is no space left.
+ * This means that new events were produced, resulting
+ * in a packet being opened, which is what we want
+ * anyhow.
+ */
+ DBG("No space left to open a packet after consuming a packet: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ stream->opened_packet_in_current_trace_chunk = true;
+ break;
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
+ /* Logged by callee. */
+ ret = -1;
+ goto end;
+ default:
+ abort();
+ }
+
+ stream->opened_packet_in_current_trace_chunk = true;
+ }
+
+end:
+ return ret;
+}
+