From b63af41cb645dcf8513e997714e47fafcb4492b4 Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Tue, 17 May 2016 21:23:24 -0400 Subject: [PATCH] Fix: do not generate packet at destroy after stop In the following scenario: - create, enable events (kernel), - start - ... - stop (await for data_pending to complete) - destroy - rm the trace directory We would expect that the "rm" operation would not conflict with the consumer daemon trying to output data into the trace files, since the "stop" operation ensured that there was no data_pending. However, the "destroy" operation currently generates an extra packet after the data_pending check. This causes the consumer daemon to try to perform trace file rotation concurrently with the trace directory removal in the scenario above, which triggers errors. The main reason why this empty packet is generated by "destroy" is to deal with trace start/stop scenario which would otherwise generate a completely empty stream. Therefore, introduce the concept of a "quiescent stream". It is initialized at false on stream creation (first packet is empty). When tracing is started, it is set to false (for cases of start/stop/start). When tracing is stopped, if the stream is not quiescent, perform a "final" flush (which will generate an empty packet if the current packet was empty), and set quiescent to true. On "destroy" stream: if the stream is not quiescent, perform a "final" flush, and set the quiescent state to true. Signed-off-by: Mathieu Desnoyers --- lib/ringbuffer/frontend.h | 3 ++ lib/ringbuffer/frontend_types.h | 3 +- lib/ringbuffer/ring_buffer_frontend.c | 76 +++++++++++++++++++++++++-- lttng-events.c | 12 +++++ 4 files changed, 88 insertions(+), 6 deletions(-) diff --git a/lib/ringbuffer/frontend.h b/lib/ringbuffer/frontend.h index 4473e86f..cf75dfe8 100644 --- a/lib/ringbuffer/frontend.h +++ b/lib/ringbuffer/frontend.h @@ -113,6 +113,9 @@ extern int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf, unsigned long consumed); extern void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf); +void lib_ring_buffer_set_quiescent_channel(struct channel *chan); +void lib_ring_buffer_clear_quiescent_channel(struct channel *chan); + /* * lib_ring_buffer_get_next_subbuf/lib_ring_buffer_put_next_subbuf are helpers * to read sub-buffers sequentially. diff --git a/lib/ringbuffer/frontend_types.h b/lib/ringbuffer/frontend_types.h index 9fc3a30a..453ec38c 100644 --- a/lib/ringbuffer/frontend_types.h +++ b/lib/ringbuffer/frontend_types.h @@ -152,7 +152,8 @@ struct lib_ring_buffer { unsigned long cons_snapshot; /* Consumer count snapshot */ unsigned int get_subbuf:1, /* Sub-buffer being held by reader */ switch_timer_enabled:1, /* Protected by ring_buffer_nohz_lock */ - read_timer_enabled:1; /* Protected by ring_buffer_nohz_lock */ + read_timer_enabled:1, /* Protected by ring_buffer_nohz_lock */ + quiescent:1; }; static inline diff --git a/lib/ringbuffer/ring_buffer_frontend.c b/lib/ringbuffer/ring_buffer_frontend.c index c4b797ce..42cf317a 100644 --- a/lib/ringbuffer/ring_buffer_frontend.c +++ b/lib/ringbuffer/ring_buffer_frontend.c @@ -91,6 +91,9 @@ EXPORT_PER_CPU_SYMBOL(lib_ring_buffer_nesting); static void lib_ring_buffer_print_errors(struct channel *chan, struct lib_ring_buffer *buf, int cpu); +static +void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf, + enum switch_mode mode); /* * Must be called under cpu hotplug protection. @@ -585,6 +588,63 @@ static void channel_unregister_notifiers(struct channel *chan) channel_backend_unregister_notifiers(&chan->backend); } +static void lib_ring_buffer_set_quiescent(struct lib_ring_buffer *buf) +{ + if (!buf->quiescent) { + buf->quiescent = true; + _lib_ring_buffer_switch_remote(buf, SWITCH_FLUSH); + } +} + +static void lib_ring_buffer_clear_quiescent(struct lib_ring_buffer *buf) +{ + buf->quiescent = false; +} + +void lib_ring_buffer_set_quiescent_channel(struct channel *chan) +{ + int cpu; + const struct lib_ring_buffer_config *config = &chan->backend.config; + + if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { + get_online_cpus(); + for_each_channel_cpu(cpu, chan) { + struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, + cpu); + + lib_ring_buffer_set_quiescent(buf); + } + put_online_cpus(); + } else { + struct lib_ring_buffer *buf = chan->backend.buf; + + lib_ring_buffer_set_quiescent(buf); + } +} +EXPORT_SYMBOL_GPL(lib_ring_buffer_set_quiescent_channel); + +void lib_ring_buffer_clear_quiescent_channel(struct channel *chan) +{ + int cpu; + const struct lib_ring_buffer_config *config = &chan->backend.config; + + if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { + get_online_cpus(); + for_each_channel_cpu(cpu, chan) { + struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, + cpu); + + lib_ring_buffer_clear_quiescent(buf); + } + put_online_cpus(); + } else { + struct lib_ring_buffer *buf = chan->backend.buf; + + lib_ring_buffer_clear_quiescent(buf); + } +} +EXPORT_SYMBOL_GPL(lib_ring_buffer_clear_quiescent_channel); + static void channel_free(struct channel *chan) { if (chan->backend.release_priv_ops) { @@ -745,7 +805,7 @@ void *channel_destroy(struct channel *chan) chan->backend.priv, cpu); if (buf->backend.allocated) - lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH); + lib_ring_buffer_set_quiescent(buf); /* * Perform flush before writing to finalized. */ @@ -759,7 +819,7 @@ void *channel_destroy(struct channel *chan) if (config->cb.buffer_finalize) config->cb.buffer_finalize(buf, chan->backend.priv, -1); if (buf->backend.allocated) - lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH); + lib_ring_buffer_set_quiescent(buf); /* * Perform flush before writing to finalized. */ @@ -1546,7 +1606,8 @@ static void remote_switch(void *info) lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); } -void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf) +static void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf, + enum switch_mode mode) { struct channel *chan = buf->backend.chan; const struct lib_ring_buffer_config *config = &chan->backend.config; @@ -1556,7 +1617,7 @@ void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf) * With global synchronization we don't need to use the IPI scheme. */ if (config->sync == RING_BUFFER_SYNC_GLOBAL) { - lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); + lib_ring_buffer_switch_slow(buf, mode); return; } @@ -1575,10 +1636,15 @@ void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf) remote_switch, buf, 1); if (ret) { /* Remote CPU is offline, do it ourself. */ - lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); + lib_ring_buffer_switch_slow(buf, mode); } put_online_cpus(); } + +void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf) +{ + _lib_ring_buffer_switch_remote(buf, SWITCH_ACTIVE); +} EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote); /* diff --git a/lttng-events.c b/lttng-events.c index d02f4dff..c1297674 100644 --- a/lttng-events.c +++ b/lttng-events.c @@ -45,6 +45,8 @@ #include "lttng-tracer.h" #include "lttng-abi-old.h" #include "wrapper/vzalloc.h" +#include "wrapper/ringbuffer/backend.h" +#include "wrapper/ringbuffer/frontend.h" #define METADATA_CACHE_DEFAULT_SIZE 4096 @@ -186,6 +188,10 @@ int lttng_session_enable(struct lttng_session *session) chan->header_type = 2; /* large */ } + /* Clear each stream's quiescent state. */ + list_for_each_entry(chan, &session->chan, list) + lib_ring_buffer_clear_quiescent_channel(chan->chan); + ACCESS_ONCE(session->active) = 1; ACCESS_ONCE(session->been_active) = 1; ret = _lttng_session_metadata_statedump(session); @@ -204,6 +210,7 @@ end: int lttng_session_disable(struct lttng_session *session) { int ret = 0; + struct lttng_channel *chan; mutex_lock(&sessions_mutex); if (!session->active) { @@ -211,6 +218,11 @@ int lttng_session_disable(struct lttng_session *session) goto end; } ACCESS_ONCE(session->active) = 0; + + /* Set each stream's quiescent state. */ + list_for_each_entry(chan, &session->chan, list) + lib_ring_buffer_set_quiescent_channel(chan->chan); + end: mutex_unlock(&sessions_mutex); return ret; -- 2.34.1