X-Git-Url: http://git.lttng.org/?a=blobdiff_plain;ds=inline;f=lib%2Fringbuffer%2Fring_buffer_frontend.c;h=957d7f3d7ac6184f5401d44099275b6893403e45;hb=daaf627aaa66c434d9274c6616977a6edc07b6ca;hp=07ecc1d0aa56825a374df8b418b63c6670f1dff7;hpb=f3bc08c50e1b302bceea699027d889fd6d9af525;p=lttng-modules.git diff --git a/lib/ringbuffer/ring_buffer_frontend.c b/lib/ringbuffer/ring_buffer_frontend.c index 07ecc1d0..957d7f3d 100644 --- a/lib/ringbuffer/ring_buffer_frontend.c +++ b/lib/ringbuffer/ring_buffer_frontend.c @@ -164,7 +164,6 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf, const struct lib_ring_buffer_config *config = chanb->config; struct channel *chan = container_of(chanb, struct channel, backend); void *priv = chanb->priv; - unsigned int num_subbuf; size_t subbuf_header_size; u64 tsc; int ret; @@ -203,8 +202,8 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf, goto free_commit; } - num_subbuf = chan->backend.num_subbuf; init_waitqueue_head(&buf->read_wait); + init_waitqueue_head(&buf->write_wait); raw_spin_lock_init(&buf->raw_tick_nohz_spinlock); /* @@ -410,6 +409,7 @@ int __cpuinit lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb, case CPU_DOWN_FAILED_FROZEN: case CPU_ONLINE: case CPU_ONLINE_FROZEN: + wake_up_interruptible(&chan->hp_wait); lib_ring_buffer_start_switch_timer(buf); lib_ring_buffer_start_read_timer(buf); return NOTIFY_OK; @@ -437,7 +437,7 @@ int __cpuinit lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb, } #endif -#ifdef CONFIG_NO_HZ +#if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) /* * For per-cpu buffers, call the reader wakeups before switching the buffer, so * that wake-up-tracing generated events are flushed before going idle (in @@ -517,7 +517,7 @@ void notrace lib_ring_buffer_tick_nohz_restart(void) atomic_notifier_call_chain(&tick_nohz_notifier, TICK_NOHZ_RESTART, NULL); } -#endif /* CONFIG_NO_HZ */ +#endif /* defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) */ /* * Holds CPU hotplug. 
@@ -624,17 +624,19 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config, chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order); chan->switch_timer_interval = usecs_to_jiffies(switch_timer_interval); chan->read_timer_interval = usecs_to_jiffies(read_timer_interval); + kref_init(&chan->ref); init_waitqueue_head(&chan->read_wait); + init_waitqueue_head(&chan->hp_wait); if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { -#ifdef CONFIG_NO_HZ +#if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) /* Only benefit from NO_HZ idle with per-cpu buffers for now. */ chan->tick_nohz_notifier.notifier_call = ring_buffer_tick_nohz_callback; chan->tick_nohz_notifier.priority = ~0U; atomic_notifier_chain_register(&tick_nohz_notifier, &chan->tick_nohz_notifier); -#endif /* CONFIG_NO_HZ */ +#endif /* defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) */ /* * In case of non-hotplug cpu, if the ring-buffer is allocated @@ -685,16 +687,23 @@ error: } EXPORT_SYMBOL_GPL(channel_create); +static +void channel_release(struct kref *kref) +{ + struct channel *chan = container_of(kref, struct channel, ref); + channel_free(chan); +} + /** * channel_destroy - Finalize, wait for q.s. and destroy channel. * @chan: channel to destroy * * Holds cpu hotplug. - * Call "destroy" callback, finalize channels, wait for readers to release their - * reference, then destroy ring buffer data. Note that when readers have - * completed data consumption of finalized channels, get_subbuf() will return - * -ENODATA. They should release their handle at that point. - * Returns the private data pointer. + * Call "destroy" callback, finalize channels, and then decrement the + * channel reference count. Note that when readers have completed data + * consumption of finalized channels, get_subbuf() will return -ENODATA. + * They should release their handle at that point. Returns the private + * data pointer. 
*/ void *channel_destroy(struct channel *chan) { @@ -740,14 +749,11 @@ void *channel_destroy(struct channel *chan) ACCESS_ONCE(buf->finalized) = 1; wake_up_interruptible(&buf->read_wait); } + ACCESS_ONCE(chan->finalized) = 1; + wake_up_interruptible(&chan->hp_wait); wake_up_interruptible(&chan->read_wait); - - while (atomic_long_read(&chan->read_ref) > 0) - msleep(100); - /* Finish waiting for refcount before free */ - smp_mb(); priv = chan->backend.priv; - channel_free(chan); + kref_put(&chan->ref, channel_release); return priv; } EXPORT_SYMBOL_GPL(channel_destroy); @@ -769,7 +775,7 @@ int lib_ring_buffer_open_read(struct lib_ring_buffer *buf) if (!atomic_long_add_unless(&buf->active_readers, 1, 1)) return -EBUSY; - atomic_long_inc(&chan->read_ref); + kref_get(&chan->ref); smp_mb__after_atomic_inc(); return 0; } @@ -781,8 +787,8 @@ void lib_ring_buffer_release_read(struct lib_ring_buffer *buf) CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1); smp_mb__before_atomic_dec(); - atomic_long_dec(&chan->read_ref); atomic_long_dec(&buf->active_readers); + kref_put(&chan->ref, channel_release); } EXPORT_SYMBOL_GPL(lib_ring_buffer_release_read); @@ -862,6 +868,8 @@ EXPORT_SYMBOL_GPL(lib_ring_buffer_snapshot); /** * lib_ring_buffer_put_snapshot - move consumed counter forward + * + * Should only be called from consumer context. 
* @buf: ring buffer * @consumed_new: new consumed count value */ @@ -883,6 +891,8 @@ void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf, while ((long) consumed - (long) consumed_new < 0) consumed = atomic_long_cmpxchg(&buf->consumed, consumed, consumed_new); + /* Wake-up the metadata producer */ + wake_up_interruptible(&buf->write_wait); } EXPORT_SYMBOL_GPL(lib_ring_buffer_move_consumer); @@ -1133,12 +1143,6 @@ void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf, const struct lib_ring_buffer_config *config = chan->backend.config; unsigned long write_offset, cons_offset; - /* - * Can be called in the error path of allocation when - * trans_channel_data is not yet set. - */ - if (!chan) - return; /* * No need to order commit_count, write_offset and cons_offset reads * because we execute at teardown when no more writer nor reader @@ -1147,7 +1151,7 @@ void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf, write_offset = v_read(config, &buf->offset); cons_offset = atomic_long_read(&buf->consumed); if (write_offset != cons_offset) - printk(KERN_WARNING + printk(KERN_DEBUG "ring buffer %s, cpu %d: " "non-consumed data\n" " [ %lu bytes written, %lu bytes read ]\n", @@ -1482,7 +1486,9 @@ EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_slow); /* * Returns : * 0 if ok - * !0 if execution must be aborted. + * -ENOSPC if event size is too large for packet. + * -ENOBUFS if there is currently not enough space in buffer for the event. + * -EIO if data cannot be written into the buffer for any other reason. 
*/ static int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, @@ -1501,18 +1507,19 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, offsets->pre_header_padding = 0; ctx->tsc = config->cb.ring_buffer_clock_read(chan); + if ((int64_t) ctx->tsc == -EIO) + return -EIO; if (last_tsc_overflow(config, buf, ctx->tsc)) - ctx->rflags = RING_BUFFER_RFLAG_FULL_TSC; + ctx->rflags |= RING_BUFFER_RFLAG_FULL_TSC; if (unlikely(subbuf_offset(offsets->begin, ctx->chan) == 0)) { offsets->switch_new_start = 1; /* For offsets->begin */ } else { offsets->size = config->cb.record_header_size(config, chan, offsets->begin, - ctx->data_size, &offsets->pre_header_padding, - ctx->rflags, ctx); + ctx); offsets->size += lib_ring_buffer_align(offsets->begin + offsets->size, ctx->largest_align) @@ -1553,7 +1560,7 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, * and we are full : record is lost. */ v_inc(config, &buf->records_lost_full); - return -1; + return -ENOBUFS; } else { /* * Next subbuffer not being written to, and we @@ -1570,14 +1577,13 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, * many nested writes over a reserve/commit pair. */ v_inc(config, &buf->records_lost_wrap); - return -1; + return -EIO; } offsets->size = config->cb.record_header_size(config, chan, offsets->begin, - ctx->data_size, &offsets->pre_header_padding, - ctx->rflags, ctx); + ctx); offsets->size += lib_ring_buffer_align(offsets->begin + offsets->size, ctx->largest_align) @@ -1589,7 +1595,7 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, * complete the sub-buffer switch. */ v_inc(config, &buf->records_lost_big); - return -1; + return -ENOSPC; } else { /* * We just made a successful buffer switch and the @@ -1618,7 +1624,8 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, * lib_ring_buffer_reserve_slow - Atomic slot reservation in a buffer. * @ctx: ring buffer context. 
 *
- * Return : -ENOSPC if not enough space, else returns 0.
+ * Return : -ENOBUFS if not enough space, -ENOSPC if event size too large,
+ * -EIO for other errors, else returns 0.
  * It will take care of sub-buffer switching.
  */
 int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx)
