Fix: consumerd: use packet sequence number for rotation position
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Tue, 5 Nov 2019 18:07:44 +0000 (13:07 -0500)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 19 Dec 2019 22:12:27 +0000 (17:12 -0500)
Refer to "Fix: relayd: use packet sequence number for rotation position"
for context of this change.

This commit introduces the changes required in the consumerd.

Some notable points related to this commit:

- Internally, the rotate_position (per-stream) is now a 64-bit
  value rather than an unsigned long.
- The scheme to rotate a stream is changed to allow using the
  backward-compatible lttng_consumer_take_snapshot() rather than
  the newer lttng_consumer_get_produced_snapshot(), thus allowing
  backward compatibility of the implicit rotation on destroy with
  pre-2.10 lttng-modules.
- The rotate position used as pivot point for the rotation is
  based on the packet_seq_num of the last packet that has been
  send over the network by consumerd, incremented by the number of
  packets between the sampled produced_pos and the consumed_pos.
  In the worse case scenario where an overwrite mode ring buffer
  overwrites its contents enough to trigger a 4GB overflow on a
  32-bit producer since the last packet was sent (e.g. due to a
  slow network), the difference between produced_pos and
  consumed_pos will be lower that what would have been expected.
  However, because this pivot position is used as a lower bound,
  being smaller than the real value is fine: the data that would
  have been misplaced in the wrong trace chunk were actually
  overwritten, and will therefore never be consumed.
- When interacting with pre-2.8 lttng-modules, the packet sequence
  number is not available. The current approach is to disallow
  rotations performed on sessions which have kernel tracing active
  with a pre-2.8 lttng-modules.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Change-Id: I8600cb5e2e9c05f3dfba0499a5fc4a3bb85dec24
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
include/lttng/lttng-error.h
src/bin/lttng-sessiond/cmd.c
src/bin/lttng-sessiond/kernel.c
src/bin/lttng-sessiond/kernel.h
src/common/consumer/consumer.c
src/common/consumer/consumer.h
src/common/error.c
src/common/kernel-consumer/kernel-consumer.c

index c6e1575e78270e7d29c2ad7afc339cc713b41a90..c032d5a7d3757a37c0d2a1e338467f20c29dd137 100644 (file)
@@ -175,6 +175,7 @@ enum lttng_error_code {
        LTTNG_ERR_INVALID_PROTOCOL                     = 152, /* a protocol error occurred */
        LTTNG_ERR_FILE_CREATION_ERROR                  = 153, /* failed to create a file */
        LTTNG_ERR_TIMER_STOP_ERROR                     = 154, /* failed to stop timer. */
+       LTTNG_ERR_ROTATION_NOT_AVAILABLE_KERNEL = 155, /* Rotation feature not supported by the kernel tracer. */
 
        /* MUST be last element */
        LTTNG_ERR_NR,                           /* Last element */
index e067cb835e1a2739dfa35f83ce32a3d3c59b586a..02a92be1e07e45ba71e4c767ebca0f6a18050ec9 100644 (file)
@@ -4907,6 +4907,12 @@ int cmd_rotate_session(struct ltt_session *session,
                goto end;
        }
 
+       /* Unsupported feature in lttng-modules before 2.8 (lack of sequence number). */
+       if (session->kernel_session && !kernel_supports_ring_buffer_packet_sequence_number()) {
+               cmd_ret = LTTNG_ERR_ROTATION_NOT_AVAILABLE_KERNEL;
+               goto end;
+       }
+
        if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
                DBG("Refusing to launch a rotation; a rotation is already in progress for session %s",
                                session->name);
index 76162e08c75b3b15b226f02cc4f2625519e3d064..b5c4a5e1267b5ae5fbe95cd70723ed2afb5710ab 100644 (file)
@@ -1425,6 +1425,37 @@ error:
        return ret;
 }
 
