From 71c1d8431c8b94c34332cf2fbdbd7c9dc5f1489a Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Tue, 16 Aug 2011 13:02:50 -0400 Subject: [PATCH] Fix metadata buffer wait/wakeup The metadata producer should be waiting on its own wait queue, woken up when the consumer position is moved by the consumer. Signed-off-by: Mathieu Desnoyers --- lib/ringbuffer/frontend_types.h | 1 + lib/ringbuffer/ring_buffer_frontend.c | 5 +++++ ltt-events.c | 2 +- ltt-events.h | 2 +- ltt-ring-buffer-client.h | 8 +++++--- ltt-ring-buffer-metadata-client.h | 14 ++++++++++---- 6 files changed, 23 insertions(+), 9 deletions(-) diff --git a/lib/ringbuffer/frontend_types.h b/lib/ringbuffer/frontend_types.h index 283a254f..5c7437f8 100644 --- a/lib/ringbuffer/frontend_types.h +++ b/lib/ringbuffer/frontend_types.h @@ -129,6 +129,7 @@ struct lib_ring_buffer { union v_atomic records_count; /* Number of records written */ union v_atomic records_overrun; /* Number of overwritten records */ wait_queue_head_t read_wait; /* reader buffer-level wait queue */ + wait_queue_head_t write_wait; /* writer buffer-level wait queue (for metadata only) */ int finalized; /* buffer has been finalized */ struct timer_list switch_timer; /* timer for periodical switch */ struct timer_list read_timer; /* timer for read poll */ diff --git a/lib/ringbuffer/ring_buffer_frontend.c b/lib/ringbuffer/ring_buffer_frontend.c index db33d04c..6a4c241e 100644 --- a/lib/ringbuffer/ring_buffer_frontend.c +++ b/lib/ringbuffer/ring_buffer_frontend.c @@ -203,6 +203,7 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf, } init_waitqueue_head(&buf->read_wait); + init_waitqueue_head(&buf->write_wait); raw_spin_lock_init(&buf->raw_tick_nohz_spinlock); /* @@ -867,6 +868,8 @@ EXPORT_SYMBOL_GPL(lib_ring_buffer_snapshot); /** * lib_ring_buffer_put_snapshot - move consumed counter forward + * + * Should only be called from consumer context. * @buf: ring buffer * @consumed_new: new consumed count value */ @@ -888,6 +891,8 @@ void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf, while ((long) consumed - (long) consumed_new < 0) consumed = atomic_long_cmpxchg(&buf->consumed, consumed, consumed_new); + /* Wake-up the metadata producer */ + wake_up_interruptible(&buf->write_wait); } EXPORT_SYMBOL_GPL(lib_ring_buffer_move_consumer); diff --git a/ltt-events.c b/ltt-events.c index 9c4ba27b..28f7bdb4 100644 --- a/ltt-events.c +++ b/ltt-events.c @@ -487,7 +487,7 @@ int lttng_metadata_printf(struct ltt_session *session, * we need to bail out after timeout or being * interrupted. */ - waitret = wait_event_interruptible_timeout(*chan->ops->get_reader_wait_queue(chan->chan), + waitret = wait_event_interruptible_timeout(*chan->ops->get_writer_buf_wait_queue(chan->chan, -1), ({ ret = chan->ops->event_reserve(&ctx, 0); ret != -ENOBUFS || !ret; diff --git a/ltt-events.h b/ltt-events.h index 94ce295b..98cfae03 100644 --- a/ltt-events.h +++ b/ltt-events.h @@ -216,7 +216,7 @@ struct ltt_channel_ops { * may change due to concurrent writes. */ size_t (*packet_avail_size)(struct channel *chan); - wait_queue_head_t *(*get_reader_wait_queue)(struct channel *chan); + wait_queue_head_t *(*get_writer_buf_wait_queue)(struct channel *chan, int cpu); wait_queue_head_t *(*get_hp_wait_queue)(struct channel *chan); int (*is_finalized)(struct channel *chan); int (*is_disabled)(struct channel *chan); diff --git a/ltt-ring-buffer-client.h b/ltt-ring-buffer-client.h index 904c42eb..dc6bbd0b 100644 --- a/ltt-ring-buffer-client.h +++ b/ltt-ring-buffer-client.h @@ -479,9 +479,11 @@ void ltt_event_write(struct lib_ring_buffer_ctx *ctx, const void *src, } static -wait_queue_head_t *ltt_get_reader_wait_queue(struct channel *chan) +wait_queue_head_t *ltt_get_writer_buf_wait_queue(struct channel *chan, int cpu) { - return &chan->read_wait; + struct lib_ring_buffer *buf = channel_get_ring_buffer(&client_config, + chan, cpu); + return &buf->write_wait; } static @@ -516,7 +518,7 @@ static struct ltt_transport ltt_relay_transport = { .event_commit = ltt_event_commit, .event_write = ltt_event_write, .packet_avail_size = NULL, /* Would be racy anyway */ - .get_reader_wait_queue = ltt_get_reader_wait_queue, + .get_writer_buf_wait_queue = ltt_get_writer_buf_wait_queue, .get_hp_wait_queue = ltt_get_hp_wait_queue, .is_finalized = ltt_is_finalized, .is_disabled = ltt_is_disabled, diff --git a/ltt-ring-buffer-metadata-client.h b/ltt-ring-buffer-metadata-client.h index 65509f84..dc0e36e1 100644 --- a/ltt-ring-buffer-metadata-client.h +++ b/ltt-ring-buffer-metadata-client.h @@ -116,7 +116,11 @@ static void client_buffer_end(struct lib_ring_buffer *buf, u64 tsc, header->content_size = data_size * CHAR_BIT; /* in bits */ header->packet_size = PAGE_ALIGN(data_size) * CHAR_BIT; /* in bits */ - records_lost += lib_ring_buffer_get_records_lost_full(&client_config, buf); + /* + * We do not care about the records lost count, because the metadata + * channel waits and retry. + */ + (void) lib_ring_buffer_get_records_lost_full(&client_config, buf); records_lost += lib_ring_buffer_get_records_lost_wrap(&client_config, buf); records_lost += lib_ring_buffer_get_records_lost_big(&client_config, buf); WARN_ON_ONCE(records_lost != 0); @@ -238,9 +242,11 @@ size_t ltt_packet_avail_size(struct channel *chan) } static -wait_queue_head_t *ltt_get_reader_wait_queue(struct channel *chan) +wait_queue_head_t *ltt_get_writer_buf_wait_queue(struct channel *chan, int cpu) { - return &chan->read_wait; + struct lib_ring_buffer *buf = channel_get_ring_buffer(&client_config, + chan, cpu); + return &buf->write_wait; } static @@ -275,7 +281,7 @@ static struct ltt_transport ltt_relay_transport = { .event_commit = ltt_event_commit, .event_write = ltt_event_write, .packet_avail_size = ltt_packet_avail_size, - .get_reader_wait_queue = ltt_get_reader_wait_queue, + .get_writer_buf_wait_queue = ltt_get_writer_buf_wait_queue, .get_hp_wait_queue = ltt_get_hp_wait_queue, .is_finalized = ltt_is_finalized, .is_disabled = ltt_is_disabled, -- 2.34.1