Fix: consumerd: use packet sequence number for rotation position
[lttng-tools.git] / src / common / consumer / consumer.c
index 00eb35670d7731c11048b2a6bbd7a9573cf538df..d81d0fc7a1da1bf0f57c3a670b216f95fdc7af68 100644 (file)
@@ -603,6 +603,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
        stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
        stream->index_file = NULL;
        stream->last_sequence_number = -1ULL;
+       stream->rotate_position = -1ULL;
        pthread_mutex_init(&stream->lock, NULL);
        pthread_mutex_init(&stream->metadata_timer_lock, NULL);
 
@@ -4002,7 +4003,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                        ht->hash_fct(&channel->key, lttng_ht_seed),
                        ht->match_fct, &channel->key, &iter.iter,
                        stream, node_channel_id.node) {
-               unsigned long consumed_pos;
+               unsigned long produced_pos = 0, consumed_pos = 0;
 
                health_code_update();
 
@@ -4015,65 +4016,78 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                        rotating_to_new_chunk = false;
                }
 
-               ret = lttng_consumer_sample_snapshot_positions(stream);
+               /*
+                * Active flush; has no effect if the production position
+                * is at a packet boundary.
+                */
+               ret = consumer_flush_buffer(stream, 1);
                if (ret < 0) {
-                       ERR("Failed to sample snapshot position during channel rotation");
+                       ERR("Failed to flush stream %" PRIu64 " during channel rotation",
+                                       stream->key);
                        goto end_unlock_stream;
                }
 
