Fix: post-clear trace chunk has a late beginning packet
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 25 Jun 2020 23:22:24 +0000 (19:22 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 16 Jul 2020 15:24:28 +0000 (11:24 -0400)
Observed issue
==============

In the following scenario:
  - create a regular session
  - enable an event that occurs on only one core
  - start tracing
  - trace for a while
  - stop tracing
  - clear the session
  - start
  - trace for a while (referred to the "active period" later on)
  - stop

The resulting trace will contain a very short stream intersection as the
active stream will contain packets spanning the entire active period.
However, all other streams will contain a single packet at the end of
the active period with a duration of 0 ns.

This presents two problems:
  1) This makes the stream intersection mode of viewers unusable,
  2) This misleads the user into thinking that tracing was not active
     for some buffers.

Cause
=====

The packet beginning timestamps of the buffers are initialized on
creation (on the first "start" of a tracing session). When a "clear" is
performed on a session, all open packets are closed and the existing
contents are purged.

If a stream is inactive, it is possible for no packet to be "opened"
until the "stop" of the tracing session.

On stop, a "flush_empty" is performed. Such a flush opens a packet
(if it was not already the case), closes it, and marks the packet as
being ready for consumption.

Solution
========

Attempt to flush an empty packet as close to the rotation point as
possible. In the event where a stream remains inactive after the
rotation point, this ensures that the new trace chunk has a beginning
timestamp set at the begining of the trace chunk instead of only
creating an empty packet when the trace chunk is stopped.

This indicates to the viewers that the stream was being recorded, but
more importantly it allows viewers to determine a useable trace
intersection.

This presents a problem in the case where the ring-buffer is completely
full.

Consider the following scenario:
  - The consumption of data is slow (slow network, for instance),
  - The ring buffer is full,
  - A rotation is initiated,
    - The flush below does nothing (no space left to open a new packet),
  - The other streams rotate very soon, and new data is produced in the
    new chunk,
  - This stream completes its rotation long after the rotation was
    initiated
  - The session is stopped before any event can be produced in this
    stream's buffers.

The resulting trace chunk will have a single packet temporaly at the end
of the trace chunk for this stream making the stream intersection more
narrow than it should be.

To work-around this, an empty flush is performed after the first
consumption of a packet during a rotation if the initial flush failed.
The idea is that consuming a packet frees enough space to switch packets
in this scenario and allows the tracer to "stamp" the beginning of the
new trace chunk at the earliest possible point.

Note that metadata streams are always skipped when opening a packet.
This is done for two reasons:
  1) Timestamps are not relevant to the metadata stream
  2) Inserting an empty packet in the metadata stream after a rotation
     breaks the use of "clear" in live mode.

     The contents of the metadata streams of successive chunks must be
     strict superset of one another as live clients only receive the
     information appended to a metadata stream (i.e. the parts it
     already has received can't change).

     If a flush_empty was performed after a clear/rotation, it would
     result in an empty packet being inserted at the beginning of the
     metadata stream that wasn't present in the first chunk.

     This would cause the live client and relay daemon to have
     mismatching copies of the metadata stream.

Known drawbacks
===============

In the case of an inactive stream, this results in the completed trace
chunk archive containing an extra empty packet at the beginning of the
stream (typically 4kB).

In the case of an active stream, this change will cause the first packet
to be empty or contain few events.

Those are all efficiency losses that are inevitable (AFAIK) given the
current buffer control APIs. It will be possible to recoup those losses
if an API allowing the consumer daemon to open a new packet is
introduced.

As noted in the comments, this patch is not final. The flush after the
rotation should not be open-coded in lttng_consumer_read_subbuffer. It
should be a data-stream specific "post-consume" step.

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Change-Id: I7d8ab876e55e9d0718a55ec1bb77ec6466accc02

src/common/consumer/consumer-stream.c
src/common/consumer/consumer.c
src/common/consumer/consumer.h

index fb878d72cabb6a56e4ff5a8a0d0303062a99d165..398d71ae05f92f6d9c0437b4fb203532870eb5d6 100644 (file)
@@ -452,6 +452,8 @@ struct lttng_consumer_stream *consumer_stream_create(
        stream->index_file = NULL;
        stream->last_sequence_number = -1ULL;
        stream->rotate_position = -1ULL;
        stream->index_file = NULL;
        stream->last_sequence_number = -1ULL;
        stream->rotate_position = -1ULL;
+       /* Buffer is created with an open packet. */
+       stream->opened_packet_in_current_trace_chunk = true;
        pthread_mutex_init(&stream->lock, NULL);
        pthread_mutex_init(&stream->metadata_timer_lock, NULL);
 
        pthread_mutex_init(&stream->lock, NULL);
        pthread_mutex_init(&stream->metadata_timer_lock, NULL);
 
index 311c64020327cdcfdb1f55a925c7b2cc90b60729..6505490cdc04e212c54aadd3d8eb58d569eb523e 100644 (file)
@@ -63,6 +63,12 @@ struct consumer_channel_msg {
        uint64_t key;                           /* del */
 };
 
        uint64_t key;                           /* del */
 };
 
+enum open_packet_status {
+       OPEN_PACKET_STATUS_OPENED,
+       OPEN_PACKET_STATUS_NO_SPACE,
+       OPEN_PACKET_STATUS_ERROR,
+};
+
 /* Flag used to temporarily pause data consumption from testpoints. */
 int data_consumption_paused;
 
 /* Flag used to temporarily pause data consumption from testpoints. */
 int data_consumption_paused;
 
@@ -3310,6 +3316,137 @@ error_testpoint:
        return NULL;
 }
 
        return NULL;
 }
 