@@ -1627,6 +1634,7 @@ int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx)
 	const struct lib_ring_buffer_config *config = chan->backend.config;
 	struct lib_ring_buffer *buf;
 	struct switch_offsets offsets;
+	int ret;
 
 	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
 		buf = per_cpu_ptr(chan->backend.buf, ctx->cpu);
@@ -1637,9 +1645,10 @@ int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx)
 	offsets.size = 0;
 
 	do {
-		if (unlikely(lib_ring_buffer_try_reserve_slow(buf, chan, &offsets,
-							      ctx)))
-			return -ENOSPC;
+		ret = lib_ring_buffer_try_reserve_slow(buf, chan, &offsets,
+						       ctx);
+		if (unlikely(ret))
+			return ret;
 	} while (unlikely(v_cmpxchg(config, &buf->offset, offsets.old,
 				    offsets.end)
 			  != offsets.old));
@@ -1687,3 +1696,20 @@ int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx)
 	return 0;
 }
 EXPORT_SYMBOL_GPL(lib_ring_buffer_reserve_slow);
+
+int __init init_lib_ring_buffer_frontend(void)
+{
+	int cpu;
+
+	for_each_possible_cpu(cpu)
+		spin_lock_init(&per_cpu(ring_buffer_nohz_lock, cpu));
+	return 0;
+}
+
+module_init(init_lib_ring_buffer_frontend);
+
+void __exit exit_lib_ring_buffer_frontend(void)
+{
+}
+
+module_exit(exit_lib_ring_buffer_frontend);