Fix: do not generate packet at destroy after stop
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 18 May 2016 01:23:24 +0000 (21:23 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 18 May 2016 19:39:18 +0000 (15:39 -0400)
In the following scenario:
- create, enable events (kernel),
- start
- ...
- stop (await for data_pending to complete)
- destroy
- rm the trace directory

We would expect that the "rm" operation would not conflict with the
consumer daemon trying to output data into the trace files, since the
"stop" operation ensured that there was no data_pending.

However, the "destroy" operation currently generates an extra packet
after the data_pending check. This causes the consumer daemon to try to
perform trace file rotation concurrently with the trace directory
removal in the scenario above, which triggers errors. The main reason
why this empty packet is generated by "destroy" is to deal with trace
start/stop scenario which would otherwise generate a completely empty
stream.

Therefore, introduce the concept of a "quiescent stream". It is
initialized at false on stream creation (first packet is empty). When
tracing is started, it is set to false (for cases of start/stop/start).
When tracing is stopped, if the stream is not quiescent, perform a
"final" flush (which will generate an empty packet if the current packet
was empty), and set quiescent to true.  On "destroy" stream: if the
stream is not quiescent, perform a "final" flush, and set the quiescent
state to true.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
lib/ringbuffer/frontend.h
lib/ringbuffer/frontend_types.h
lib/ringbuffer/ring_buffer_frontend.c
lttng-events.c

index 4473e86f37592fdce2c286373ee0859342cbafaa..cf75dfe825e6a5fe7cc06772a408501169c4d049 100644 (file)
@@ -113,6 +113,9 @@ extern int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf,
                                      unsigned long consumed);
 extern void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf);
 
+void lib_ring_buffer_set_quiescent_channel(struct channel *chan);
+void lib_ring_buffer_clear_quiescent_channel(struct channel *chan);
+
 /*
  * lib_ring_buffer_get_next_subbuf/lib_ring_buffer_put_next_subbuf are helpers
  * to read sub-buffers sequentially.
index 9fc3a30a90319ffa89a20c3b243160fb830372a8..453ec38c75d8387979e830ed37e8aa5d2151f29d 100644 (file)
@@ -152,7 +152,8 @@ struct lib_ring_buffer {
        unsigned long cons_snapshot;    /* Consumer count snapshot */
        unsigned int get_subbuf:1,      /* Sub-buffer being held by reader */
                switch_timer_enabled:1, /* Protected by ring_buffer_nohz_lock */
-               read_timer_enabled:1;   /* Protected by ring_buffer_nohz_lock */
+               read_timer_enabled:1,   /* Protected by ring_buffer_nohz_lock */
+               quiescent:1;
 };
 
 static inline
index c4b797ce8af76faf9ba5209d054b89fb42326656..42cf317a01d56482ae07401b8d1d8123b332a1d5 100644 (file)
@@ -91,6 +91,9 @@ EXPORT_PER_CPU_SYMBOL(lib_ring_buffer_nesting);
 static
 void lib_ring_buffer_print_errors(struct channel *chan,
                                  struct lib_ring_buffer *buf, int cpu);
+static
+void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf,
+               enum switch_mode mode);
 
 /*
  * Must be called under cpu hotplug protection.
@@ -585,6 +588,63 @@ static void channel_unregister_notifiers(struct channel *chan)
        channel_backend_unregister_notifiers(&chan->backend);
 }
 
+static void lib_ring_buffer_set_quiescent(struct lib_ring_buffer *buf)
+{
+       if (!buf->quiescent) {
+               buf->quiescent = true;
+               _lib_ring_buffer_switch_remote(buf, SWITCH_FLUSH);
+       }
+}
+
+static void lib_ring_buffer_clear_quiescent(struct lib_ring_buffer *buf)
+{
+       buf->quiescent = false;
+}
+
+void lib_ring_buffer_set_quiescent_channel(struct channel *chan)
+{
+       int cpu;
+       const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+               get_online_cpus();
+               for_each_channel_cpu(cpu, chan) {
+                       struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+                                                             cpu);
+
+                       lib_ring_buffer_set_quiescent(buf);
+               }
+               put_online_cpus();
+       } else {
+               struct lib_ring_buffer *buf = chan->backend.buf;
+
+               lib_ring_buffer_set_quiescent(buf);
+       }
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_set_quiescent_channel);
+
+void lib_ring_buffer_clear_quiescent_channel(struct channel *chan)
+{
+       int cpu;
+       const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+               get_online_cpus();
+               for_each_channel_cpu(cpu, chan) {
+                       struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+                                                             cpu);
+
+                       lib_ring_buffer_clear_quiescent(buf);
+               }
+               put_online_cpus();
+       } else {
+               struct lib_ring_buffer *buf = chan->backend.buf;
+
+               lib_ring_buffer_clear_quiescent(buf);
+       }
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_clear_quiescent_channel);
+
 static void channel_free(struct channel *chan)
 {
        if (chan->backend.release_priv_ops) {
@@ -745,7 +805,7 @@ void *channel_destroy(struct channel *chan)
                                                           chan->backend.priv,
                                                           cpu);
                        if (buf->backend.allocated)
-                               lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
+                               lib_ring_buffer_set_quiescent(buf);
                        /*
                         * Perform flush before writing to finalized.
                         */