-               ret = lttng_consumer_get_produced_snapshot(stream,
-                               &stream->rotate_position);
-               if (ret < 0) {
-                       ERR("Failed to sample produced position during channel rotation");
+               ret = lttng_consumer_take_snapshot(stream);
+               if (ret < 0 && ret != -ENODATA && ret != -EAGAIN) {
+                       ERR("Failed to sample snapshot position during channel rotation");
                        goto end_unlock_stream;
                }
+               if (!ret) {
+                       ret = lttng_consumer_get_produced_snapshot(stream,
+                                       &produced_pos);
+                       if (ret < 0) {
+                               ERR("Failed to sample produced position during channel rotation");
+                               goto end_unlock_stream;
+                       }
 
-               lttng_consumer_get_consumed_snapshot(stream,
-                               &consumed_pos);
-               if (consumed_pos == stream->rotate_position) {
+                       ret = lttng_consumer_get_consumed_snapshot(stream,
+                                       &consumed_pos);
+                       if (ret < 0) {
+                               ERR("Failed to sample consumed position during channel rotation");
+                               goto end_unlock_stream;
+                       }
+               }
+               /*
+                * Align produced position on the start-of-packet boundary of the first
+                * packet going into the next trace chunk.
+                */
+               produced_pos = ALIGN_FLOOR(produced_pos, stream->max_sb_size);
+               if (consumed_pos == produced_pos) {
                        stream->rotate_ready = true;
                }
-
                /*
-                * Active flush; has no effect if the production position
-                * is at a packet boundary.
+                * The rotation position is based on the packet_seq_num of the
+                * packet following the last packet that was consumed for this
+                * stream, incremented by the number of packets lying between
+                * the consumed and produced positions. This rotation position
+                * is a lower bound (inclusive) at which the next trace chunk
+                * starts. Since it is a lower bound, it is OK if the
+                * packet_seq_num does not correspond exactly to the packet
+                * identified by consumed_pos, which can happen in overwrite mode.
                 */
-               ret = consumer_flush_buffer(stream, 1);
-               if (ret < 0) {
-                       ERR("Failed to flush stream %" PRIu64 " during channel rotation",
+               if (stream->sequence_number_unavailable) {
+                       /*
+                        * Rotation should never be performed on a session which
+                        * interacts with a pre-2.8 lttng-modules, which does
+                        * not implement packet sequence number.
+                        */
+                       ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable",
                                        stream->key);
+                       ret = -1;
                        goto end_unlock_stream;
                }
+               stream->rotate_position = stream->last_sequence_number + 1 +
+                               ((produced_pos - consumed_pos) / stream->max_sb_size);
 
                if (!is_local_trace) {
                        /*
                         * The relay daemon control protocol expects a rotation
                         * position as "the sequence number of the first packet
-                        * _after_ the current trace chunk.
-                        *
-                        * At the moment when the positions of the buffers are
-                        * sampled, the production position does not necessarily
-                        * sit at a packet boundary. The 'active' flush
-                        * operation above will push the production position to
-                        * the next packet boundary _if_ it is not already
-                        * sitting at such a boundary.
-                        *
-                        * Assuming a current production position that is not
-                        * on the bound of a packet, the 'target' sequence
-                        * number is
-                        *   (consumed_pos / subbuffer_size) + 1
-                        * Note the '+ 1' to ensure the current packet is
-                        * part of the current trace chunk.
-                        *
-                        * However, if the production position is already at
-                        * a packet boundary, the '+ 1' is not necessary as the
-                        * last packet of the current chunk is already
-                        * 'complete'.
+                        * _after_ the current trace chunk".
                         */
                        const struct relayd_stream_rotation_position position = {
                                .stream_id = stream->relayd_stream_id,
-                               .rotate_at_seq_num = (stream->rotate_position / stream->max_sb_size) +
-                                       !!(stream->rotate_position % stream->max_sb_size),
+                               .rotate_at_seq_num = stream->rotate_position,
                        };
 
                        ret = lttng_dynamic_array_add_element(
@@ -4136,44 +4150,39 @@ end:
  */
 int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
 {
-       int ret;
-       unsigned long consumed_pos;
-
-       if (!stream->rotate_position && !stream->rotate_ready) {
-               ret = 0;
-               goto end;
-       }
-
        if (stream->rotate_ready) {
-               ret = 1;
-               goto end;
+               return 1;
        }
 
        /*
-        * If we don't have the rotate_ready flag, check the consumed position
-        * to determine if we need to rotate.
+        * If packet seq num is unavailable, it means we are interacting
+        * with a pre-2.8 lttng-modules which does not implement the
+        * sequence number. Rotation should never be used by sessiond in this
+        * scenario.
         */
-       ret = lttng_consumer_sample_snapshot_positions(stream);
-       if (ret < 0) {
-               ERR("Taking snapshot positions");
-               goto end;
+       if (stream->sequence_number_unavailable) {
+               ERR("Internal error: rotation used on stream %" PRIu64
+                               " with unavailable sequence number",
+                               stream->key);
+               return -1;
        }
 
-       ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
-       if (ret < 0) {
-               ERR("Consumed snapshot position");
-               goto end;
+       if (stream->rotate_position == -1ULL ||
+                       stream->last_sequence_number == -1ULL) {
+               return 0;
        }
 
-       /* Rotate position not reached yet (with check for overflow). */
-       if ((long) (consumed_pos - stream->rotate_position) < 0) {
-               ret = 0;
-               goto end;
+       /*
+        * Check whether the rotate position has been reached. The stream
+        * rotate position is the sequence number of the first packet of the
+        * next trace chunk, but consumerd considers rotation ready when
+        * reaching the last packet of the current chunk, hence the
+        * "rotate_position - 1".
+        */
+       if (stream->last_sequence_number >= stream->rotate_position - 1) {
+               return 1;
        }
-       ret = 1;
 
-end:
-       return ret;
+       return 0;
 }
 
 /*
@@ -4181,7 +4190,7 @@ end:
  */
 void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream)
 {
-       stream->rotate_position = 0;
+       stream->rotate_position = -1ULL;
        stream->rotate_ready = false;
 }
 
This page took 0.025491 seconds and 4 git commands to generate.