+static
+int consumer_flush_buffer(struct lttng_consumer_stream *stream,
+               int producer_active)
+{
+       int ret = 0;
+
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               if (producer_active) {
+                       ret = kernctl_buffer_flush(stream->wait_fd);
+                       if (ret < 0) {
+                               ERR("Failed to flush kernel stream");
+                               goto end;
+                       }
+               } else {
+                       ret = kernctl_buffer_flush_empty(stream->wait_fd);
+                       if (ret < 0) {
+                               /*
+                                * Doing a buffer flush which does not take into
+                                * account empty packets. This is not perfect,
+                                * but required as a fall-back when
+                                * "flush_empty" is not implemented by
+                                * lttng-modules.
+                                */
+                               ret = kernctl_buffer_flush(stream->wait_fd);
+                               if (ret < 0) {
+                                       ERR("Failed to flush kernel stream");
+                                       goto end;
+                               }
+                       }
+               }
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               lttng_ustconsumer_flush_buffer(stream, producer_active);
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               abort();
+       }
+
+end:
+       return ret;
+}
+
+static enum open_packet_status open_packet(struct lttng_consumer_stream *stream)
+{
+       int ret;
+       enum open_packet_status status;
+       unsigned long produced_pos_before, produced_pos_after;
+
+       ret = lttng_consumer_sample_snapshot_positions(stream);
+       if (ret < 0) {
+               ERR("Failed to snapshot positions before post-rotation empty packet flush: stream id = %" PRIu64
+                               ", channel name = %s, session id = %" PRIu64,
+                               stream->key, stream->chan->name,
+                               stream->chan->session_id);
+               status = OPEN_PACKET_STATUS_ERROR;
+               goto end;
+       }
+
+       ret = lttng_consumer_get_produced_snapshot(
+                       stream, &produced_pos_before);
+       if (ret < 0) {
+               ERR("Failed to read produced position before post-rotation empty packet flush: stream id = %" PRIu64
+                               ", channel name = %s, session id = %" PRIu64,
+                               stream->key, stream->chan->name,
+                               stream->chan->session_id);
+               status = OPEN_PACKET_STATUS_ERROR;
+               goto end;
+       }
+
+       ret = consumer_flush_buffer(stream, 0);
+       if (ret) {
+               ERR("Failed to flush an empty packet at rotation point: stream id = %" PRIu64
+                               ", channel name = %s, session id = %" PRIu64,
+                               stream->key, stream->chan->name,
+                               stream->chan->session_id);
+               status = OPEN_PACKET_STATUS_ERROR;
+               goto end;
+       }
+
+       ret = lttng_consumer_sample_snapshot_positions(stream);
+       if (ret < 0) {
+               ERR("Failed to snapshot positions after post-rotation empty packet flush: stream id = %" PRIu64
+                               ", channel name = %s, session id = %" PRIu64,
+                               stream->key, stream->chan->name,
+                               stream->chan->session_id);
+               status = OPEN_PACKET_STATUS_ERROR;
+               goto end;
+       }
+
+       ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos_after);
+       if (ret < 0) {
+               ERR("Failed to read produced position after post-rotation empty packet flush: stream id = %" PRIu64
+                               ", channel name = %s, session id = %" PRIu64,
+                               stream->key, stream->chan->name,
+                               stream->chan->session_id);
+               status = OPEN_PACKET_STATUS_ERROR;
+               goto end;
+       }
+
+       /*
+        * Determine if the flush had an effect by comparing the produced
+        * positons before and after the flush.
+        */
+       status = produced_pos_before != produced_pos_after ?
+                       OPEN_PACKET_STATUS_OPENED :
+                       OPEN_PACKET_STATUS_NO_SPACE;
+end:
+       return status;
+}
+
+static bool stream_is_rotating_to_null_chunk(
+               const struct lttng_consumer_stream *stream)
+{
+       bool rotating_to_null_chunk = false;
+
+       if (stream->rotate_position == -1ULL) {
+               /* No rotation ongoing. */
+               goto end;
+       }
+
+       if (stream->trace_chunk == stream->chan->trace_chunk ||
+                       !stream->chan->trace_chunk) {
+               rotating_to_null_chunk = true;
+       }
+end:
+       return rotating_to_null_chunk;
+}
+
 ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx,
                bool locked_by_caller)
 ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx,
                bool locked_by_caller)
