From 64af2437213d40fdb1f386ffcad2c83ed5c11185 Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Tue, 17 May 2016 21:23:24 -0400 Subject: [PATCH] Fix: do not generate packet at destroy after stop In the following scenario: - create, enable events (kernel), - start - ... - stop (await for data_pending to complete) - destroy - rm the trace directory We would expect that the "rm" operation would not conflict with the consumer daemon trying to output data into the trace files, since the "stop" operation ensured that there was no data_pending. However, the "destroy" operation currently generates an extra packet after the data_pending check. This causes the consumer daemon to try to perform trace file rotation concurrently with the trace directory removal in the scenario above, which triggers errors. The main reason why this empty packet is generated by "destroy" is to deal with trace start/stop scenario which would otherwise generate a completely empty stream. Therefore, introduce the concept of a "quiescent stream". It is initialized at false on stream creation (first packet is empty). When tracing is started, it is set to false (for cases of start/stop/start). When tracing is stopped, if the stream is not quiescent, perform a "final" flush (which will generate an empty packet if the current packet was empty), and set quiescent to true. On "destroy" stream: if the stream is not quiescent, perform a "final" flush, and set the quiescent state to true. Signed-off-by: Mathieu Desnoyers --- lib/ringbuffer/frontend.h | 3 ++ lib/ringbuffer/frontend_types.h | 3 +- lib/ringbuffer/ring_buffer_frontend.c | 76 +++++++++++++++++++++++++-- lttng-events.c | 11 ++++ 4 files changed, 87 insertions(+), 6 deletions(-) diff --git a/lib/ringbuffer/frontend.h b/lib/ringbuffer/frontend.h index 4955b3dd..6ff15452 100644 --- a/lib/ringbuffer/frontend.h +++ b/lib/ringbuffer/frontend.h @@ -113,6 +113,9 @@ extern int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf, unsigned long consumed); extern void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf); +void lib_ring_buffer_set_quiescent_channel(struct channel *chan); +void lib_ring_buffer_clear_quiescent_channel(struct channel *chan); + /* * lib_ring_buffer_get_next_subbuf/lib_ring_buffer_put_next_subbuf are helpers * to read sub-buffers sequentially. diff --git a/lib/ringbuffer/frontend_types.h b/lib/ringbuffer/frontend_types.h index ef63cb50..346813e9 100644 --- a/lib/ringbuffer/frontend_types.h +++ b/lib/ringbuffer/frontend_types.h @@ -152,7 +152,8 @@ struct lib_ring_buffer { unsigned long cons_snapshot; /* Consumer count snapshot */ unsigned int get_subbuf:1, /* Sub-buffer being held by reader */ switch_timer_enabled:1, /* Protected by ring_buffer_nohz_lock */ - read_timer_enabled:1; /* Protected by ring_buffer_nohz_lock */ + read_timer_enabled:1, /* Protected by ring_buffer_nohz_lock */ + quiescent:1; }; static inline diff --git a/lib/ringbuffer/ring_buffer_frontend.c b/lib/ringbuffer/ring_buffer_frontend.c index c92e9271..c48ac620 100644 --- a/lib/ringbuffer/ring_buffer_frontend.c +++ b/lib/ringbuffer/ring_buffer_frontend.c @@ -92,6 +92,9 @@ EXPORT_PER_CPU_SYMBOL(lib_ring_buffer_nesting); static void lib_ring_buffer_print_errors(struct channel *chan, struct lib_ring_buffer *buf, int cpu); +static +void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf, + enum switch_mode mode); /* * Must be called under cpu hotplug protection. @@ -586,6 +589,63 @@ static void channel_unregister_notifiers(struct channel *chan) channel_backend_unregister_notifiers(&chan->backend); } +static void lib_ring_buffer_set_quiescent(struct lib_ring_buffer *buf) +{ + if (!buf->quiescent) { + buf->quiescent = true; + _lib_ring_buffer_switch_remote(buf, SWITCH_FLUSH); + } +} + +static void lib_ring_buffer_clear_quiescent(struct lib_ring_buffer *buf) +{ + buf->quiescent = false; +} + +void lib_ring_buffer_set_quiescent_channel(struct channel *chan) +{ + int cpu; + const struct lib_ring_buffer_config *config = &chan->backend.config; + + if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { + get_online_cpus(); + for_each_channel_cpu(cpu, chan) { + struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, + cpu); + + lib_ring_buffer_set_quiescent(buf); + } + put_online_cpus(); + } else { + struct lib_ring_buffer *buf = chan->backend.buf; + + lib_ring_buffer_set_quiescent(buf); + } +} +EXPORT_SYMBOL_GPL(lib_ring_buffer_set_quiescent_channel); + +void lib_ring_buffer_clear_quiescent_channel(struct channel *chan) +{ + int cpu; + const struct lib_ring_buffer_config *config = &chan->backend.config; + + if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { + get_online_cpus(); + for_each_channel_cpu(cpu, chan) { + struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, + cpu); + + lib_ring_buffer_clear_quiescent(buf); + } + put_online_cpus(); + } else { + struct lib_ring_buffer *buf = chan->backend.buf; + + lib_ring_buffer_clear_quiescent(buf); + } +} +EXPORT_SYMBOL_GPL(lib_ring_buffer_clear_quiescent_channel); + static void channel_free(struct channel *chan) { if (chan->backend.release_priv_ops) { @@ -746,7 +806,7 @@ void *channel_destroy(struct channel *chan) chan->backend.priv, cpu); if (buf->backend.allocated) - lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH); + lib_ring_buffer_set_quiescent(buf); /* * Perform flush before writing to finalized. */ @@ -760,7 +820,7 @@ void *channel_destroy(struct channel *chan) if (config->cb.buffer_finalize) config->cb.buffer_finalize(buf, chan->backend.priv, -1); if (buf->backend.allocated) - lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH); + lib_ring_buffer_set_quiescent(buf); /* * Perform flush before writing to finalized. */ @@ -1550,7 +1610,8 @@ static void remote_switch(void *info) lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); } -void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf) +static void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf, + enum switch_mode mode) { struct channel *chan = buf->backend.chan; const struct lib_ring_buffer_config *config = &chan->backend.config; @@ -1560,7 +1621,7 @@ void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf) * With global synchronization we don't need to use the IPI scheme. */ if (config->sync == RING_BUFFER_SYNC_GLOBAL) { - lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); + lib_ring_buffer_switch_slow(buf, mode); return; } @@ -1579,10 +1640,15 @@ void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf) remote_switch, buf, 1); if (ret) { /* Remote CPU is offline, do it ourself. */ - lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); + lib_ring_buffer_switch_slow(buf, mode); } put_online_cpus(); } + +void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf) +{ + _lib_ring_buffer_switch_remote(buf, SWITCH_ACTIVE); +} EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote); /* diff --git a/lttng-events.c b/lttng-events.c index 5307b722..f2a9a8a5 100644 --- a/lttng-events.c +++ b/lttng-events.c @@ -52,6 +52,8 @@ #include #include #include +#include +#include #define METADATA_CACHE_DEFAULT_SIZE 4096 @@ -241,6 +243,10 @@ int lttng_session_enable(struct lttng_session *session) /* We need to sync enablers with session before activation. */ lttng_session_sync_enablers(session); + /* Clear each stream's quiescent state. */ + list_for_each_entry(chan, &session->chan, list) + lib_ring_buffer_clear_quiescent_channel(chan->chan); + ACCESS_ONCE(session->active) = 1; ACCESS_ONCE(session->been_active) = 1; ret = _lttng_session_metadata_statedump(session); @@ -259,6 +265,7 @@ end: int lttng_session_disable(struct lttng_session *session) { int ret = 0; + struct lttng_channel *chan; mutex_lock(&sessions_mutex); if (!session->active) { @@ -270,6 +277,10 @@ int lttng_session_disable(struct lttng_session *session) /* Set transient enabler state to "disabled" */ session->tstate = 0; lttng_session_sync_enablers(session); + + /* Set each stream's quiescent state. */ + list_for_each_entry(chan, &session->chan, list) + lib_ring_buffer_set_quiescent_channel(chan->chan); end: mutex_unlock(&sessions_mutex); return ret; -- 2.34.1