X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=lib%2Fringbuffer%2Fring_buffer_frontend.c;h=eaeb571562230f92e4b30df3616ce3907c9b8be7;hb=5e3912524c2841304e38f962dd8d4cfc6150909e;hp=3003dd8324458ee62c8e99460b295bc3ab8e481d;hpb=ba1d61bc8909b4796516221bf97130631e8d0bc6;p=lttng-modules.git diff --git a/lib/ringbuffer/ring_buffer_frontend.c b/lib/ringbuffer/ring_buffer_frontend.c index 3003dd83..eaeb5715 100644 --- a/lib/ringbuffer/ring_buffer_frontend.c +++ b/lib/ringbuffer/ring_buffer_frontend.c @@ -1,7 +1,22 @@ /* * ring_buffer_frontend.c * - * (C) Copyright 2005-2010 - Mathieu Desnoyers + * Copyright (C) 2005-2012 Mathieu Desnoyers + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; only + * version 2.1 of the License. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * * * Ring buffer wait-free buffer synchronization. Producer-consumer and flight * recorder (overwrite) modes. See thesis: @@ -34,8 +49,6 @@ * - splice one subbuffer worth of data to a pipe * - splice the data from pipe to disk/network * - put_subbuf - * - * Dual LGPL v2.1/GPL v2 license. */ #include @@ -54,8 +67,7 @@ struct switch_offsets { unsigned long begin, end, old; size_t pre_header_padding, size; - unsigned int switch_new_start:1, switch_new_end:1, switch_old_start:1, - switch_old_end:1; + unsigned int switch_new_start:1, switch_old_start:1, switch_old_end:1; }; #ifdef CONFIG_NO_HZ @@ -103,7 +115,7 @@ void lib_ring_buffer_free(struct lib_ring_buffer *buf) void lib_ring_buffer_reset(struct lib_ring_buffer *buf) { struct channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned int i; /* @@ -161,7 +173,7 @@ EXPORT_SYMBOL_GPL(channel_reset); int lib_ring_buffer_create(struct lib_ring_buffer *buf, struct channel_backend *chanb, int cpu) { - const struct lib_ring_buffer_config *config = chanb->config; + const struct lib_ring_buffer_config *config = &chanb->config; struct channel *chan = container_of(chanb, struct channel, backend); void *priv = chanb->priv; size_t subbuf_header_size; @@ -253,7 +265,7 @@ static void switch_buffer_timer(unsigned long data) { struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data; struct channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; /* * Only flush buffers periodically if readers are active. @@ -275,7 +287,7 @@ static void switch_buffer_timer(unsigned long data) static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf) { struct channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; if (!chan->switch_timer_interval || buf->switch_timer_enabled) return; @@ -311,7 +323,7 @@ static void read_buffer_timer(unsigned long data) { struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data; struct channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; CHAN_WARN_ON(chan, !buf->backend.allocated); @@ -335,7 +347,7 @@ static void read_buffer_timer(unsigned long data) static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf) { struct channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER || !chan->read_timer_interval @@ -360,7 +372,7 @@ static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf) static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf) { struct channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER || !chan->read_timer_interval @@ -389,7 +401,7 @@ static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf) * Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD) */ static -int __cpuinit lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb, +int lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb, unsigned long action, void *hcpu) { @@ -397,7 +409,7 @@ int __cpuinit lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb, struct channel *chan = container_of(nb, struct channel, cpu_hp_notifier); struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu); - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; if (!chan->cpu_hp_enable) return NOTIFY_DONE; @@ -452,7 +464,7 @@ static int notrace ring_buffer_tick_nohz_callback(struct notifier_block *nb, { struct channel *chan = container_of(nb, struct channel, tick_nohz_notifier); - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; struct lib_ring_buffer *buf; int cpu = smp_processor_id(); @@ -524,7 +536,7 @@ void notrace lib_ring_buffer_tick_nohz_restart(void) */ static void channel_unregister_notifiers(struct channel *chan) { - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; int cpu; channel_iterator_unregister_notifiers(chan); @@ -708,7 +720,7 @@ void channel_release(struct kref *kref) void *channel_destroy(struct channel *chan) { int cpu; - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; void *priv; channel_unregister_notifiers(chan); @@ -818,7 +830,7 @@ int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf, unsigned long *consumed, unsigned long *produced) { struct channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned long consumed_cur, write_offset; int finalized; @@ -909,11 +921,18 @@ int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf, unsigned long consumed) { struct channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned long consumed_cur, consumed_idx, commit_count, write_offset; int ret; int finalized; + if (buf->get_subbuf) { + /* + * Reader is trying to get a subbuffer twice. + */ + CHAN_WARN_ON(chan, 1); + return -EBUSY; + } retry: finalized = ACCESS_ONCE(buf->finalized); /* @@ -1055,7 +1074,7 @@ void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf) { struct lib_ring_buffer_backend *bufb = &buf->backend; struct channel *chan = bufb->chan; - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned long read_sb_bindex, consumed_idx, consumed; CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1); @@ -1114,7 +1133,7 @@ void lib_ring_buffer_print_subbuffer_errors(struct lib_ring_buffer *buf, unsigned long cons_offset, int cpu) { - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned long cons_idx, commit_count, commit_count_sb; cons_idx = subbuf_index(cons_offset, chan); @@ -1140,15 +1159,9 @@ void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf, struct channel *chan, void *priv, int cpu) { - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned long write_offset, cons_offset; - /* - * Can be called in the error path of allocation when - * trans_channel_data is not yet set. - */ - if (!chan) - return; /* * No need to order commit_count, write_offset and cons_offset reads * because we execute at teardown when no more writer nor reader @@ -1157,7 +1170,7 @@ void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf, write_offset = v_read(config, &buf->offset); cons_offset = atomic_long_read(&buf->consumed); if (write_offset != cons_offset) - printk(KERN_WARNING + printk(KERN_DEBUG "ring buffer %s, cpu %d: " "non-consumed data\n" " [ %lu bytes written, %lu bytes read ]\n", @@ -1176,27 +1189,34 @@ static void lib_ring_buffer_print_errors(struct channel *chan, struct lib_ring_buffer *buf, int cpu) { - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; void *priv = chan->backend.priv; - printk(KERN_DEBUG "ring buffer %s, cpu %d: %lu records written, " - "%lu records overrun\n", - chan->backend.name, cpu, - v_read(config, &buf->records_count), - v_read(config, &buf->records_overrun)); - - if (v_read(config, &buf->records_lost_full) - || v_read(config, &buf->records_lost_wrap) - || v_read(config, &buf->records_lost_big)) - printk(KERN_WARNING - "ring buffer %s, cpu %d: records were lost. Caused by:\n" - " [ %lu buffer full, %lu nest buffer wrap-around, " - "%lu event too big ]\n", - chan->backend.name, cpu, - v_read(config, &buf->records_lost_full), - v_read(config, &buf->records_lost_wrap), - v_read(config, &buf->records_lost_big)); - + if (!strcmp(chan->backend.name, "relay-metadata")) { + printk(KERN_DEBUG "ring buffer %s: %lu records written, " + "%lu records overrun\n", + chan->backend.name, + v_read(config, &buf->records_count), + v_read(config, &buf->records_overrun)); + } else { + printk(KERN_DEBUG "ring buffer %s, cpu %d: %lu records written, " + "%lu records overrun\n", + chan->backend.name, cpu, + v_read(config, &buf->records_count), + v_read(config, &buf->records_overrun)); + + if (v_read(config, &buf->records_lost_full) + || v_read(config, &buf->records_lost_wrap) + || v_read(config, &buf->records_lost_big)) + printk(KERN_WARNING + "ring buffer %s, cpu %d: records were lost. Caused by:\n" + " [ %lu buffer full, %lu nest buffer wrap-around, " + "%lu event too big ]\n", + chan->backend.name, cpu, + v_read(config, &buf->records_lost_full), + v_read(config, &buf->records_lost_wrap), + v_read(config, &buf->records_lost_big)); + } lib_ring_buffer_print_buffer_errors(buf, chan, priv, cpu); } @@ -1211,7 +1231,7 @@ void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf, struct switch_offsets *offsets, u64 tsc) { - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned long oldidx = subbuf_index(offsets->old, chan); unsigned long commit_count; @@ -1255,7 +1275,7 @@ void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf, struct switch_offsets *offsets, u64 tsc) { - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned long oldidx = subbuf_index(offsets->old - 1, chan); unsigned long commit_count, padding_size, data_size; @@ -1298,7 +1318,7 @@ void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf, struct switch_offsets *offsets, u64 tsc) { - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned long beginidx = subbuf_index(offsets->begin, chan); unsigned long commit_count; @@ -1328,48 +1348,6 @@ void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf, config->cb.subbuffer_header_size()); } -/* - * lib_ring_buffer_switch_new_end: finish switching current subbuffer - * - * The only remaining threads could be the ones with pending commits. They will - * have to do the deliver themselves. - */ -static -void lib_ring_buffer_switch_new_end(struct lib_ring_buffer *buf, - struct channel *chan, - struct switch_offsets *offsets, - u64 tsc) -{ - const struct lib_ring_buffer_config *config = chan->backend.config; - unsigned long endidx = subbuf_index(offsets->end - 1, chan); - unsigned long commit_count, padding_size, data_size; - - data_size = subbuf_offset(offsets->end - 1, chan) + 1; - padding_size = chan->backend.subbuf_size - data_size; - subbuffer_set_data_size(config, &buf->backend, endidx, data_size); - - /* - * Order all writes to buffer before the commit count update that will - * determine that the subbuffer is full. - */ - if (config->ipi == RING_BUFFER_IPI_BARRIER) { - /* - * Must write slot data before incrementing commit count. This - * compiler barrier is upgraded into a smp_mb() by the IPI sent - * by get_subbuf(). - */ - barrier(); - } else - smp_wmb(); - v_add(config, padding_size, &buf->commit_hot[endidx].cc); - commit_count = v_read(config, &buf->commit_hot[endidx].cc); - lib_ring_buffer_check_deliver(config, buf, chan, offsets->end - 1, - commit_count, endidx); - lib_ring_buffer_write_commit_counter(config, buf, chan, endidx, - offsets->end, commit_count, - padding_size); -} - /* * Returns : * 0 if ok @@ -1382,7 +1360,7 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, struct switch_offsets *offsets, u64 *tsc) { - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned long off; offsets->begin = v_read(config, &buf->offset); @@ -1410,6 +1388,19 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, */ if (mode == SWITCH_FLUSH || off > 0) { if (unlikely(off == 0)) { + /* + * A final flush that encounters an empty + * sub-buffer cannot switch buffer if a + * reader is located within this sub-buffer. + * Anyway, the purpose of final flushing of a + * sub-buffer at offset 0 is to handle the case + * of entirely empty stream. + */ + if (unlikely(subbuf_trunc(offsets->begin, chan) + - subbuf_trunc((unsigned long) + atomic_long_read(&buf->consumed), chan) + >= chan->backend.buf_size)) + return -1; /* * The client does not save any header information. * Don't switch empty subbuffer on finalize, because it @@ -1441,7 +1432,7 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode mode) { struct channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; struct switch_offsets offsets; unsigned long oldidx; u64 tsc; @@ -1489,6 +1480,48 @@ void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode m } EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_slow); +static void remote_switch(void *info) +{ + struct lib_ring_buffer *buf = info; + + lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); +} + +void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf) +{ + struct channel *chan = buf->backend.chan; + const struct lib_ring_buffer_config *config = &chan->backend.config; + int ret; + + /* + * With global synchronization we don't need to use the IPI scheme. + */ + if (config->sync == RING_BUFFER_SYNC_GLOBAL) { + lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); + return; + } + + /* + * Taking lock on CPU hotplug to ensure two things: first, that the + * target cpu is not taken concurrently offline while we are within + * smp_call_function_single() (I don't trust that get_cpu() on the + * _local_ CPU actually inhibit CPU hotplug for the _remote_ CPU (to be + * confirmed)). Secondly, if it happens that the CPU is not online, our + * own call to lib_ring_buffer_switch_slow() needs to be protected from + * CPU hotplug handlers, which can also perform a remote subbuffer + * switch. + */ + get_online_cpus(); + ret = smp_call_function_single(buf->backend.cpu, + remote_switch, buf, 1); + if (ret) { + /* Remote CPU is offline, do it ourself. */ + lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); + } + put_online_cpus(); +} +EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote); + /* * Returns : * 0 if ok @@ -1502,13 +1535,12 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, struct switch_offsets *offsets, struct lib_ring_buffer_ctx *ctx) { - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned long reserve_commit_diff; offsets->begin = v_read(config, &buf->offset); offsets->old = offsets->begin; offsets->switch_new_start = 0; - offsets->switch_new_end = 0; offsets->switch_old_end = 0; offsets->pre_header_padding = 0; @@ -1615,14 +1647,6 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, */ } offsets->end = offsets->begin + offsets->size; - - if (unlikely(subbuf_offset(offsets->end, chan) == 0)) { - /* - * The offset_end will fall at the very beginning of the next - * subbuffer. - */ - offsets->switch_new_end = 1; /* For offsets->begin */ - } return 0; } @@ -1637,7 +1661,7 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx) { struct channel *chan = ctx->chan; - const struct lib_ring_buffer_config *config = chan->backend.config; + const struct lib_ring_buffer_config *config = &chan->backend.config; struct lib_ring_buffer *buf; struct switch_offsets offsets; int ret; @@ -1693,12 +1717,26 @@ int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx) if (unlikely(offsets.switch_new_start)) lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->tsc); - if (unlikely(offsets.switch_new_end)) - lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->tsc); - ctx->slot_size = offsets.size; ctx->pre_offset = offsets.begin; ctx->buf_offset = offsets.begin + offsets.pre_header_padding; return 0; } EXPORT_SYMBOL_GPL(lib_ring_buffer_reserve_slow); + +int __init init_lib_ring_buffer_frontend(void) +{ + int cpu; + + for_each_possible_cpu(cpu) + spin_lock_init(&per_cpu(ring_buffer_nohz_lock, cpu)); + return 0; +} + +module_init(init_lib_ring_buffer_frontend); + +void __exit exit_lib_ring_buffer_frontend(void) +{ +} + +module_exit(exit_lib_ring_buffer_frontend);