@@ -3413,6 +3550,48 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                goto end;
        }
 
                goto end;
        }
 
+       /*
+        * TODO roll into a post_consume op as this doesn't apply to metadata
+        * streams.
+        */
+       if (!stream->opened_packet_in_current_trace_chunk &&
+                       stream->trace_chunk && !stream->metadata_flag &&
+                       !stream_is_rotating_to_null_chunk(stream)) {
+               const enum open_packet_status status = open_packet(stream);
+
+               switch (status) {
+               case OPEN_PACKET_STATUS_OPENED:
+                       DBG("Opened a packet after consuming a packet rotation: stream id = %" PRIu64
+                                       ", channel name = %s, session id = %" PRIu64,
+                                       stream->key, stream->chan->name,
+                                       stream->chan->session_id);
+                       stream->opened_packet_in_current_trace_chunk =
+                                       true;
+                       break;
+               case OPEN_PACKET_STATUS_NO_SPACE:
+                       /*
+                        * Can't open a packet as there is no space left.
+                        * This means that new events were produced, resulting
+                        * in a packet being opened, which is what we want
+                        * anyhow.
+                        */
+                       DBG("No space left to open a packet after consuming a packet: stream id = %" PRIu64
+                                       ", channel name = %s, session id = %" PRIu64,
+                                       stream->key, stream->chan->name,
+                                       stream->chan->session_id);
+                       stream->opened_packet_in_current_trace_chunk = true;
+                       break;
+               case OPEN_PACKET_STATUS_ERROR:
+                       /* Logged by callee. */
+                       ret = -1;
+                       goto end;
+               default:
+                       abort();
+               }
+
+               stream->opened_packet_in_current_trace_chunk = true;
+       }
+
 sleep_stream:
        if (stream->read_subbuffer_ops.on_sleep) {
                stream->read_subbuffer_ops.on_sleep(stream, ctx);
 sleep_stream:
        if (stream->read_subbuffer_ops.on_sleep) {
                stream->read_subbuffer_ops.on_sleep(stream, ctx);
@@ -3901,50 +4080,6 @@ unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
        return start_pos;
 }
 
        return start_pos;
 }
 
