X-Git-Url: http://git.lttng.org/?a=blobdiff_plain;ds=sidebyside;f=lib%2Fringbuffer%2Fring_buffer_frontend.c;h=3003dd8324458ee62c8e99460b295bc3ab8e481d;hb=5fee13fdbe001aacbdd61052f2ab0be57bfd6c5f;hp=5e633f438f32e57ad2793c4f4e89e56e5a0ce052;hpb=c099397a53087b8c616a7feaef0c26d939b9662f;p=lttng-modules.git diff --git a/lib/ringbuffer/ring_buffer_frontend.c b/lib/ringbuffer/ring_buffer_frontend.c index 5e633f43..3003dd83 100644 --- a/lib/ringbuffer/ring_buffer_frontend.c +++ b/lib/ringbuffer/ring_buffer_frontend.c @@ -164,7 +164,6 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf, const struct lib_ring_buffer_config *config = chanb->config; struct channel *chan = container_of(chanb, struct channel, backend); void *priv = chanb->priv; - unsigned int num_subbuf; size_t subbuf_header_size; u64 tsc; int ret; @@ -203,8 +202,8 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf, goto free_commit; } - num_subbuf = chan->backend.num_subbuf; init_waitqueue_head(&buf->read_wait); + init_waitqueue_head(&buf->write_wait); raw_spin_lock_init(&buf->raw_tick_nohz_spinlock); /* @@ -410,6 +409,7 @@ int __cpuinit lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb, case CPU_DOWN_FAILED_FROZEN: case CPU_ONLINE: case CPU_ONLINE_FROZEN: + wake_up_interruptible(&chan->hp_wait); lib_ring_buffer_start_switch_timer(buf); lib_ring_buffer_start_read_timer(buf); return NOTIFY_OK; @@ -624,7 +624,9 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config, chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order); chan->switch_timer_interval = usecs_to_jiffies(switch_timer_interval); chan->read_timer_interval = usecs_to_jiffies(read_timer_interval); + kref_init(&chan->ref); init_waitqueue_head(&chan->read_wait); + init_waitqueue_head(&chan->hp_wait); if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { #if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) @@ -685,16 +687,23 @@ error: } EXPORT_SYMBOL_GPL(channel_create); +static +void channel_release(struct kref *kref) +{ + struct channel *chan = container_of(kref, struct channel, ref); + channel_free(chan); +} + /** * channel_destroy - Finalize, wait for q.s. and destroy channel. * @chan: channel to destroy * * Holds cpu hotplug. - * Call "destroy" callback, finalize channels, wait for readers to release their - * reference, then destroy ring buffer data. Note that when readers have - * completed data consumption of finalized channels, get_subbuf() will return - * -ENODATA. They should release their handle at that point. - * Returns the private data pointer. + * Call "destroy" callback, finalize channels, and then decrement the + * channel reference count. Note that when readers have completed data + * consumption of finalized channels, get_subbuf() will return -ENODATA. + * They should release their handle at that point. Returns the private + * data pointer. */ void *channel_destroy(struct channel *chan) { @@ -740,14 +749,11 @@ void *channel_destroy(struct channel *chan) ACCESS_ONCE(buf->finalized) = 1; wake_up_interruptible(&buf->read_wait); } + ACCESS_ONCE(chan->finalized) = 1; + wake_up_interruptible(&chan->hp_wait); wake_up_interruptible(&chan->read_wait); - - while (atomic_long_read(&chan->read_ref) > 0) - msleep(100); - /* Finish waiting for refcount before free */ - smp_mb(); priv = chan->backend.priv; - channel_free(chan); + kref_put(&chan->ref, channel_release); return priv; } EXPORT_SYMBOL_GPL(channel_destroy); @@ -769,7 +775,7 @@ int lib_ring_buffer_open_read(struct lib_ring_buffer *buf) if (!atomic_long_add_unless(&buf->active_readers, 1, 1)) return -EBUSY; - atomic_long_inc(&chan->read_ref); + kref_get(&chan->ref); smp_mb__after_atomic_inc(); return 0; } @@ -781,8 +787,8 @@ void lib_ring_buffer_release_read(struct lib_ring_buffer *buf) CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1); smp_mb__before_atomic_dec(); - atomic_long_dec(&chan->read_ref); atomic_long_dec(&buf->active_readers); + kref_put(&chan->ref, channel_release); } EXPORT_SYMBOL_GPL(lib_ring_buffer_release_read); @@ -862,6 +868,8 @@ EXPORT_SYMBOL_GPL(lib_ring_buffer_snapshot); /** * lib_ring_buffer_put_snapshot - move consumed counter forward + * + * Should only be called from consumer context. * @buf: ring buffer * @consumed_new: new consumed count value */ @@ -883,6 +891,8 @@ void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf, while ((long) consumed - (long) consumed_new < 0) consumed = atomic_long_cmpxchg(&buf->consumed, consumed, consumed_new); + /* Wake-up the metadata producer */ + wake_up_interruptible(&buf->write_wait); } EXPORT_SYMBOL_GPL(lib_ring_buffer_move_consumer); @@ -1507,16 +1517,15 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, return -EIO; if (last_tsc_overflow(config, buf, ctx->tsc)) - ctx->rflags = RING_BUFFER_RFLAG_FULL_TSC; + ctx->rflags |= RING_BUFFER_RFLAG_FULL_TSC; if (unlikely(subbuf_offset(offsets->begin, ctx->chan) == 0)) { offsets->switch_new_start = 1; /* For offsets->begin */ } else { offsets->size = config->cb.record_header_size(config, chan, offsets->begin, - ctx->data_size, &offsets->pre_header_padding, - ctx->rflags, ctx); + ctx); offsets->size += lib_ring_buffer_align(offsets->begin + offsets->size, ctx->largest_align) @@ -1579,9 +1588,8 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, offsets->size = config->cb.record_header_size(config, chan, offsets->begin, - ctx->data_size, &offsets->pre_header_padding, - ctx->rflags, ctx); + ctx); offsets->size += lib_ring_buffer_align(offsets->begin + offsets->size, ctx->largest_align)