@@ -759,7 +819,7 @@ void *channel_destroy(struct channel *chan)
                if (config->cb.buffer_finalize)
                        config->cb.buffer_finalize(buf, chan->backend.priv, -1);
                if (buf->backend.allocated)
-                       lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
+                       lib_ring_buffer_set_quiescent(buf);
                /*
                 * Perform flush before writing to finalized.
                 */
@@ -1546,7 +1606,8 @@ static void remote_switch(void *info)
        lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
 }
 
-void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf)
+static void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf,
+               enum switch_mode mode)
 {
        struct channel *chan = buf->backend.chan;
        const struct lib_ring_buffer_config *config = &chan->backend.config;
@@ -1556,7 +1617,7 @@ void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf)
         * With global synchronization we don't need to use the IPI scheme.
         */
        if (config->sync == RING_BUFFER_SYNC_GLOBAL) {
-               lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+               lib_ring_buffer_switch_slow(buf, mode);
                return;
        }
 
@@ -1575,10 +1636,15 @@ void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf)
                                 remote_switch, buf, 1);
        if (ret) {
                /* Remote CPU is offline, do it ourself. */
-               lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+               lib_ring_buffer_switch_slow(buf, mode);
        }
        put_online_cpus();
 }
+
+void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf)
+{
+       _lib_ring_buffer_switch_remote(buf, SWITCH_ACTIVE);
+}
 EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote);
 
 /*
index d02f4dff0596934ad256c92fe45c01fa3edf5b8c..c129767469db48ebc3880496a0bf2fa0e7249dea 100644 (file)
@@ -45,6 +45,8 @@
 #include "lttng-tracer.h"
 #include "lttng-abi-old.h"
 #include "wrapper/vzalloc.h"
+#include "wrapper/ringbuffer/backend.h"
+#include "wrapper/ringbuffer/frontend.h"
 
 #define METADATA_CACHE_DEFAULT_SIZE 4096
 
@@ -186,6 +188,10 @@ int lttng_session_enable(struct lttng_session *session)
                        chan->header_type = 2;  /* large */
        }
 
+       /* Clear each stream's quiescent state. */
+       list_for_each_entry(chan, &session->chan, list)
+               lib_ring_buffer_clear_quiescent_channel(chan->chan);
+
        ACCESS_ONCE(session->active) = 1;
        ACCESS_ONCE(session->been_active) = 1;
        ret = _lttng_session_metadata_statedump(session);
@@ -204,6 +210,7 @@ end:
 int lttng_session_disable(struct lttng_session *session)
 {
        int ret = 0;
+       struct lttng_channel *chan;
 
        mutex_lock(&sessions_mutex);
        if (!session->active) {
@@ -211,6 +218,11 @@ int lttng_session_disable(struct lttng_session *session)
                goto end;
        }
        ACCESS_ONCE(session->active) = 0;
+
+       /* Set each stream's quiescent state. */
+       list_for_each_entry(chan, &session->chan, list)
+               lib_ring_buffer_set_quiescent_channel(chan->chan);
+
 end:
        mutex_unlock(&sessions_mutex);
        return ret;
This page took 0.02954 seconds and 4 git commands to generate.