X-Git-Url: http://git.lttng.org/?a=blobdiff_plain;f=lib%2Fringbuffer%2Fring_buffer_frontend.c;h=a04333a333b62527cb61574c450f42fa78fbab57;hb=4ef415a92af77f9324026b62e023e15824330dcd;hp=1931414b067a4c8836b1cece70be5c5ad45ad8a7;hpb=64c796d8aec1efa5d6f0d5850d2a0095cb7842e3;p=lttng-modules.git diff --git a/lib/ringbuffer/ring_buffer_frontend.c b/lib/ringbuffer/ring_buffer_frontend.c index 1931414b..a04333a3 100644 --- a/lib/ringbuffer/ring_buffer_frontend.c +++ b/lib/ringbuffer/ring_buffer_frontend.c @@ -1,7 +1,22 @@ /* * ring_buffer_frontend.c * - * (C) Copyright 2005-2010 - Mathieu Desnoyers + * Copyright (C) 2005-2012 Mathieu Desnoyers + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; only + * version 2.1 of the License. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * * * Ring buffer wait-free buffer synchronization. Producer-consumer and flight * recorder (overwrite) modes. See thesis: @@ -34,14 +49,13 @@ * - splice one subbuffer worth of data to a pipe * - splice the data from pipe to disk/network * - put_subbuf - * - * Dual LGPL v2.1/GPL v2 license. */ #include #include #include +#include "../../lttng-tracer-core.h" #include "../../wrapper/ringbuffer/config.h" #include "../../wrapper/ringbuffer/backend.h" #include "../../wrapper/ringbuffer/frontend.h" @@ -103,7 +117,7 @@ void lib_ring_buffer_free(struct lib_ring_buffer *buf) void lib_ring_buffer_reset(struct lib_ring_buffer *buf) { struct channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned int i; /* @@ -161,10 +175,9 @@ EXPORT_SYMBOL_GPL(channel_reset); int lib_ring_buffer_create(struct lib_ring_buffer *buf, struct channel_backend *chanb, int cpu) { - const struct lib_ring_buffer_config *config = chanb->config; + const struct lib_ring_buffer_config *config = &chanb->config; struct channel *chan = container_of(chanb, struct channel, backend); void *priv = chanb->priv; - unsigned int num_subbuf; size_t subbuf_header_size; u64 tsc; int ret; @@ -203,8 +216,8 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf, goto free_commit; } - num_subbuf = chan->backend.num_subbuf; init_waitqueue_head(&buf->read_wait); + init_waitqueue_head(&buf->write_wait); raw_spin_lock_init(&buf->raw_tick_nohz_spinlock); /* @@ -254,7 +267,7 @@ static void switch_buffer_timer(unsigned long data) { struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data; struct channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; /* * Only flush buffers periodically if readers are active. @@ -276,7 +289,7 @@ static void switch_buffer_timer(unsigned long data) static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf) { struct channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; if (!chan->switch_timer_interval || buf->switch_timer_enabled) return; @@ -312,7 +325,7 @@ static void read_buffer_timer(unsigned long data) { struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data; struct channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; CHAN_WARN_ON(chan, !buf->backend.allocated); @@ -336,7 +349,7 @@ static void read_buffer_timer(unsigned long data) static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf) { struct channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER || !chan->read_timer_interval @@ -361,7 +374,7 @@ static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf) static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf) { struct channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER || !chan->read_timer_interval @@ -398,7 +411,7 @@ int __cpuinit lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb, struct channel *chan = container_of(nb, struct channel, cpu_hp_notifier); struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu); - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; if (!chan->cpu_hp_enable) return NOTIFY_DONE; @@ -410,6 +423,7 @@ int __cpuinit lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb, case CPU_DOWN_FAILED_FROZEN: case CPU_ONLINE: case CPU_ONLINE_FROZEN: + wake_up_interruptible(&chan->hp_wait); lib_ring_buffer_start_switch_timer(buf); lib_ring_buffer_start_read_timer(buf); return NOTIFY_OK; @@ -452,7 +466,7 @@ static int notrace ring_buffer_tick_nohz_callback(struct notifier_block *nb, { struct channel *chan = container_of(nb, struct channel, tick_nohz_notifier); - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; struct lib_ring_buffer *buf; int cpu = smp_processor_id(); @@ -524,7 +538,7 @@ void notrace lib_ring_buffer_tick_nohz_restart(void) */ static void channel_unregister_notifiers(struct channel *chan) { - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; int cpu; channel_iterator_unregister_notifiers(chan); @@ -626,6 +640,7 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config, chan->read_timer_interval = usecs_to_jiffies(read_timer_interval); kref_init(&chan->ref); init_waitqueue_head(&chan->read_wait); + init_waitqueue_head(&chan->hp_wait); if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { #if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) @@ -698,16 +713,16 @@ void channel_release(struct kref *kref) * @chan: channel to destroy * * Holds cpu hotplug. - * Call "destroy" callback, finalize channels, wait for readers to release their - * reference, then destroy ring buffer data. Note that when readers have - * completed data consumption of finalized channels, get_subbuf() will return - * -ENODATA. They should release their handle at that point. - * Returns the private data pointer. + * Call "destroy" callback, finalize channels, and then decrement the + * channel reference count. Note that when readers have completed data + * consumption of finalized channels, get_subbuf() will return -ENODATA. + * They should release their handle at that point. Returns the private + * data pointer. */ void *channel_destroy(struct channel *chan) { int cpu; - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; void *priv; channel_unregister_notifiers(chan); @@ -748,9 +763,11 @@ void *channel_destroy(struct channel *chan) ACCESS_ONCE(buf->finalized) = 1; wake_up_interruptible(&buf->read_wait); } + ACCESS_ONCE(chan->finalized) = 1; + wake_up_interruptible(&chan->hp_wait); wake_up_interruptible(&chan->read_wait); - kref_put(&chan->ref, channel_release); priv = chan->backend.priv; + kref_put(&chan->ref, channel_release); return priv; } EXPORT_SYMBOL_GPL(channel_destroy); @@ -815,7 +832,7 @@ int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf, unsigned long *consumed, unsigned long *produced) { struct channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned long consumed_cur, write_offset; int finalized; @@ -865,6 +882,8 @@ EXPORT_SYMBOL_GPL(lib_ring_buffer_snapshot); /** * lib_ring_buffer_put_snapshot - move consumed counter forward + * + * Should only be called from consumer context. * @buf: ring buffer * @consumed_new: new consumed count value */ @@ -886,6 +905,8 @@ void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf, while ((long) consumed - (long) consumed_new < 0) consumed = atomic_long_cmpxchg(&buf->consumed, consumed, consumed_new); + /* Wake-up the metadata producer */ + wake_up_interruptible(&buf->write_wait); } EXPORT_SYMBOL_GPL(lib_ring_buffer_move_consumer); @@ -902,7 +923,7 @@ int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf, unsigned long consumed) { struct channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned long consumed_cur, consumed_idx, commit_count, write_offset; int ret; int finalized; @@ -1048,7 +1069,7 @@ void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf) { struct lib_ring_buffer_backend *bufb = &buf->backend; struct channel *chan = bufb->chan; - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned long read_sb_bindex, consumed_idx, consumed; CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1); @@ -1107,7 +1128,7 @@ void lib_ring_buffer_print_subbuffer_errors(struct lib_ring_buffer *buf, unsigned long cons_offset, int cpu) { - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned long cons_idx, commit_count, commit_count_sb; cons_idx = subbuf_index(cons_offset, chan); @@ -1133,15 +1154,9 @@ void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf, struct channel *chan, void *priv, int cpu) { - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned long write_offset, cons_offset; - /* - * Can be called in the error path of allocation when - * trans_channel_data is not yet set. - */ - if (!chan) - return; /* * No need to order commit_count, write_offset and cons_offset reads * because we execute at teardown when no more writer nor reader @@ -1150,7 +1165,7 @@ void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf, write_offset = v_read(config, &buf->offset); cons_offset = atomic_long_read(&buf->consumed); if (write_offset != cons_offset) - printk(KERN_WARNING + printk(KERN_DEBUG "ring buffer %s, cpu %d: " "non-consumed data\n" " [ %lu bytes written, %lu bytes read ]\n", @@ -1169,27 +1184,34 @@ static void lib_ring_buffer_print_errors(struct channel *chan, struct lib_ring_buffer *buf, int cpu) { - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; void *priv = chan->backend.priv; - printk(KERN_DEBUG "ring buffer %s, cpu %d: %lu records written, " - "%lu records overrun\n", - chan->backend.name, cpu, - v_read(config, &buf->records_count), - v_read(config, &buf->records_overrun)); - - if (v_read(config, &buf->records_lost_full) - || v_read(config, &buf->records_lost_wrap) - || v_read(config, &buf->records_lost_big)) - printk(KERN_WARNING - "ring buffer %s, cpu %d: records were lost. Caused by:\n" - " [ %lu buffer full, %lu nest buffer wrap-around, " - "%lu event too big ]\n", - chan->backend.name, cpu, - v_read(config, &buf->records_lost_full), - v_read(config, &buf->records_lost_wrap), - v_read(config, &buf->records_lost_big)); - + if (!strcmp(chan->backend.name, "relay-metadata")) { + printk(KERN_DEBUG "ring buffer %s: %lu records written, " + "%lu records overrun\n", + chan->backend.name, + v_read(config, &buf->records_count), + v_read(config, &buf->records_overrun)); + } else { + printk(KERN_DEBUG "ring buffer %s, cpu %d: %lu records written, " + "%lu records overrun\n", + chan->backend.name, cpu, + v_read(config, &buf->records_count), + v_read(config, &buf->records_overrun)); + + if (v_read(config, &buf->records_lost_full) + || v_read(config, &buf->records_lost_wrap) + || v_read(config, &buf->records_lost_big)) + printk(KERN_WARNING + "ring buffer %s, cpu %d: records were lost. Caused by:\n" + " [ %lu buffer full, %lu nest buffer wrap-around, " + "%lu event too big ]\n", + chan->backend.name, cpu, + v_read(config, &buf->records_lost_full), + v_read(config, &buf->records_lost_wrap), + v_read(config, &buf->records_lost_big)); + } lib_ring_buffer_print_buffer_errors(buf, chan, priv, cpu); } @@ -1204,7 +1226,7 @@ void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf, struct switch_offsets *offsets, u64 tsc) { - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned long oldidx = subbuf_index(offsets->old, chan); unsigned long commit_count; @@ -1248,7 +1270,7 @@ void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf, struct switch_offsets *offsets, u64 tsc) { - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned long oldidx = subbuf_index(offsets->old - 1, chan); unsigned long commit_count, padding_size, data_size; @@ -1291,7 +1313,7 @@ void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf, struct switch_offsets *offsets, u64 tsc) { - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned long beginidx = subbuf_index(offsets->begin, chan); unsigned long commit_count; @@ -1333,7 +1355,7 @@ void lib_ring_buffer_switch_new_end(struct lib_ring_buffer *buf, struct switch_offsets *offsets, u64 tsc) { - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned long endidx = subbuf_index(offsets->end - 1, chan); unsigned long commit_count, padding_size, data_size; @@ -1375,7 +1397,7 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, struct switch_offsets *offsets, u64 *tsc) { - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned long off; offsets->begin = v_read(config, &buf->offset); @@ -1434,7 +1456,7 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode mode) { struct channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; struct switch_offsets offsets; unsigned long oldidx; u64 tsc; @@ -1495,7 +1517,7 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, struct switch_offsets *offsets, struct lib_ring_buffer_ctx *ctx) { - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned long reserve_commit_diff; offsets->begin = v_read(config, &buf->offset); @@ -1630,7 +1652,7 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx) { struct channel *chan = ctx->chan; - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; struct lib_ring_buffer *buf; struct switch_offsets offsets; int ret; @@ -1695,3 +1717,20 @@ int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx) return 0; } EXPORT_SYMBOL_GPL(lib_ring_buffer_reserve_slow); + +int __init init_lib_ring_buffer_frontend(void) +{ + int cpu; + + for_each_possible_cpu(cpu) + spin_lock_init(&per_cpu(ring_buffer_nohz_lock, cpu)); + return 0; +} + +module_init(init_lib_ring_buffer_frontend); + +void __exit exit_lib_ring_buffer_frontend(void) +{ +} + +module_exit(exit_lib_ring_buffer_frontend);