+/*
+ * Check for the support of the packet sequence number via abi version number.
+ *
+ * Return 1 on success, 0 when feature is not supported, negative value in case
+ * of errors.
+ */
+int kernel_supports_ring_buffer_packet_sequence_number(void)
+{
+       int ret = 0; // Not supported by default
+       struct lttng_kernel_tracer_abi_version abi;
+
+       ret = kernctl_tracer_abi_version(kernel_tracer_fd, &abi);
+       if (ret < 0) {
+               ERR("Failed to retrieve lttng-modules ABI version");
+               goto error;
+       }
+
+       /*
+        * Packet sequence number was introduced in 2.8
+        */
+       if (abi.major >= 2 && abi.minor >= 8) {
+               /* Supported */
+               ret = 1;
+       } else {
+               /* Not supported */
+               ret = 0;
+       }
+error:
+       return ret;
+}
+
 /*
  * Rotate a kernel session.
  *
index 0f1abd1d66eebeb706aee22eb982c225b8bbe9dd..fe14589b7f3e0f3ff637858857d1bc7f8ca6b671 100644 (file)
@@ -70,6 +70,7 @@ int init_kernel_workarounds(void);
 ssize_t kernel_list_tracker_pids(struct ltt_kernel_session *session,
                int **_pids);
 int kernel_supports_ring_buffer_snapshot_sample_positions(void);
+int kernel_supports_ring_buffer_packet_sequence_number(void);
 int init_kernel_tracer(void);
 void cleanup_kernel_tracer(void);
 bool kernel_tracer_is_initialized(void);
index 00eb35670d7731c11048b2a6bbd7a9573cf538df..d81d0fc7a1da1bf0f57c3a670b216f95fdc7af68 100644 (file)
@@ -603,6 +603,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
        stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
        stream->index_file = NULL;
        stream->last_sequence_number = -1ULL;
+       stream->rotate_position = -1ULL;
        pthread_mutex_init(&stream->lock, NULL);
        pthread_mutex_init(&stream->metadata_timer_lock, NULL);
 
@@ -4002,7 +4003,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                        ht->hash_fct(&channel->key, lttng_ht_seed),
                        ht->match_fct, &channel->key, &iter.iter,
                        stream, node_channel_id.node) {
-               unsigned long consumed_pos;
+               unsigned long produced_pos = 0, consumed_pos = 0;
 
                health_code_update();
 
@@ -4015,65 +4016,78 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                        rotating_to_new_chunk = false;
                }
 
-               ret = lttng_consumer_sample_snapshot_positions(stream);
+               /*
+                * Active flush; has no effect if the production position
+                * is at a packet boundary.
+                */
+               ret = consumer_flush_buffer(stream, 1);
                if (ret < 0) {
-                       ERR("Failed to sample snapshot position during channel rotation");
+                       ERR("Failed to flush stream %" PRIu64 " during channel rotation",
+                                       stream->key);
                        goto end_unlock_stream;
                }
 