-static
-int consumer_flush_buffer(struct lttng_consumer_stream *stream, int producer_active)
-{
-       int ret = 0;
-
-       switch (consumer_data.type) {
-       case LTTNG_CONSUMER_KERNEL:
-               if (producer_active) {
-                       ret = kernctl_buffer_flush(stream->wait_fd);
-                       if (ret < 0) {
-                               ERR("Failed to flush kernel stream");
-                               goto end;
-                       }
-               } else {
-                       ret = kernctl_buffer_flush_empty(stream->wait_fd);
-                       if (ret < 0) {
-                               /*
-                                * Doing a buffer flush which does not take into
-                                * account empty packets. This is not perfect,
-                                * but required as a fall-back when
-                                * "flush_empty" is not implemented by
-                                * lttng-modules.
-                                */
-                               ret = kernctl_buffer_flush(stream->wait_fd);
-                               if (ret < 0) {
-                                       ERR("Failed to flush kernel stream");
-                                       goto end;
-                               }
-                       }
-               }
-               break;
-       case LTTNG_CONSUMER32_UST:
-       case LTTNG_CONSUMER64_UST:
-               lttng_ustconsumer_flush_buffer(stream, producer_active);
-               break;
-       default:
-               ERR("Unknown consumer_data type");
-               abort();
-       }
-
-end:
-       return ret;
-}
-
 /*
  * Sample the rotate position for all the streams of a channel. If a stream
  * is already at the rotate position (produced == consumed), we flag it as
 /*
  * Sample the rotate position for all the streams of a channel. If a stream
  * is already at the rotate position (produced == consumed), we flag it as
@@ -4102,6 +4237,86 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                        }
                        stream_count++;
                }
                        }
                        stream_count++;
                }
+
+               stream->opened_packet_in_current_trace_chunk = false;
+
+               if (rotating_to_new_chunk && !stream->metadata_flag) {
+                       /*
+                        * Attempt to flush an empty packet as close to the
+                        * rotation point as possible. In the event where a
+                        * stream remains inactive after the rotation point,
+                        * this ensures that the new trace chunk has a
+                        * beginning timestamp set at the begining of the
+                        * trace chunk instead of only creating an empty
+                        * packet when the trace chunk is stopped.
+                        *
+                        * This indicates to the viewers that the stream
+                        * was being recorded, but more importantly it
+                        * allows viewers to determine a useable trace
+                        * intersection.
+                        *
+                        * This presents a problem in the case where the
+                        * ring-buffer is completely full.
+                        *
+                        * Consider the following scenario:
+                        *   - The consumption of data is slow (slow network,
+                        *     for instance),
+                        *   - The ring buffer is full,
+                        *   - A rotation is initiated,
+                        *     - The flush below does nothing (no space left to
+                        *       open a new packet),
+                        *   - The other streams rotate very soon, and new
+                        *     data is produced in the new chunk,
+                        *   - This stream completes its rotation long after the
+                        *     rotation was initiated
+                        *   - The session is stopped before any event can be
+                        *     produced in this stream's buffers.
+                        *
+                        * The resulting trace chunk will have a single packet
+                        * temporaly at the end of the trace chunk for this
+                        * stream making the stream intersection more narrow
+                        * than it should be.
+                        *
+                        * To work-around this, an empty flush is performed
+                        * after the first consumption of a packet during a
+                        * rotation if open_packet fails. The idea is that
+                        * consuming a packet frees enough space to switch
+                        * packets in this scenario and allows the tracer to
+                        * "stamp" the beginning of the new trace chunk at the
+                        * earliest possible point.
+                        */
+                       const enum open_packet_status status =
+                               open_packet(stream);
+
+                       switch (status) {
+                       case OPEN_PACKET_STATUS_OPENED:
+                               DBG("Opened a packet after a rotation: stream id = %" PRIu64
+                                               ", channel name = %s, session id = %" PRIu64,
+                                               stream->key, stream->chan->name,
+                                               stream->chan->session_id);
+                               stream->opened_packet_in_current_trace_chunk =
+                                               true;
+                               break;
+                       case OPEN_PACKET_STATUS_NO_SPACE:
+                               /*
+                                * Can't open a packet as there is no space left
+                                * in the buffer. A new packet will be opened
+                                * once one has been consumed.
+                                */
+                               DBG("No space left to open a packet after a rotation: stream id = %" PRIu64
+                                               ", channel name = %s, session id = %" PRIu64,
+                                               stream->key, stream->chan->name,
+                                               stream->chan->session_id);
+                               break;
+                       case OPEN_PACKET_STATUS_ERROR:
+                               /* Logged by callee. */
+                               ret = -1;
+                               goto end_unlock_stream;
+                       default:
+                               abort();
+                       }
+               }
+
                pthread_mutex_unlock(&stream->lock);
        }
        stream = NULL;
                pthread_mutex_unlock(&stream->lock);
        }
        stream = NULL;
index c5b7023579ffac7d18e6013102cf0afd2dca708b..6dedb7525fe7724bc709a4fdb45e5ed35a5939b9 100644 (file)
@@ -596,6 +596,9 @@ struct lttng_consumer_stream {
         */
        uint64_t rotate_position;
 
         */
        uint64_t rotate_position;
 
+       /* Whether or not a packet was opened during the current trace chunk. */
+       bool opened_packet_in_current_trace_chunk;
+
        /*
         * Read-only copies of channel values. We cannot safely access the
         * channel from a stream, so we need to have a local copy of these
        /*
         * Read-only copies of channel values. We cannot safely access the
         * channel from a stream, so we need to have a local copy of these
This page took 0.032027 seconds and 4 git commands to generate.