X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Flib%2Fringbuffer%2Fring_buffer_frontend.c;h=d9e64dff9d15eb616df6b5147f4f0b4ec557da1b;hb=c7d9db61d9c4861b6f344af8f1471a42e00739a8;hp=fca37fbc3a5b143a08cb55b94f762bf664fb7187;hpb=cfa6cc1d0f01c2cfcc1a679abf3a6572d411c309;p=lttng-modules.git

diff --git a/src/lib/ringbuffer/ring_buffer_frontend.c b/src/lib/ringbuffer/ring_buffer_frontend.c
index fca37fbc..d9e64dff 100644
--- a/src/lib/ringbuffer/ring_buffer_frontend.c
+++ b/src/lib/ringbuffer/ring_buffer_frontend.c
@@ -133,6 +133,8 @@ void lib_ring_buffer_free(struct lib_ring_buffer *buf)
 {
 	struct channel *chan = buf->backend.chan;
 
+	irq_work_sync(&buf->wakeup_pending);
+
 	lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu);
 	lttng_kvfree(buf->commit_hot);
 	lttng_kvfree(buf->commit_cold);
@@ -206,6 +208,19 @@ void channel_reset(struct channel *chan)
 }
 EXPORT_SYMBOL_GPL(channel_reset);
 
+static void lib_ring_buffer_pending_wakeup_buf(struct irq_work *entry)
+{
+	struct lib_ring_buffer *buf = container_of(entry, struct lib_ring_buffer,
+						   wakeup_pending);
+	wake_up_interruptible(&buf->read_wait);
+}
+
+static void lib_ring_buffer_pending_wakeup_chan(struct irq_work *entry)
+{
+	struct channel *chan = container_of(entry, struct channel, wakeup_pending);
+	wake_up_interruptible(&chan->read_wait);
+}
+
 /*
  * Must be called under cpu hotplug protection.
  */
@@ -268,6 +283,7 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf,
 
 	init_waitqueue_head(&buf->read_wait);
 	init_waitqueue_head(&buf->write_wait);
+	init_irq_work(&buf->wakeup_pending, lib_ring_buffer_pending_wakeup_buf);
 	raw_spin_lock_init(&buf->raw_tick_nohz_spinlock);
 
 	/*
@@ -453,7 +469,7 @@ static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf)
 	buf->read_timer_enabled = 0;
 }
 
-#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
+#if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0))
 
 enum cpuhp_state lttng_rb_hp_prepare;
 enum cpuhp_state lttng_rb_hp_online;
@@ -524,7 +540,7 @@ int lttng_cpuhp_rb_frontend_offline(unsigned int cpu,
 }
 EXPORT_SYMBOL_GPL(lttng_cpuhp_rb_frontend_offline);
 
-#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+#else /* #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
 
 #ifdef CONFIG_HOTPLUG_CPU
 
@@ -586,7 +602,7 @@ int lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb,
 
 #endif
 
-#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+#endif /* #else #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
 
 #if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER)
 /*
@@ -692,7 +708,7 @@ static void channel_unregister_notifiers(struct channel *chan)
 	 * concurrency.
 	 */
 #endif /* CONFIG_NO_HZ */
-#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
+#if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0))
 	{
 		int ret;
 
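The mechanism introduced above is the kernel's irq_work facility: the writer's hot path never calls wake_up_interruptible() itself; it only queues a deferred callback, and that callback later runs from IRQ context, outside whatever locks the instrumented code was holding. Below is a minimal sketch of the same pattern. All demo_* names are illustrative, not LTTng code; only the <linux/irq_work.h> and waitqueue APIs are real.

/*
 * Sketch of the irq_work deferred-wakeup pattern
 * (illustrative demo_* names; not LTTng code).
 */
#include <linux/irq_work.h>
#include <linux/kernel.h>
#include <linux/wait.h>

struct demo_buf {
	wait_queue_head_t read_wait;	/* readers sleep here */
	struct irq_work wakeup_pending;	/* deferred wakeup request */
};

/* Runs in IRQ context, where taking the waitqueue lock is safe. */
static void demo_pending_wakeup(struct irq_work *entry)
{
	struct demo_buf *buf = container_of(entry, struct demo_buf,
					    wakeup_pending);

	wake_up_interruptible(&buf->read_wait);
}

static void demo_buf_init(struct demo_buf *buf)
{
	init_waitqueue_head(&buf->read_wait);
	init_irq_work(&buf->wakeup_pending, demo_pending_wakeup);
}

/*
 * Hot path: only raises the irq_work. irq_work_queue() is callable
 * from any context, including NMI, and returns false (coalescing the
 * request) if the work is already pending.
 */
static void demo_deliver(struct demo_buf *buf)
{
	irq_work_queue(&buf->wakeup_pending);
}

This is why the two handlers added above can safely call wake_up_interruptible(): by the time they run, execution is no longer nested inside the traced kernel path.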
@@ -703,7 +719,7 @@ static void channel_unregister_notifiers(struct channel *chan)
 				&chan->cpuhp_prepare.node);
 			WARN_ON(ret);
 		}
-#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+#else /* #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
 		{
 			int cpu;
 
@@ -727,7 +743,7 @@ static void channel_unregister_notifiers(struct channel *chan)
 			}
 #endif
 		}
-#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+#endif /* #else #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
 	} else {
 		struct lib_ring_buffer *buf = chan->backend.buf;
 
@@ -854,9 +870,10 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config,
 	kref_init(&chan->ref);
 	init_waitqueue_head(&chan->read_wait);
 	init_waitqueue_head(&chan->hp_wait);
+	init_irq_work(&chan->wakeup_pending, lib_ring_buffer_pending_wakeup_chan);
 
 	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
-#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
+#if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0))
 		chan->cpuhp_prepare.component = LTTNG_RING_BUFFER_FRONTEND;
 		ret = cpuhp_state_add_instance_nocalls(lttng_rb_hp_prepare,
 			&chan->cpuhp_prepare.node);
@@ -868,7 +885,7 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config,
 			&chan->cpuhp_online.node);
 		if (ret)
 			goto cpuhp_online_error;
-#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+#else /* #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
 		{
 			int cpu;
 			/*
@@ -904,7 +921,7 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config,
 			}
 #endif
 		}
-#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+#endif /* #else #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
 
 #if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER)
 		/* Only benefit from NO_HZ idle with per-cpu buffers for now. */
@@ -924,13 +941,13 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config,
 
 	return chan;
 
-#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
+#if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0))
 cpuhp_online_error:
 	ret = cpuhp_state_remove_instance_nocalls(lttng_rb_hp_prepare,
 			&chan->cpuhp_prepare.node);
 	WARN_ON(ret);
 cpuhp_prepare_error:
-#endif /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+#endif /* #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
 error_free_backend:
 	channel_backend_free(&chan->backend);
 error:
@@ -963,6 +980,8 @@ void *channel_destroy(struct channel *chan)
 	const struct lib_ring_buffer_config *config = &chan->backend.config;
 	void *priv;
 
+	irq_work_sync(&chan->wakeup_pending);
+
 	channel_unregister_notifiers(chan);
 
 	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
@@ -1074,7 +1093,7 @@ int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf,
 	int finalized;
 
 retry:
-	finalized = READ_ONCE(buf->finalized);
+	finalized = LTTNG_READ_ONCE(buf->finalized);
 	/*
 	 * Read finalized before counters.
 	 */
@@ -1245,7 +1264,7 @@ int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf,
 		return -EBUSY;
 	}
 retry:
-	finalized = READ_ONCE(buf->finalized);
+	finalized = LTTNG_READ_ONCE(buf->finalized);
 	/*
	 * Read finalized before counters.
	 */
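The matching irq_work_sync() calls added to lib_ring_buffer_free() and channel_destroy() close the obvious teardown race: a producer may queue the wakeup right before the buffer or channel is torn down, and the struct irq_work embedded in the object must not fire after its memory is freed. Continuing the illustrative demo_buf sketch from above:

/*
 * Teardown ordering: irq_work_sync() waits for a pending or
 * concurrently running callback to finish, so the embedded irq_work
 * cannot fire after the object is freed. This mirrors the
 * irq_work_sync() calls the patch adds before freeing.
 */
#include <linux/slab.h>

static void demo_buf_destroy(struct demo_buf *buf)
{
	irq_work_sync(&buf->wakeup_pending);
	kfree(buf);	/* safe only after the sync above */
}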
@@ -1455,7 +1474,7 @@ void lib_ring_buffer_print_subbuffer_errors(struct lib_ring_buffer *buf,
 
 	if (subbuf_offset(commit_count, chan) != 0)
 		printk(KERN_WARNING
-		       "ring buffer %s, cpu %d: "
+		       "LTTng: ring buffer %s, cpu %d: "
 		       "commit count in subbuffer %lu,\n"
 		       "expecting multiples of %lu bytes\n"
 		       " [ %lu bytes committed, %lu bytes reader-visible ]\n",
@@ -1463,7 +1482,7 @@ void lib_ring_buffer_print_subbuffer_errors(struct lib_ring_buffer *buf,
 		       chan->backend.subbuf_size,
 		       commit_count, commit_count_sb);
 
-	printk(KERN_DEBUG "ring buffer: %s, cpu %d: %lu bytes committed\n",
+	printk(KERN_DEBUG "LTTng: ring buffer: %s, cpu %d: %lu bytes committed\n",
 	       chan->backend.name, cpu, commit_count);
 }
 
@@ -1484,7 +1503,7 @@ void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf,
 	cons_offset = atomic_long_read(&buf->consumed);
 	if (write_offset != cons_offset)
 		printk(KERN_DEBUG
-		       "ring buffer %s, cpu %d: "
+		       "LTTng: ring buffer %s, cpu %d: "
 		       "non-consumed data\n"
 		       " [ %lu bytes written, %lu bytes read ]\n",
 		       chan->backend.name, cpu, write_offset, cons_offset);
@@ -1507,13 +1526,13 @@ void lib_ring_buffer_print_records_count(struct channel *chan,
 	const struct lib_ring_buffer_config *config = &chan->backend.config;
 
 	if (!strcmp(chan->backend.name, "relay-metadata")) {
-		printk(KERN_DEBUG "ring buffer %s: %lu records written, "
+		printk(KERN_DEBUG "LTTng: ring buffer %s: %lu records written, "
 			"%lu records overrun\n",
 			chan->backend.name,
 			v_read(config, &buf->records_count),
 			v_read(config, &buf->records_overrun));
 	} else {
-		printk(KERN_DEBUG "ring buffer %s, cpu %d: %lu records written, "
+		printk(KERN_DEBUG "LTTng: ring buffer %s, cpu %d: %lu records written, "
 			"%lu records overrun\n",
 			chan->backend.name, cpu,
 			v_read(config, &buf->records_count),
@@ -1542,7 +1561,7 @@ void lib_ring_buffer_print_errors(struct channel *chan,
 	    || v_read(config, &buf->records_lost_wrap)
 	    || v_read(config, &buf->records_lost_big))
 		printk(KERN_WARNING
-		       "ring buffer %s, cpu %d: records were lost. Caused by:\n"
+		       "LTTng: ring buffer %s, cpu %d: records were lost. Caused by:\n"
 		       " [ %lu buffer full, %lu nest buffer wrap-around, "
 		       "%lu event too big ]\n",
 		       chan->backend.name, cpu,
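The printk hunks above tag every diagnostic with an explicit "LTTng: " prefix so the messages are attributable, and greppable, in the kernel log. This patch spells the prefix out in each format string; a more centralized kernel idiom, shown here only for comparison and not what the patch does, is to define pr_fmt() before the includes and use the pr_*() helpers:

/*
 * Comparison sketch, not the approach taken by the patch above:
 * pr_fmt() prepends the prefix to every pr_*() call in the file.
 */
#define pr_fmt(fmt) "LTTng: " fmt

#include <linux/printk.h>

static void demo_report(const char *name, int cpu, unsigned long count)
{
	/* Emits: "LTTng: ring buffer <name>, cpu <cpu>: ..." */
	pr_debug("ring buffer %s, cpu %d: %lu bytes committed\n",
		 name, cpu, count);
}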
Caused by:\n" " [ %lu buffer full, %lu nest buffer wrap-around, " "%lu event too big ]\n", chan->backend.name, cpu, @@ -1992,14 +2011,14 @@ retry: offsets->switch_old_end = 0; offsets->pre_header_padding = 0; - ctx->tsc = config->cb.ring_buffer_clock_read(chan); - if ((int64_t) ctx->tsc == -EIO) + ctx->priv.tsc = config->cb.ring_buffer_clock_read(chan); + if ((int64_t) ctx->priv.tsc == -EIO) return -EIO; - if (last_tsc_overflow(config, buf, ctx->tsc)) - ctx->rflags |= RING_BUFFER_RFLAG_FULL_TSC; + if (last_tsc_overflow(config, buf, ctx->priv.tsc)) + ctx->priv.rflags |= RING_BUFFER_RFLAG_FULL_TSC; - if (unlikely(subbuf_offset(offsets->begin, ctx->chan) == 0)) { + if (unlikely(subbuf_offset(offsets->begin, ctx->priv.chan) == 0)) { offsets->switch_new_start = 1; /* For offsets->begin */ } else { offsets->size = config->cb.record_header_size(config, chan, @@ -2156,13 +2175,13 @@ EXPORT_SYMBOL_GPL(lib_ring_buffer_lost_event_too_big); int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx, void *client_ctx) { - struct channel *chan = ctx->chan; + struct channel *chan = ctx->priv.chan; const struct lib_ring_buffer_config *config = &chan->backend.config; struct lib_ring_buffer *buf; struct switch_offsets offsets; int ret; - ctx->buf = buf = get_current_buf(chan, ctx->cpu); + ctx->priv.buf = buf = get_current_buf(chan, ctx->priv.reserve_cpu); offsets.size = 0; do { @@ -2180,7 +2199,7 @@ int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx, * records, never the opposite (missing a full TSC record when it would * be needed). */ - save_last_tsc(config, buf, ctx->tsc); + save_last_tsc(config, buf, ctx->priv.tsc); /* * Push the reader if necessary @@ -2199,21 +2218,21 @@ int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx, if (unlikely(offsets.switch_old_end)) { lib_ring_buffer_clear_noref(config, &buf->backend, subbuf_index(offsets.old - 1, chan)); - lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->tsc); + lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->priv.tsc); } /* * Populate new subbuffer. */ if (unlikely(offsets.switch_new_start)) - lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->tsc); + lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->priv.tsc); if (unlikely(offsets.switch_new_end)) - lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->tsc); + lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->priv.tsc); - ctx->slot_size = offsets.size; - ctx->pre_offset = offsets.begin; - ctx->buf_offset = offsets.begin + offsets.pre_header_padding; + ctx->priv.slot_size = offsets.size; + ctx->priv.pre_offset = offsets.begin; + ctx->priv.buf_offset = offsets.begin + offsets.pre_header_padding; return 0; } EXPORT_SYMBOL_GPL(lib_ring_buffer_reserve_slow); @@ -2356,13 +2375,14 @@ void lib_ring_buffer_check_deliver_slow(const struct lib_ring_buffer_config *con commit_count, idx); /* - * RING_BUFFER_WAKEUP_BY_WRITER wakeup is not lock-free. + * RING_BUFFER_WAKEUP_BY_WRITER uses an irq_work to issue + * the wakeups. */ if (config->wakeup == RING_BUFFER_WAKEUP_BY_WRITER && atomic_long_read(&buf->active_readers) && lib_ring_buffer_poll_deliver(config, buf, chan)) { - wake_up_interruptible(&buf->read_wait); - wake_up_interruptible(&chan->read_wait); + irq_work_queue(&buf->wakeup_pending); + irq_work_queue(&chan->wakeup_pending); } }