-               ret = lttng_consumer_get_produced_snapshot(stream,
-                               &stream->rotate_position);
-               if (ret < 0) {
-                       ERR("Failed to sample produced position during channel rotation");
+               ret = lttng_consumer_take_snapshot(stream);
+               if (ret < 0 && ret != -ENODATA && ret != -EAGAIN) {
+                       ERR("Failed to sample snapshot position during channel rotation");
                        goto end_unlock_stream;
                }
+               if (!ret) {
+                       ret = lttng_consumer_get_produced_snapshot(stream,
+                                       &produced_pos);
+                       if (ret < 0) {
+                               ERR("Failed to sample produced position during channel rotation");
+                               goto end_unlock_stream;
+                       }
 
-               lttng_consumer_get_consumed_snapshot(stream,
-                               &consumed_pos);
-               if (consumed_pos == stream->rotate_position) {
+                       ret = lttng_consumer_get_consumed_snapshot(stream,
+                                       &consumed_pos);
+                       if (ret < 0) {
+                               ERR("Failed to sample consumed position during channel rotation");
+                               goto end_unlock_stream;
+                       }
+               }
+               /*
+                * Align produced position on the start-of-packet boundary of the first
+                * packet going into the next trace chunk.
+                */
+               produced_pos = ALIGN_FLOOR(produced_pos, stream->max_sb_size);
+               if (consumed_pos == produced_pos) {
                        stream->rotate_ready = true;
                }
-
                /*
-                * Active flush; has no effect if the production position
-                * is at a packet boundary.
+                * The rotation position is based on the packet_seq_num of the
+                * packet following the last packet that was consumed for this
+                * stream, incremented by the offset between produced and
+                * consumed positions. This rotation position is a lower bound
+                * (inclusive) at which the next trace chunk starts. Since it
+                * is a lower bound, it is OK if the packet_seq_num does not
+                * correspond exactly to the same packet identified by the
+                * consumed_pos, which can happen in overwrite mode.
                 */
-               ret = consumer_flush_buffer(stream, 1);
-               if (ret < 0) {
-                       ERR("Failed to flush stream %" PRIu64 " during channel rotation",
+               if (stream->sequence_number_unavailable) {
+                       /*
+                        * Rotation should never be performed on a session which
+                        * interacts with a pre-2.8 lttng-modules, which does
+                        * not implement packet sequence number.
+                        */
+                       ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable",
                                        stream->key);
+                       ret = -1;
                        goto end_unlock_stream;
                }
+               stream->rotate_position = stream->last_sequence_number + 1 +
+                               ((produced_pos - consumed_pos) / stream->max_sb_size);
 
                if (!is_local_trace) {
                        /*
                         * The relay daemon control protocol expects a rotation
                         * position as "the sequence number of the first packet
-                        * _after_ the current trace chunk.
-                        *
-                        * At the moment when the positions of the buffers are
-                        * sampled, the production position does not necessarily
-                        * sit at a packet boundary. The 'active' flush
-                        * operation above will push the production position to
-                        * the next packet boundary _if_ it is not already
-                        * sitting at such a boundary.
-                        *
-                        * Assuming a current production position that is not
-                        * on the bound of a packet, the 'target' sequence
-                        * number is
-                        *   (consumed_pos / subbuffer_size) + 1
-                        * Note the '+ 1' to ensure the current packet is
-                        * part of the current trace chunk.
-                        *
-                        * However, if the production position is already at
-                        * a packet boundary, the '+ 1' is not necessary as the
-                        * last packet of the current chunk is already
-                        * 'complete'.
+                        * _after_ the current trace chunk".
                         */
                        const struct relayd_stream_rotation_position position = {
                                .stream_id = stream->relayd_stream_id,
-                               .rotate_at_seq_num = (stream->rotate_position / stream->max_sb_size) +
-                                       !!(stream->rotate_position % stream->max_sb_size),
+                               .rotate_at_seq_num = stream->rotate_position,
                        };
 
                        ret = lttng_dynamic_array_add_element(
@@ -4136,44 +4150,39 @@ end:
  */
 int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
 {
-       int ret;
-       unsigned long consumed_pos;
-
-       if (!stream->rotate_position && !stream->rotate_ready) {
-               ret = 0;
-               goto end;
-       }
-
        if (stream->rotate_ready) {
-               ret = 1;
-               goto end;
+               return 1;
        }
 
        /*
-        * If we don't have the rotate_ready flag, check the consumed position
-        * to determine if we need to rotate.
+        * If packet seq num is unavailable, it means we are interacting
+        * with a pre-2.8 lttng-modules which does not implement the
+        * sequence number. Rotation should never be used by sessiond in this
+        * scenario.
         */
-       ret = lttng_consumer_sample_snapshot_positions(stream);
-       if (ret < 0) {
-               ERR("Taking snapshot positions");
-               goto end;
+       if (stream->sequence_number_unavailable) {
+               ERR("Internal error: rotation used on stream %" PRIu64
+                               " with unavailable sequence number",
+                               stream->key);
+               return -1;
        }
 
-       ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
-       if (ret < 0) {
-               ERR("Consumed snapshot position");
-               goto end;
+       if (stream->rotate_position == -1ULL ||
+                       stream->last_sequence_number == -1ULL) {
+               return 0;
        }
 
-       /* Rotate position not reached yet (with check for overflow). */
-       if ((long) (consumed_pos - stream->rotate_position) < 0) {
-               ret = 0;
-               goto end;
+       /*
+        * Rotate position not reached yet. The stream rotate position is
+        * the position of the next packet belonging to the next trace chunk,
+        * but consumerd considers rotation ready when reaching the last
+        * packet of the current chunk, hence the "rotate_position - 1".
+        */
+       if (stream->last_sequence_number >= stream->rotate_position - 1) {
+               return 1;
        }
-       ret = 1;
 
-end:
-       return ret;
+       return 0;
 }
 
 /*
@@ -4181,7 +4190,7 @@ end:
  */
 void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream)
 {
-       stream->rotate_position = 0;
+       stream->rotate_position = -1ULL;
        stream->rotate_ready = false;
 }
 
index 72c580eb2e1be87894d98a4c40e83a09ce07b8a5..17c3ee581bf2ff079439d375337031df2b557195 100644 (file)
@@ -304,6 +304,11 @@ struct lttng_consumer_stream {
         */
        bool quiescent;
 
+       /*
+        * True if the sequence number is not available (lttng-modules < 2.8).
+        */
+       bool sequence_number_unavailable;
+
        /*
         * metadata_timer_lock protects flags waiting_on_metadata and
         * missed_metadata_flush.
@@ -438,12 +443,12 @@ struct lttng_consumer_stream {
        pthread_mutex_t metadata_rdv_lock;
 
        /*
-        * rotate_position represents the position in the ring-buffer that has to
-        * be flushed to disk to complete the ongoing rotation. When that position
-        * is reached, this tracefile can be closed and a new one is created in
-        * channel_read_only_attributes.path.
+        * rotate_position represents the packet sequence number of the last
+        * packet which belongs to the current trace chunk prior to the rotation.
+        * When that position is reached, this tracefile can be closed and a
+        * new one is created in channel_read_only_attributes.path.
         */
-       unsigned long rotate_position;
+       uint64_t rotate_position;
 
        /*
         * Read-only copies of channel values. We cannot safely access the
index 66fd8642f01c4813ec383b9a9769732f6754c917..cb24584ae33d44351819a1886069a59049783508 100644 (file)
@@ -220,6 +220,7 @@ static const char *error_string_array[] = {
        [ ERROR_INDEX(LTTNG_ERR_INVALID_PROTOCOL) ] = "Protocol error occurred",
        [ ERROR_INDEX(LTTNG_ERR_FILE_CREATION_ERROR) ] = "Failed to create file",
        [ ERROR_INDEX(LTTNG_ERR_TIMER_STOP_ERROR) ] = "Failed to stop a timer",
+       [ ERROR_INDEX(LTTNG_ERR_ROTATION_NOT_AVAILABLE_KERNEL) ] = "Rotation feature not supported by the kernel tracer.",
 
        /* Last element */
        [ ERROR_INDEX(LTTNG_ERR_NR) ] = "Unknown error code"
index fdd9ca65b05e8617299ee5909f11e886f8679a37..2cd9704442fe468829d4d0e79d9e1cba394edddd 100644 (file)
@@ -1445,6 +1445,7 @@ int update_stream_stats(struct lttng_consumer_stream *stream)
                if (ret == -ENOTTY) {
                        /* Command not implemented by lttng-modules. */
                        seq = -1ULL;
+                       stream->sequence_number_unavailable = true;
                } else {
                        PERROR("kernctl_get_sequence_number");
                        goto end;
This page took 0.03349 seconds and 4 git commands to generate.