[lttng-ust.git] / libringbuffer / ring_buffer_frontend.c
Commit 852c2936: "lib ring buffer initial import"
1/*
2 * ring_buffer_frontend.c
3 *
4 * (C) Copyright 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 *
6 * Ring buffer wait-free buffer synchronization. Producer-consumer and flight
7 * recorder (overwrite) modes. See thesis:
8 *
9 * Desnoyers, Mathieu (2009), "Low-Impact Operating System Tracing", Ph.D.
10 * dissertation, Ecole Polytechnique de Montreal.
11 * http://www.lttng.org/pub/thesis/desnoyers-dissertation-2009-12.pdf
12 *
13 * - Algorithm presentation in Chapter 5:
14 * "Lockless Multi-Core High-Throughput Buffering".
15 * - Algorithm formal verification in Section 8.6:
16 * "Formal verification of LTTng"
17 *
18 * Author:
19 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
20 *
21 * Inspired from LTT and RelayFS:
22 * Karim Yaghmour <karim@opersys.com>
23 * Tom Zanussi <zanussi@us.ibm.com>
24 * Bob Wisniewski <bob@watson.ibm.com>
25 * And from K42 :
26 * Bob Wisniewski <bob@watson.ibm.com>
27 *
28 * Buffer reader semantic :
29 *
30 * - get_subbuf_size
31 * while the buffer is not both finalized and empty
32 * - get_subbuf
33 * - if return value != 0, continue
34 * - splice one subbuffer worth of data to a pipe
35 * - splice the data from pipe to disk/network
36 * - put_subbuf
37 *
38 * Dual LGPL v2.1/GPL v2 license.
39 */
40
41#include <linux/delay.h>
42#include <linux/module.h>
43#include <linux/percpu.h>
44
45#include "../../wrapper/ringbuffer/config.h"
46#include "../../wrapper/ringbuffer/backend.h"
47#include "../../wrapper/ringbuffer/frontend.h"
48#include "../../wrapper/ringbuffer/iterator.h"
49#include "../../wrapper/ringbuffer/nohz.h"
50
51/*
52 * Internal structure representing offsets to use at a sub-buffer switch.
53 */
54struct switch_offsets {
55 unsigned long begin, end, old;
56 size_t pre_header_padding, size;
57 unsigned int switch_new_start:1, switch_new_end:1, switch_old_start:1,
58 switch_old_end:1;
59};
60
61#ifdef CONFIG_NO_HZ
62enum tick_nohz_val {
63 TICK_NOHZ_STOP,
64 TICK_NOHZ_FLUSH,
65 TICK_NOHZ_RESTART,
66};
67
68static ATOMIC_NOTIFIER_HEAD(tick_nohz_notifier);
69#endif /* CONFIG_NO_HZ */
70
71static DEFINE_PER_CPU(spinlock_t, ring_buffer_nohz_lock);
72
73DEFINE_PER_CPU(unsigned int, lib_ring_buffer_nesting);
74EXPORT_PER_CPU_SYMBOL(lib_ring_buffer_nesting);
75
76static
77void lib_ring_buffer_print_errors(struct channel *chan,
78 struct lib_ring_buffer *buf, int cpu);
79
80/*
81 * Must be called under cpu hotplug protection.
82 */
83void lib_ring_buffer_free(struct lib_ring_buffer *buf)
84{
85 struct channel *chan = buf->backend.chan;
86
87 lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu);
88 kfree(buf->commit_hot);
89 kfree(buf->commit_cold);
90
91 lib_ring_buffer_backend_free(&buf->backend);
92}
93
94/**
95 * lib_ring_buffer_reset - Reset ring buffer to initial values.
96 * @buf: Ring buffer.
97 *
98 * Effectively empty the ring buffer. Should be called when the buffer is not
99 * used for writing. The ring buffer can be opened for reading, but the reader
100 * should not be using the iterator concurrently with reset. The previous
101 * current iterator record is reset.
102 */
103void lib_ring_buffer_reset(struct lib_ring_buffer *buf)
104{
105 struct channel *chan = buf->backend.chan;
106 const struct lib_ring_buffer_config *config = chan->backend.config;
107 unsigned int i;
108
109 /*
110 * Reset iterator first. It will put the subbuffer if it currently holds
111 * it.
112 */
113 lib_ring_buffer_iterator_reset(buf);
114 v_set(config, &buf->offset, 0);
115 for (i = 0; i < chan->backend.num_subbuf; i++) {
116 v_set(config, &buf->commit_hot[i].cc, 0);
117 v_set(config, &buf->commit_hot[i].seq, 0);
118 v_set(config, &buf->commit_cold[i].cc_sb, 0);
119 }
120 atomic_long_set(&buf->consumed, 0);
121 atomic_set(&buf->record_disabled, 0);
122 v_set(config, &buf->last_tsc, 0);
123 lib_ring_buffer_backend_reset(&buf->backend);
124 /* Don't reset number of active readers */
125 v_set(config, &buf->records_lost_full, 0);
126 v_set(config, &buf->records_lost_wrap, 0);
127 v_set(config, &buf->records_lost_big, 0);
128 v_set(config, &buf->records_count, 0);
129 v_set(config, &buf->records_overrun, 0);
130 buf->finalized = 0;
131}
132EXPORT_SYMBOL_GPL(lib_ring_buffer_reset);
133
134/**
135 * channel_reset - Reset channel to initial values.
136 * @chan: Channel.
137 *
138 * Effectively empty the channel. Should be called when the channel is not used
139 * for writing. The channel can be opened for reading, but the reader should not
140 * be using the iterator concurrently with reset. The previous current iterator
141 * record is reset.
142 */
143void channel_reset(struct channel *chan)
144{
145 /*
146 * Reset iterators first. Will put the subbuffer if held for reading.
147 */
148 channel_iterator_reset(chan);
149 atomic_set(&chan->record_disabled, 0);
150 /* Don't reset commit_count_mask, still valid */
151 channel_backend_reset(&chan->backend);
152 /* Don't reset switch/read timer interval */
153 /* Don't reset notifiers and notifier enable bits */
154 /* Don't reset reader reference count */
155}
156EXPORT_SYMBOL_GPL(channel_reset);
157
158/*
159 * Must be called under cpu hotplug protection.
160 */
161int lib_ring_buffer_create(struct lib_ring_buffer *buf,
162 struct channel_backend *chanb, int cpu)
163{
164 const struct lib_ring_buffer_config *config = chanb->config;
165 struct channel *chan = container_of(chanb, struct channel, backend);
166 void *priv = chanb->priv;
167 unsigned int num_subbuf;
168 size_t subbuf_header_size;
169 u64 tsc;
170 int ret;
171
172 /* Test for cpu hotplug */
173 if (buf->backend.allocated)
174 return 0;
175
176 /*
177 * Paranoia: per cpu dynamic allocation is not officially documented as
178 * zeroing the memory, so let's do it here too, just in case.
179 */
180 memset(buf, 0, sizeof(*buf));
181
182 ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend, cpu);
183 if (ret)
184 return ret;
185
186 buf->commit_hot =
187 kzalloc_node(ALIGN(sizeof(*buf->commit_hot)
188 * chan->backend.num_subbuf,
189 1 << INTERNODE_CACHE_SHIFT),
190 GFP_KERNEL, cpu_to_node(max(cpu, 0)));
191 if (!buf->commit_hot) {
192 ret = -ENOMEM;
193 goto free_chanbuf;
194 }
195
196 buf->commit_cold =
197 kzalloc_node(ALIGN(sizeof(*buf->commit_cold)
198 * chan->backend.num_subbuf,
199 1 << INTERNODE_CACHE_SHIFT),
200 GFP_KERNEL, cpu_to_node(max(cpu, 0)));
201 if (!buf->commit_cold) {
202 ret = -ENOMEM;
203 goto free_commit;
204 }
205
206 num_subbuf = chan->backend.num_subbuf;
207 init_waitqueue_head(&buf->read_wait);
208 raw_spin_lock_init(&buf->raw_tick_nohz_spinlock);
209
210 /*
211 * Write the subbuffer header for first subbuffer so we know the total
212 * duration of data gathering.
213 */
214 subbuf_header_size = config->cb.subbuffer_header_size();
215 v_set(config, &buf->offset, subbuf_header_size);
216 subbuffer_id_clear_noref(config, &buf->backend.buf_wsb[0].id);
217 tsc = config->cb.ring_buffer_clock_read(buf->backend.chan);
218 config->cb.buffer_begin(buf, tsc, 0);
219 v_add(config, subbuf_header_size, &buf->commit_hot[0].cc);
220
221 if (config->cb.buffer_create) {
222 ret = config->cb.buffer_create(buf, priv, cpu, chanb->name);
223 if (ret)
224 goto free_init;
225 }
226
227 /*
228 * Ensure the buffer is ready before setting it to allocated and setting
229 * the cpumask.
230 * Used for cpu hotplug vs cpumask iteration.
231 */
232 smp_wmb();
233 buf->backend.allocated = 1;
234
235 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
236 CHAN_WARN_ON(chan, cpumask_test_cpu(cpu,
237 chan->backend.cpumask));
238 cpumask_set_cpu(cpu, chan->backend.cpumask);
239 }
240
241 return 0;
242
243 /* Error handling */
244free_init:
245 kfree(buf->commit_cold);
246free_commit:
247 kfree(buf->commit_hot);
248free_chanbuf:
249 lib_ring_buffer_backend_free(&buf->backend);
250 return ret;
251}
252
253static void switch_buffer_timer(unsigned long data)
254{
255 struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data;
256 struct channel *chan = buf->backend.chan;
257 const struct lib_ring_buffer_config *config = chan->backend.config;
258
259 /*
260 * Only flush buffers periodically if readers are active.
261 */
262 if (atomic_long_read(&buf->active_readers))
263 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
264
265 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
266 mod_timer_pinned(&buf->switch_timer,
267 jiffies + chan->switch_timer_interval);
268 else
269 mod_timer(&buf->switch_timer,
270 jiffies + chan->switch_timer_interval);
271}
272
273/*
274 * Called with ring_buffer_nohz_lock held for per-cpu buffers.
275 */
276static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf)
277{
278 struct channel *chan = buf->backend.chan;
279 const struct lib_ring_buffer_config *config = chan->backend.config;
280
281 if (!chan->switch_timer_interval || buf->switch_timer_enabled)
282 return;
283 init_timer(&buf->switch_timer);
284 buf->switch_timer.function = switch_buffer_timer;
285 buf->switch_timer.expires = jiffies + chan->switch_timer_interval;
286 buf->switch_timer.data = (unsigned long)buf;
287 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
288 add_timer_on(&buf->switch_timer, buf->backend.cpu);
289 else
290 add_timer(&buf->switch_timer);
291 buf->switch_timer_enabled = 1;
292}
293
294/*
295 * Called with ring_buffer_nohz_lock held for per-cpu buffers.
296 */
297static void lib_ring_buffer_stop_switch_timer(struct lib_ring_buffer *buf)
298{
299 struct channel *chan = buf->backend.chan;
300
301 if (!chan->switch_timer_interval || !buf->switch_timer_enabled)
302 return;
303
304 del_timer_sync(&buf->switch_timer);
305 buf->switch_timer_enabled = 0;
306}
307
308/*
309 * Polling timer to check the channels for data.
310 */
311static void read_buffer_timer(unsigned long data)
312{
313 struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data;
314 struct channel *chan = buf->backend.chan;
315 const struct lib_ring_buffer_config *config = chan->backend.config;
316
317 CHAN_WARN_ON(chan, !buf->backend.allocated);
318
319 if (atomic_long_read(&buf->active_readers)
320 && lib_ring_buffer_poll_deliver(config, buf, chan)) {
321 wake_up_interruptible(&buf->read_wait);
322 wake_up_interruptible(&chan->read_wait);
323 }
324
325 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
326 mod_timer_pinned(&buf->read_timer,
327 jiffies + chan->read_timer_interval);
328 else
329 mod_timer(&buf->read_timer,
330 jiffies + chan->read_timer_interval);
331}
332
333/*
334 * Called with ring_buffer_nohz_lock held for per-cpu buffers.
335 */
336static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf)
337{
338 struct channel *chan = buf->backend.chan;
339 const struct lib_ring_buffer_config *config = chan->backend.config;
340
341 if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
342 || !chan->read_timer_interval
343 || buf->read_timer_enabled)
344 return;
345
346 init_timer(&buf->read_timer);
347 buf->read_timer.function = read_buffer_timer;
348 buf->read_timer.expires = jiffies + chan->read_timer_interval;
349 buf->read_timer.data = (unsigned long)buf;
350
351 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
352 add_timer_on(&buf->read_timer, buf->backend.cpu);
353 else
354 add_timer(&buf->read_timer);
355 buf->read_timer_enabled = 1;
356}
357
358/*
359 * Called with ring_buffer_nohz_lock held for per-cpu buffers.
360 */
361static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf)
362{
363 struct channel *chan = buf->backend.chan;
364 const struct lib_ring_buffer_config *config = chan->backend.config;
365
366 if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
367 || !chan->read_timer_interval
368 || !buf->read_timer_enabled)
369 return;
370
371 del_timer_sync(&buf->read_timer);
372 /*
373 * do one more check to catch data that has been written in the last
374 * timer period.
375 */
376 if (lib_ring_buffer_poll_deliver(config, buf, chan)) {
377 wake_up_interruptible(&buf->read_wait);
378 wake_up_interruptible(&chan->read_wait);
379 }
380 buf->read_timer_enabled = 0;
381}
382
383#ifdef CONFIG_HOTPLUG_CPU
384/**
385 * lib_ring_buffer_cpu_hp_callback - CPU hotplug callback
386 * @nb: notifier block
387 * @action: hotplug action to take
388 * @hcpu: CPU number
389 *
390 * Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD)
391 */
392static
393int __cpuinit lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb,
394 unsigned long action,
395 void *hcpu)
396{
397 unsigned int cpu = (unsigned long)hcpu;
398 struct channel *chan = container_of(nb, struct channel,
399 cpu_hp_notifier);
400 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
401 const struct lib_ring_buffer_config *config = chan->backend.config;
402
403 if (!chan->cpu_hp_enable)
404 return NOTIFY_DONE;
405
406 CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
407
408 switch (action) {
409 case CPU_DOWN_FAILED:
410 case CPU_DOWN_FAILED_FROZEN:
411 case CPU_ONLINE:
412 case CPU_ONLINE_FROZEN:
413 wake_up_interruptible(&chan->hp_wait);
414 lib_ring_buffer_start_switch_timer(buf);
415 lib_ring_buffer_start_read_timer(buf);
416 return NOTIFY_OK;
417
418 case CPU_DOWN_PREPARE:
419 case CPU_DOWN_PREPARE_FROZEN:
420 lib_ring_buffer_stop_switch_timer(buf);
421 lib_ring_buffer_stop_read_timer(buf);
422 return NOTIFY_OK;
423
424 case CPU_DEAD:
425 case CPU_DEAD_FROZEN:
426 /*
427 * Performing a buffer switch on a remote CPU. Performed by
428 * the CPU responsible for doing the hotunplug after the target
429 * CPU stopped running completely. Ensures that all data
430 * from that remote CPU is flushed.
431 */
432 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
433 return NOTIFY_OK;
434
435 default:
436 return NOTIFY_DONE;
437 }
438}
439#endif
440
441#if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER)
442/*
443 * For per-cpu buffers, call the reader wakeups before switching the buffer, so
444 * that wake-up-tracing generated events are flushed before going idle (in
445 * tick_nohz). We test if the spinlock is locked to deal with the race where
446 * readers try to sample the ring buffer before we perform the switch. We let
447 * the readers retry in that case. If there is data in the buffer, the wake up
448 * is going to prevent the CPU running the reader thread from going idle.
449 */
450static int notrace ring_buffer_tick_nohz_callback(struct notifier_block *nb,
451 unsigned long val,
452 void *data)
453{
454 struct channel *chan = container_of(nb, struct channel,
455 tick_nohz_notifier);
456 const struct lib_ring_buffer_config *config = chan->backend.config;
457 struct lib_ring_buffer *buf;
458 int cpu = smp_processor_id();
459
460 if (config->alloc != RING_BUFFER_ALLOC_PER_CPU) {
461 /*
462 * We don't support keeping the system idle with global buffers
463 * and streaming active. In order to do so, we would need to
464 * sample a non-nohz-cpumask racelessly with the nohz updates
465 * without adding synchronization overhead to nohz. Leave this
466 * use-case out for now.
467 */
468 return 0;
469 }
470
471 buf = channel_get_ring_buffer(config, chan, cpu);
472 switch (val) {
473 case TICK_NOHZ_FLUSH:
474 raw_spin_lock(&buf->raw_tick_nohz_spinlock);
475 if (config->wakeup == RING_BUFFER_WAKEUP_BY_TIMER
476 && chan->read_timer_interval
477 && atomic_long_read(&buf->active_readers)
478 && (lib_ring_buffer_poll_deliver(config, buf, chan)
479 || lib_ring_buffer_pending_data(config, buf, chan))) {
480 wake_up_interruptible(&buf->read_wait);
481 wake_up_interruptible(&chan->read_wait);
482 }
483 if (chan->switch_timer_interval)
484 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
485 raw_spin_unlock(&buf->raw_tick_nohz_spinlock);
486 break;
487 case TICK_NOHZ_STOP:
488 spin_lock(&__get_cpu_var(ring_buffer_nohz_lock));
489 lib_ring_buffer_stop_switch_timer(buf);
490 lib_ring_buffer_stop_read_timer(buf);
491 spin_unlock(&__get_cpu_var(ring_buffer_nohz_lock));
492 break;
493 case TICK_NOHZ_RESTART:
494 spin_lock(&__get_cpu_var(ring_buffer_nohz_lock));
495 lib_ring_buffer_start_read_timer(buf);
496 lib_ring_buffer_start_switch_timer(buf);
497 spin_unlock(&__get_cpu_var(ring_buffer_nohz_lock));
498 break;
499 }
500
501 return 0;
502}
503
504void notrace lib_ring_buffer_tick_nohz_flush(void)
505{
506 atomic_notifier_call_chain(&tick_nohz_notifier, TICK_NOHZ_FLUSH,
507 NULL);
508}
509
510void notrace lib_ring_buffer_tick_nohz_stop(void)
511{
512 atomic_notifier_call_chain(&tick_nohz_notifier, TICK_NOHZ_STOP,
513 NULL);
514}
515
516void notrace lib_ring_buffer_tick_nohz_restart(void)
517{
518 atomic_notifier_call_chain(&tick_nohz_notifier, TICK_NOHZ_RESTART,
519 NULL);
520}
521#endif /* defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) */
522
523/*
524 * Holds CPU hotplug.
525 */
526static void channel_unregister_notifiers(struct channel *chan)
527{
528 const struct lib_ring_buffer_config *config = chan->backend.config;
529 int cpu;
530
531 channel_iterator_unregister_notifiers(chan);
532 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
533#ifdef CONFIG_NO_HZ
534 /*
535 * Remove the nohz notifier first, so we are certain we stop
536 * the timers.
537 */
538 atomic_notifier_chain_unregister(&tick_nohz_notifier,
539 &chan->tick_nohz_notifier);
540 /*
541 * ring_buffer_nohz_lock will not be needed below, because
542 * we just removed the notifiers, which were the only source of
543 * concurrency.
544 */
545#endif /* CONFIG_NO_HZ */
546#ifdef CONFIG_HOTPLUG_CPU
547 get_online_cpus();
548 chan->cpu_hp_enable = 0;
549 for_each_online_cpu(cpu) {
550 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
551 cpu);
552 lib_ring_buffer_stop_switch_timer(buf);
553 lib_ring_buffer_stop_read_timer(buf);
554 }
555 put_online_cpus();
556 unregister_cpu_notifier(&chan->cpu_hp_notifier);
557#else
558 for_each_possible_cpu(cpu) {
559 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
560 cpu);
561 lib_ring_buffer_stop_switch_timer(buf);
562 lib_ring_buffer_stop_read_timer(buf);
563 }
564#endif
565 } else {
566 struct lib_ring_buffer *buf = chan->backend.buf;
567
568 lib_ring_buffer_stop_switch_timer(buf);
569 lib_ring_buffer_stop_read_timer(buf);
570 }
571 channel_backend_unregister_notifiers(&chan->backend);
572}
573
574static void channel_free(struct channel *chan)
575{
576 channel_iterator_free(chan);
577 channel_backend_free(&chan->backend);
578 kfree(chan);
579}
580
581/**
582 * channel_create - Create channel.
583 * @config: ring buffer instance configuration
584 * @name: name of the channel
585 * @priv: ring buffer client private data
586 * @buf_addr: pointer to the beginning of the preallocated buffer contiguous
587 * address mapping. It is used only by RING_BUFFER_STATIC
588 * configuration. It can be set to NULL for other backends.
589 * @subbuf_size: subbuffer size
590 * @num_subbuf: number of subbuffers
591 * @switch_timer_interval: Time interval (in us) to fill sub-buffers with
592 * padding to let readers get those sub-buffers.
593 * Used for live streaming.
594 * @read_timer_interval: Time interval (in us) to wake up pending readers.
595 *
596 * Holds cpu hotplug.
597 * Returns NULL on failure.
598 */
599struct channel *channel_create(const struct lib_ring_buffer_config *config,
600 const char *name, void *priv, void *buf_addr,
601 size_t subbuf_size,
602 size_t num_subbuf, unsigned int switch_timer_interval,
603 unsigned int read_timer_interval)
604{
605 int ret, cpu;
606 struct channel *chan;
607
608 if (lib_ring_buffer_check_config(config, switch_timer_interval,
609 read_timer_interval))
610 return NULL;
611
612 chan = kzalloc(sizeof(struct channel), GFP_KERNEL);
613 if (!chan)
614 return NULL;
615
616 ret = channel_backend_init(&chan->backend, name, config, priv,
617 subbuf_size, num_subbuf);
618 if (ret)
619 goto error;
620
621 ret = channel_iterator_init(chan);
622 if (ret)
623 goto error_free_backend;
624
625 chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order);
626 chan->switch_timer_interval = usecs_to_jiffies(switch_timer_interval);
627 chan->read_timer_interval = usecs_to_jiffies(read_timer_interval);
628 kref_init(&chan->ref);
629 init_waitqueue_head(&chan->read_wait);
630 init_waitqueue_head(&chan->hp_wait);
631
632 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
633#if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER)
634 /* Only benefit from NO_HZ idle with per-cpu buffers for now. */
635 chan->tick_nohz_notifier.notifier_call =
636 ring_buffer_tick_nohz_callback;
637 chan->tick_nohz_notifier.priority = ~0U;
638 atomic_notifier_chain_register(&tick_nohz_notifier,
639 &chan->tick_nohz_notifier);
640#endif /* defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) */
641
642 /*
643 * In case of non-hotplug cpu, if the ring-buffer is allocated
644 * in early initcall, it will not be notified of secondary cpus.
645 * In that case, we need to start the timers for all possible cpus.
646 */
647#ifdef CONFIG_HOTPLUG_CPU
648 chan->cpu_hp_notifier.notifier_call =
649 lib_ring_buffer_cpu_hp_callback;
650 chan->cpu_hp_notifier.priority = 6;
651 register_cpu_notifier(&chan->cpu_hp_notifier);
652
653 get_online_cpus();
654 for_each_online_cpu(cpu) {
655 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
656 cpu);
657 spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu));
658 lib_ring_buffer_start_switch_timer(buf);
659 lib_ring_buffer_start_read_timer(buf);
660 spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu));
661 }
662 chan->cpu_hp_enable = 1;
663 put_online_cpus();
664#else
665 for_each_possible_cpu(cpu) {
666 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
667 cpu);
668 spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu));
669 lib_ring_buffer_start_switch_timer(buf);
670 lib_ring_buffer_start_read_timer(buf);
671 spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu));
672 }
673#endif
674 } else {
675 struct lib_ring_buffer *buf = chan->backend.buf;
676
677 lib_ring_buffer_start_switch_timer(buf);
678 lib_ring_buffer_start_read_timer(buf);
679 }
680
681 return chan;
682
683error_free_backend:
684 channel_backend_free(&chan->backend);
685error:
686 kfree(chan);
687 return NULL;
688}
689EXPORT_SYMBOL_GPL(channel_create);
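
/*
 * Illustrative sketch, not part of this file: creating and destroying a
 * channel with the API above. "my_client_config" and the sizes passed below
 * are assumptions made for the example; a real client defines its own
 * struct lib_ring_buffer_config with the appropriate callbacks, and
 * buf_addr is NULL because this is not a RING_BUFFER_STATIC configuration.
 */
extern const struct lib_ring_buffer_config my_client_config;	/* hypothetical */

static struct channel *example_channel_create(void *client_priv)
{
	return channel_create(&my_client_config, "example-chan", client_priv,
			      NULL,	/* buf_addr: no static buffer */
			      4096,	/* subbuf_size (bytes, power of 2) */
			      8,	/* num_subbuf (power of 2) */
			      1000,	/* switch timer interval (us) */
			      0);	/* read timer disabled */
}

static void *example_channel_destroy(struct channel *chan)
{
	/* Returns the client private data passed at channel_create(). */
	return channel_destroy(chan);
}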
690
691static
692void channel_release(struct kref *kref)
693{
694 struct channel *chan = container_of(kref, struct channel, ref);
695 channel_free(chan);
696}
697
698/**
699 * channel_destroy - Finalize, wait for q.s. and destroy channel.
700 * @chan: channel to destroy
701 *
702 * Holds cpu hotplug.
703 * Call "destroy" callback, finalize channels, wait for readers to release their
704 * reference, then destroy ring buffer data. Note that when readers have
705 * completed data consumption of finalized channels, get_subbuf() will return
706 * -ENODATA. They should release their handle at that point.
707 * Returns the private data pointer.
708 */
709void *channel_destroy(struct channel *chan)
710{
711 int cpu;
712 const struct lib_ring_buffer_config *config = chan->backend.config;
713 void *priv;
714
715 channel_unregister_notifiers(chan);
716
717 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
718 /*
719 * No need to hold cpu hotplug, because all notifiers have been
720 * unregistered.
721 */
722 for_each_channel_cpu(cpu, chan) {
723 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
724 cpu);
725
726 if (config->cb.buffer_finalize)
727 config->cb.buffer_finalize(buf,
728 chan->backend.priv,
729 cpu);
730 if (buf->backend.allocated)
731 lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
732 /*
733 * Perform flush before writing to finalized.
734 */
735 smp_wmb();
736 ACCESS_ONCE(buf->finalized) = 1;
737 wake_up_interruptible(&buf->read_wait);
738 }
739 } else {
740 struct lib_ring_buffer *buf = chan->backend.buf;
741
742 if (config->cb.buffer_finalize)
743 config->cb.buffer_finalize(buf, chan->backend.priv, -1);
744 if (buf->backend.allocated)
745 lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
746 /*
747 * Perform flush before writing to finalized.
748 */
749 smp_wmb();
750 ACCESS_ONCE(buf->finalized) = 1;
751 wake_up_interruptible(&buf->read_wait);
752 }
753 ACCESS_ONCE(chan->finalized) = 1;
754 wake_up_interruptible(&chan->hp_wait);
755 wake_up_interruptible(&chan->read_wait);
756 kref_put(&chan->ref, channel_release);
757 priv = chan->backend.priv;
758 return priv;
759}
760EXPORT_SYMBOL_GPL(channel_destroy);
761
762struct lib_ring_buffer *channel_get_ring_buffer(
763 const struct lib_ring_buffer_config *config,
764 struct channel *chan, int cpu)
765{
766 if (config->alloc == RING_BUFFER_ALLOC_GLOBAL)
767 return chan->backend.buf;
768 else
769 return per_cpu_ptr(chan->backend.buf, cpu);
770}
771EXPORT_SYMBOL_GPL(channel_get_ring_buffer);
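
/*
 * Illustrative sketch, not part of this file: walking every buffer of a
 * channel with channel_get_ring_buffer(), covering both the global and
 * per-cpu allocation cases. The "total records" statistic is only an
 * example of something a client might aggregate.
 */
static unsigned long example_channel_records_count(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = chan->backend.config;
	struct lib_ring_buffer *buf;
	unsigned long total = 0;
	int cpu;

	if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
		buf = channel_get_ring_buffer(config, chan, 0);
		return v_read(config, &buf->records_count);
	}

	for_each_channel_cpu(cpu, chan) {
		buf = channel_get_ring_buffer(config, chan, cpu);
		total += v_read(config, &buf->records_count);
	}
	return total;
}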
772
773int lib_ring_buffer_open_read(struct lib_ring_buffer *buf)
774{
775 struct channel *chan = buf->backend.chan;
776
777 if (!atomic_long_add_unless(&buf->active_readers, 1, 1))
778 return -EBUSY;
779 kref_get(&chan->ref);
780 smp_mb__after_atomic_inc();
781 return 0;
782}
783EXPORT_SYMBOL_GPL(lib_ring_buffer_open_read);
784
785void lib_ring_buffer_release_read(struct lib_ring_buffer *buf)
786{
787 struct channel *chan = buf->backend.chan;
788
789 CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1);
790 smp_mb__before_atomic_dec();
791 atomic_long_dec(&buf->active_readers);
792 kref_put(&chan->ref, channel_release);
793}
794EXPORT_SYMBOL_GPL(lib_ring_buffer_release_read);
795
796/*
797 * Promote compiler barrier to a smp_mb().
798 * For the specific ring buffer case, this IPI call should be removed if the
799 * architecture does not reorder writes. This should eventually be provided by
800 * a separate architecture-specific infrastructure.
801 */
802static void remote_mb(void *info)
803{
804 smp_mb();
805}
806
807/**
808 * lib_ring_buffer_snapshot - save subbuffer position snapshot (for read)
809 * @buf: ring buffer
810 * @consumed: consumed count indicating the position where to read
811 * @produced: produced count, indicates position where to stop reading
812 *
813 * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no
814 * data to read at consumed position, or 0 if the snapshot operation succeeds.
815 * Busy-loop trying to get data if the tick_nohz sequence lock is held.
816 */
817
818int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf,
819 unsigned long *consumed, unsigned long *produced)
820{
821 struct channel *chan = buf->backend.chan;
822 const struct lib_ring_buffer_config *config = chan->backend.config;
823 unsigned long consumed_cur, write_offset;
824 int finalized;
825
826retry:
827 finalized = ACCESS_ONCE(buf->finalized);
828 /*
829 * Read finalized before counters.
830 */
831 smp_rmb();
832 consumed_cur = atomic_long_read(&buf->consumed);
833 /*
834 * No need to issue a memory barrier between consumed count read and
835 * write offset read, because consumed count can only change
836 * concurrently in overwrite mode, and we keep a sequence counter
837 * identifier derived from the write offset to check we are getting
838 * the same sub-buffer we are expecting (the sub-buffers are atomically
839 * "tagged" upon writes, tags are checked upon read).
840 */
841 write_offset = v_read(config, &buf->offset);
842
843 /*
844 * Check that we are not about to read the same subbuffer in
845 * which the writer head is.
846 */
847 if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan)
848 == 0)
849 goto nodata;
850
851 *consumed = consumed_cur;
852 *produced = subbuf_trunc(write_offset, chan);
853
854 return 0;
855
856nodata:
857 /*
858 * The memory barriers __wait_event()/wake_up_interruptible() take care
859 * of "raw_spin_is_locked" memory ordering.
860 */
861 if (finalized)
862 return -ENODATA;
863 else if (raw_spin_is_locked(&buf->raw_tick_nohz_spinlock))
864 goto retry;
865 else
866 return -EAGAIN;
867}
868EXPORT_SYMBOL_GPL(lib_ring_buffer_snapshot);
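
/*
 * Illustrative sketch, not part of this file: polling how much fully
 * produced (sub-buffer aligned) data is currently available to a reader,
 * using the snapshot operation above.
 */
static long example_pending_bytes(struct lib_ring_buffer *buf)
{
	unsigned long consumed, produced;
	int ret;

	ret = lib_ring_buffer_snapshot(buf, &consumed, &produced);
	if (ret)
		return ret;	/* -EAGAIN: nothing to read, -ENODATA: finalized */
	/* Only counts data sitting in completely switched sub-buffers. */
	return (long) (produced - consumed);
}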
869
870/**
871 * lib_ring_buffer_move_consumer - move consumed counter forward
872 * @buf: ring buffer
873 * @consumed_new: new consumed count value
874 */
875void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf,
876 unsigned long consumed_new)
877{
878 struct lib_ring_buffer_backend *bufb = &buf->backend;
879 struct channel *chan = bufb->chan;
880 unsigned long consumed;
881
882 CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1);
883
884 /*
885 * Only push the consumed value forward.
886 * If the consumed cmpxchg fails, this is because we have been pushed by
887 * the writer in flight recorder mode.
888 */
889 consumed = atomic_long_read(&buf->consumed);
890 while ((long) consumed - (long) consumed_new < 0)
891 consumed = atomic_long_cmpxchg(&buf->consumed, consumed,
892 consumed_new);
893}
894EXPORT_SYMBOL_GPL(lib_ring_buffer_move_consumer);
895
896/**
897 * lib_ring_buffer_get_subbuf - get exclusive access to subbuffer for reading
898 * @buf: ring buffer
899 * @consumed: consumed count indicating the position where to read
900 *
901 * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no
902 * data to read at consumed position, or 0 if the get operation succeeds.
903 * Busy-loop trying to get data if the tick_nohz sequence lock is held.
904 */
905int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf,
906 unsigned long consumed)
907{
908 struct channel *chan = buf->backend.chan;
909 const struct lib_ring_buffer_config *config = chan->backend.config;
910 unsigned long consumed_cur, consumed_idx, commit_count, write_offset;
911 int ret;
912 int finalized;
913
914retry:
915 finalized = ACCESS_ONCE(buf->finalized);
916 /*
917 * Read finalized before counters.
918 */
919 smp_rmb();
920 consumed_cur = atomic_long_read(&buf->consumed);
921 consumed_idx = subbuf_index(consumed, chan);
922 commit_count = v_read(config, &buf->commit_cold[consumed_idx].cc_sb);
923 /*
924 * Make sure we read the commit count before reading the buffer
925 * data and the write offset. Correct consumed offset ordering
926 * wrt commit count is ensured by the use of cmpxchg to update
927 * the consumed offset.
928 * smp_call_function_single can fail if the remote CPU is offline;
929 * this is OK because then there is no wmb to execute there.
930 * If our thread is executing on the same CPU as the one the buffer
931 * belongs to, we don't have to synchronize it at all. If we are
932 * migrated, the scheduler will take care of the memory barriers.
933 * Normally, smp_call_function_single() should ensure program order when
934 * executing the remote function, which implies that it surrounds the
935 * function execution with :
936 * smp_mb()
937 * send IPI
938 * csd_lock_wait
939 * recv IPI
940 * smp_mb()
941 * exec. function
942 * smp_mb()
943 * csd unlock
944 * smp_mb()
945 *
946 * However, smp_call_function_single() does not seem to clearly execute
947 * such barriers. It depends on spinlock semantics to provide the barrier
948 * before executing the IPI and, when busy-looping, csd_lock_wait only
949 * executes smp_mb() when it has to wait for the other CPU.
950 *
951 * I don't trust this code. Therefore, let's add the smp_mb() sequence
952 * required ourselves, even if duplicated. It has no performance impact
953 * anyway.
954 *
955 * smp_mb() is needed because smp_rmb() and smp_wmb() only order read vs
956 * read and write vs write. They do not ensure core synchronization. We
957 * really have to ensure total order between the 3 barriers running on
958 * the 2 CPUs.
959 */
960 if (config->ipi == RING_BUFFER_IPI_BARRIER) {
961 if (config->sync == RING_BUFFER_SYNC_PER_CPU
962 && config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
963 if (raw_smp_processor_id() != buf->backend.cpu) {
964 /* Total order with IPI handler smp_mb() */
965 smp_mb();
966 smp_call_function_single(buf->backend.cpu,
967 remote_mb, NULL, 1);
968 /* Total order with IPI handler smp_mb() */
969 smp_mb();
970 }
971 } else {
972 /* Total order with IPI handler smp_mb() */
973 smp_mb();
974 smp_call_function(remote_mb, NULL, 1);
975 /* Total order with IPI handler smp_mb() */
976 smp_mb();
977 }
978 } else {
979 /*
980 * Local rmb to match the remote wmb to read the commit count
981 * before the buffer data and the write offset.
982 */
983 smp_rmb();
984 }
985
986 write_offset = v_read(config, &buf->offset);
987
988 /*
989 * Check that the buffer we are getting is after or at consumed_cur
990 * position.
991 */
992 if ((long) subbuf_trunc(consumed, chan)
993 - (long) subbuf_trunc(consumed_cur, chan) < 0)
994 goto nodata;
995
996 /*
997 * Check that the subbuffer we are trying to consume has been
998 * already fully committed.
999 */
1000 if (((commit_count - chan->backend.subbuf_size)
1001 & chan->commit_count_mask)
1002 - (buf_trunc(consumed_cur, chan)
1003 >> chan->backend.num_subbuf_order)
1004 != 0)
1005 goto nodata;
1006
1007 /*
1008 * Check that we are not about to read the same subbuffer in
1009 * which the writer head is.
1010 */
1011 if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan)
1012 == 0)
1013 goto nodata;
1014
1015 /*
1016 * Failure to get the subbuffer causes a busy-loop retry without going
1017 * to a wait queue. These are caused by short-lived race windows where
1018 * the writer is getting access to a subbuffer we were trying to get
1019 * access to. Also checks that the "consumed" buffer count we are
1020 * looking for matches the one contained in the subbuffer id.
1021 */
1022 ret = update_read_sb_index(config, &buf->backend, &chan->backend,
1023 consumed_idx, buf_trunc_val(consumed, chan));
1024 if (ret)
1025 goto retry;
1026 subbuffer_id_clear_noref(config, &buf->backend.buf_rsb.id);
1027
1028 buf->get_subbuf_consumed = consumed;
1029 buf->get_subbuf = 1;
1030
1031 return 0;
1032
1033nodata:
1034 /*
1035 * The memory barriers __wait_event()/wake_up_interruptible() take care
1036 * of "raw_spin_is_locked" memory ordering.
1037 */
1038 if (finalized)
1039 return -ENODATA;
1040 else if (raw_spin_is_locked(&buf->raw_tick_nohz_spinlock))
1041 goto retry;
1042 else
1043 return -EAGAIN;
1044}
1045EXPORT_SYMBOL_GPL(lib_ring_buffer_get_subbuf);
1046
1047/**
1048 * lib_ring_buffer_put_subbuf - release exclusive subbuffer access
1049 * @buf: ring buffer
1050 */
1051void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf)
1052{
1053 struct lib_ring_buffer_backend *bufb = &buf->backend;
1054 struct channel *chan = bufb->chan;
1055 const struct lib_ring_buffer_config *config = chan->backend.config;
1056 unsigned long read_sb_bindex, consumed_idx, consumed;
1057
1058 CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1);
1059
1060 if (!buf->get_subbuf) {
1061 /*
1062 * Reader puts a subbuffer it did not get.
1063 */
1064 CHAN_WARN_ON(chan, 1);
1065 return;
1066 }
1067 consumed = buf->get_subbuf_consumed;
1068 buf->get_subbuf = 0;
1069
1070 /*
1071 * Clear the records_unread counter. (overruns counter)
1072 * Can still be non-zero if a file reader simply grabbed the data
1073 * without using iterators.
1074 * Can be below zero if an iterator is used on a snapshot more than
1075 * once.
1076 */
1077 read_sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
1078 v_add(config, v_read(config,
1079 &bufb->array[read_sb_bindex]->records_unread),
1080 &bufb->records_read);
1081 v_set(config, &bufb->array[read_sb_bindex]->records_unread, 0);
1082 CHAN_WARN_ON(chan, config->mode == RING_BUFFER_OVERWRITE
1083 && subbuffer_id_is_noref(config, bufb->buf_rsb.id));
1084 subbuffer_id_set_noref(config, &bufb->buf_rsb.id);
1085
1086 /*
1087 * Exchange the reader subbuffer with the one we put in its place in the
1088 * writer subbuffer table. Expect the original consumed count. If
1089 * update_read_sb_index fails, this is because the writer updated the
1090 * subbuffer concurrently. We should therefore keep the subbuffer we
1091 * currently have: it has become invalid to try reading this sub-buffer
1092 * consumed count value anyway.
1093 */
1094 consumed_idx = subbuf_index(consumed, chan);
1095 update_read_sb_index(config, &buf->backend, &chan->backend,
1096 consumed_idx, buf_trunc_val(consumed, chan));
1097 /*
1098 * update_read_sb_index return value ignored. Don't exchange sub-buffer
1099 * if the writer concurrently updated it.
1100 */
1101}
1102EXPORT_SYMBOL_GPL(lib_ring_buffer_put_subbuf);
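
/*
 * Illustrative sketch, not part of this file: a consumer draining the
 * readable sub-buffers of one ring buffer, following the reader protocol
 * described in the file header with the operations defined above. The
 * actual read-out of the sub-buffer payload (e.g. splicing it to a pipe)
 * is backend territory and is only indicated by a comment.
 */
static void example_drain_buffer(struct lib_ring_buffer *buf)
{
	struct channel *chan = buf->backend.chan;
	unsigned long consumed, produced;

	if (lib_ring_buffer_open_read(buf))
		return;		/* Only one concurrent reader is allowed. */

	while (!lib_ring_buffer_snapshot(buf, &consumed, &produced)) {
		if (lib_ring_buffer_get_subbuf(buf, consumed))
			break;	/* Raced with the writer; retry later. */
		/*
		 * ... read one sub-buffer worth of data here ...
		 */
		lib_ring_buffer_put_subbuf(buf);
		lib_ring_buffer_move_consumer(buf,
					      subbuf_align(consumed, chan));
	}

	lib_ring_buffer_release_read(buf);
}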
1103
1104/*
1105 * cons_offset is an iterator on all subbuffer offsets between the reader
1106 * position and the writer position. (inclusive)
1107 */
1108static
1109void lib_ring_buffer_print_subbuffer_errors(struct lib_ring_buffer *buf,
1110 struct channel *chan,
1111 unsigned long cons_offset,
1112 int cpu)
1113{
1114 const struct lib_ring_buffer_config *config = chan->backend.config;
1115 unsigned long cons_idx, commit_count, commit_count_sb;
1116
1117 cons_idx = subbuf_index(cons_offset, chan);
1118 commit_count = v_read(config, &buf->commit_hot[cons_idx].cc);
1119 commit_count_sb = v_read(config, &buf->commit_cold[cons_idx].cc_sb);
1120
1121 if (subbuf_offset(commit_count, chan) != 0)
1122 printk(KERN_WARNING
1123 "ring buffer %s, cpu %d: "
1124 "commit count in subbuffer %lu,\n"
1125 "expecting multiples of %lu bytes\n"
1126 " [ %lu bytes committed, %lu bytes reader-visible ]\n",
1127 chan->backend.name, cpu, cons_idx,
1128 chan->backend.subbuf_size,
1129 commit_count, commit_count_sb);
1130
1131 printk(KERN_DEBUG "ring buffer: %s, cpu %d: %lu bytes committed\n",
1132 chan->backend.name, cpu, commit_count);
1133}
1134
1135static
1136void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf,
1137 struct channel *chan,
1138 void *priv, int cpu)
1139{
1140 const struct lib_ring_buffer_config *config = chan->backend.config;
1141 unsigned long write_offset, cons_offset;
1142
1143 /*
1144 * Can be called in the error path of allocation when
1145 * trans_channel_data is not yet set.
1146 */
1147 if (!chan)
1148 return;
1149 /*
1150 * No need to order commit_count, write_offset and cons_offset reads
1151 * because we execute at teardown when no more writer nor reader
1152 * references are left.
1153 */
1154 write_offset = v_read(config, &buf->offset);
1155 cons_offset = atomic_long_read(&buf->consumed);
1156 if (write_offset != cons_offset)
1157 printk(KERN_WARNING
1158 "ring buffer %s, cpu %d: "
1159 "non-consumed data\n"
1160 " [ %lu bytes written, %lu bytes read ]\n",
1161 chan->backend.name, cpu, write_offset, cons_offset);
1162
1163 for (cons_offset = atomic_long_read(&buf->consumed);
1164 (long) (subbuf_trunc((unsigned long) v_read(config, &buf->offset),
1165 chan)
1166 - cons_offset) > 0;
1167 cons_offset = subbuf_align(cons_offset, chan))
1168 lib_ring_buffer_print_subbuffer_errors(buf, chan, cons_offset,
1169 cpu);
1170}
1171
1172static
1173void lib_ring_buffer_print_errors(struct channel *chan,
1174 struct lib_ring_buffer *buf, int cpu)
1175{
1176 const struct lib_ring_buffer_config *config = chan->backend.config;
1177 void *priv = chan->backend.priv;
1178
1179 printk(KERN_DEBUG "ring buffer %s, cpu %d: %lu records written, "
1180 "%lu records overrun\n",
1181 chan->backend.name, cpu,
1182 v_read(config, &buf->records_count),
1183 v_read(config, &buf->records_overrun));
1184
1185 if (v_read(config, &buf->records_lost_full)
1186 || v_read(config, &buf->records_lost_wrap)
1187 || v_read(config, &buf->records_lost_big))
1188 printk(KERN_WARNING
1189 "ring buffer %s, cpu %d: records were lost. Caused by:\n"
1190 " [ %lu buffer full, %lu nest buffer wrap-around, "
1191 "%lu event too big ]\n",
1192 chan->backend.name, cpu,
1193 v_read(config, &buf->records_lost_full),
1194 v_read(config, &buf->records_lost_wrap),
1195 v_read(config, &buf->records_lost_big));
1196
1197 lib_ring_buffer_print_buffer_errors(buf, chan, priv, cpu);
1198}
1199
1200/*
1201 * lib_ring_buffer_switch_old_start: Populate old subbuffer header.
1202 *
1203 * Only executed when the buffer is finalized, in SWITCH_FLUSH.
1204 */
1205static
1206void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf,
1207 struct channel *chan,
1208 struct switch_offsets *offsets,
1209 u64 tsc)
1210{
1211 const struct lib_ring_buffer_config *config = chan->backend.config;
1212 unsigned long oldidx = subbuf_index(offsets->old, chan);
1213 unsigned long commit_count;
1214
1215 config->cb.buffer_begin(buf, tsc, oldidx);
1216
1217 /*
1218 * Order all writes to buffer before the commit count update that will
1219 * determine that the subbuffer is full.
1220 */
1221 if (config->ipi == RING_BUFFER_IPI_BARRIER) {
1222 /*
1223 * Must write slot data before incrementing commit count. This
1224 * compiler barrier is upgraded into a smp_mb() by the IPI sent
1225 * by get_subbuf().
1226 */
1227 barrier();
1228 } else
1229 smp_wmb();
1230 v_add(config, config->cb.subbuffer_header_size(),
1231 &buf->commit_hot[oldidx].cc);
1232 commit_count = v_read(config, &buf->commit_hot[oldidx].cc);
1233 /* Check if the written buffer has to be delivered */
1234 lib_ring_buffer_check_deliver(config, buf, chan, offsets->old,
1235 commit_count, oldidx);
1236 lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
1237 offsets->old, commit_count,
1238 config->cb.subbuffer_header_size());
1239}
1240
1241/*
1242 * lib_ring_buffer_switch_old_end: switch old subbuffer
1243 *
1244 * Note : offset_old should never be 0 here. It is ok, because we never perform
1245 * buffer switch on an empty subbuffer in SWITCH_ACTIVE mode. The caller
1246 * increments the offset_old value when doing a SWITCH_FLUSH on an empty
1247 * subbuffer.
1248 */
1249static
1250void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf,
1251 struct channel *chan,
1252 struct switch_offsets *offsets,
1253 u64 tsc)
1254{
1255 const struct lib_ring_buffer_config *config = chan->backend.config;
1256 unsigned long oldidx = subbuf_index(offsets->old - 1, chan);
1257 unsigned long commit_count, padding_size, data_size;
1258
1259 data_size = subbuf_offset(offsets->old - 1, chan) + 1;
1260 padding_size = chan->backend.subbuf_size - data_size;
1261 subbuffer_set_data_size(config, &buf->backend, oldidx, data_size);
1262
1263 /*
1264 * Order all writes to buffer before the commit count update that will
1265 * determine that the subbuffer is full.
1266 */
1267 if (config->ipi == RING_BUFFER_IPI_BARRIER) {
1268 /*
1269 * Must write slot data before incrementing commit count. This
1270 * compiler barrier is upgraded into a smp_mb() by the IPI sent
1271 * by get_subbuf().
1272 */
1273 barrier();
1274 } else
1275 smp_wmb();
1276 v_add(config, padding_size, &buf->commit_hot[oldidx].cc);
1277 commit_count = v_read(config, &buf->commit_hot[oldidx].cc);
1278 lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1,
1279 commit_count, oldidx);
1280 lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
1281 offsets->old, commit_count,
1282 padding_size);
1283}
1284
1285/*
1286 * lib_ring_buffer_switch_new_start: Populate new subbuffer.
1287 *
1288 * This code can be executed unordered : writers may already have written to the
1289 * sub-buffer before this code gets executed, caution. The commit makes sure
1290 * that this code is executed before the delivery of this sub-buffer.
1291 */
1292static
1293void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf,
1294 struct channel *chan,
1295 struct switch_offsets *offsets,
1296 u64 tsc)
1297{
1298 const struct lib_ring_buffer_config *config = chan->backend.config;
1299 unsigned long beginidx = subbuf_index(offsets->begin, chan);
1300 unsigned long commit_count;
1301
1302 config->cb.buffer_begin(buf, tsc, beginidx);
1303
1304 /*
1305 * Order all writes to buffer before the commit count update that will
1306 * determine that the subbuffer is full.
1307 */
1308 if (config->ipi == RING_BUFFER_IPI_BARRIER) {
1309 /*
1310 * Must write slot data before incrementing commit count. This
1311 * compiler barrier is upgraded into a smp_mb() by the IPI sent
1312 * by get_subbuf().
1313 */
1314 barrier();
1315 } else
1316 smp_wmb();
1317 v_add(config, config->cb.subbuffer_header_size(),
1318 &buf->commit_hot[beginidx].cc);
1319 commit_count = v_read(config, &buf->commit_hot[beginidx].cc);
1320 /* Check if the written buffer has to be delivered */
1321 lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin,
1322 commit_count, beginidx);
1323 lib_ring_buffer_write_commit_counter(config, buf, chan, beginidx,
1324 offsets->begin, commit_count,
1325 config->cb.subbuffer_header_size());
1326}
1327
1328/*
1329 * lib_ring_buffer_switch_new_end: finish switching current subbuffer
1330 *
1331 * The only remaining threads could be the ones with pending commits. They will
1332 * have to do the delivery themselves.
1333 */
1334static
1335void lib_ring_buffer_switch_new_end(struct lib_ring_buffer *buf,
1336 struct channel *chan,
1337 struct switch_offsets *offsets,
1338 u64 tsc)
1339{
1340 const struct lib_ring_buffer_config *config = chan->backend.config;
1341 unsigned long endidx = subbuf_index(offsets->end - 1, chan);
1342 unsigned long commit_count, padding_size, data_size;
1343
1344 data_size = subbuf_offset(offsets->end - 1, chan) + 1;
1345 padding_size = chan->backend.subbuf_size - data_size;
1346 subbuffer_set_data_size(config, &buf->backend, endidx, data_size);
1347
1348 /*
1349 * Order all writes to buffer before the commit count update that will
1350 * determine that the subbuffer is full.
1351 */
1352 if (config->ipi == RING_BUFFER_IPI_BARRIER) {
1353 /*
1354 * Must write slot data before incrementing commit count. This
1355 * compiler barrier is upgraded into a smp_mb() by the IPI sent
1356 * by get_subbuf().
1357 */
1358 barrier();
1359 } else
1360 smp_wmb();
1361 v_add(config, padding_size, &buf->commit_hot[endidx].cc);
1362 commit_count = v_read(config, &buf->commit_hot[endidx].cc);
1363 lib_ring_buffer_check_deliver(config, buf, chan, offsets->end - 1,
1364 commit_count, endidx);
1365 lib_ring_buffer_write_commit_counter(config, buf, chan, endidx,
1366 offsets->end, commit_count,
1367 padding_size);
1368}
1369
1370/*
1371 * Returns :
1372 * 0 if ok
1373 * !0 if execution must be aborted.
1374 */
1375static
1376int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
1377 struct lib_ring_buffer *buf,
1378 struct channel *chan,
1379 struct switch_offsets *offsets,
1380 u64 *tsc)
1381{
1382 const struct lib_ring_buffer_config *config = chan->backend.config;
1383 unsigned long off;
1384
1385 offsets->begin = v_read(config, &buf->offset);
1386 offsets->old = offsets->begin;
1387 offsets->switch_old_start = 0;
1388 off = subbuf_offset(offsets->begin, chan);
1389
1390 *tsc = config->cb.ring_buffer_clock_read(chan);
1391
1392 /*
1393 * Ensure we flush the header of an empty subbuffer when doing the
1394 * finalize (SWITCH_FLUSH). This ensures that we end up knowing the
1395 * total data gathering duration even if there were no records saved
1396 * after the last buffer switch.
1397 * In SWITCH_ACTIVE mode, switch the buffer when it contains events.
1398 * SWITCH_ACTIVE only flushes the current subbuffer, dealing with end of
1399 * subbuffer header as appropriate.
1400 * The next record that reserves space will be responsible for
1401 * populating the following subbuffer header. We choose not to populate
1402 * the next subbuffer header here because we want to be able to use
1403 * SWITCH_ACTIVE for periodical buffer flush and CPU tick_nohz stop
1404 * buffer flush, which must guarantee that all the buffer content
1405 * (records and header timestamps) are visible to the reader. This is
1406 * required for quiescence guarantees for the fusion merge.
1407 */
1408 if (mode == SWITCH_FLUSH || off > 0) {
1409 if (unlikely(off == 0)) {
1410 /*
1411 * The client does not save any header information.
1412 * Don't switch empty subbuffer on finalize, because it
1413 * is invalid to deliver a completely empty subbuffer.
1414 */
1415 if (!config->cb.subbuffer_header_size())
1416 return -1;
1417 /*
1418 * Need to write the subbuffer start header on finalize.
1419 */
1420 offsets->switch_old_start = 1;
1421 }
1422 offsets->begin = subbuf_align(offsets->begin, chan);
1423 } else
1424 return -1; /* we do not have to switch : buffer is empty */
1425 /* Note: old points to the next subbuf at offset 0 */
1426 offsets->end = offsets->begin;
1427 return 0;
1428}
1429
1430/*
1431 * Force a sub-buffer switch. This operation is completely reentrant : can be
1432 * called while tracing is active with absolutely no lock held.
1433 *
1434 * Note, however, that as a v_cmpxchg is used for some atomic
1435 * operations, this function must be called from the CPU which owns the buffer
1436 * for an ACTIVE flush.
1437 */
1438void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode mode)
1439{
1440 struct channel *chan = buf->backend.chan;
1441 const struct lib_ring_buffer_config *config = chan->backend.config;
1442 struct switch_offsets offsets;
1443 unsigned long oldidx;
1444 u64 tsc;
1445
1446 offsets.size = 0;
1447
1448 /*
1449 * Perform retryable operations.
1450 */
1451 do {
1452 if (lib_ring_buffer_try_switch_slow(mode, buf, chan, &offsets,
1453 &tsc))
1454 return; /* Switch not needed */
1455 } while (v_cmpxchg(config, &buf->offset, offsets.old, offsets.end)
1456 != offsets.old);
1457
1458 /*
1459 * Atomically update last_tsc. This update races against concurrent
1460 * atomic updates, but the race will always cause supplementary full TSC
1461 * records, never the opposite (missing a full TSC record when it would
1462 * be needed).
1463 */
1464 save_last_tsc(config, buf, tsc);
1465
1466 /*
1467 * Push the reader if necessary
1468 */
1469 lib_ring_buffer_reserve_push_reader(buf, chan, offsets.old);
1470
1471 oldidx = subbuf_index(offsets.old, chan);
1472 lib_ring_buffer_clear_noref(config, &buf->backend, oldidx);
1473
1474 /*
1475 * May need to populate header start on SWITCH_FLUSH.
1476 */
1477 if (offsets.switch_old_start) {
1478 lib_ring_buffer_switch_old_start(buf, chan, &offsets, tsc);
1479 offsets.old += config->cb.subbuffer_header_size();
1480 }
1481
1482 /*
1483 * Switch old subbuffer.
1484 */
1485 lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc);
1486}
1487EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_slow);
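
/*
 * Illustrative sketch, not part of this file: requesting an ACTIVE flush of
 * a per-cpu buffer from another CPU. Because the comment above requires the
 * owning CPU to perform an ACTIVE flush, the call is relayed to that CPU
 * through an IPI and waited for. The helper names are hypothetical.
 */
static void example_remote_flush_fn(void *info)
{
	lib_ring_buffer_switch_slow(info, SWITCH_ACTIVE);
}

static void example_remote_active_flush(struct lib_ring_buffer *buf)
{
	/* Run the flush on the CPU owning the buffer and wait for it. */
	smp_call_function_single(buf->backend.cpu,
				 example_remote_flush_fn, buf, 1);
}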
1488
1489/*
1490 * Returns :
1491 * 0 if ok
1492 * -ENOSPC if event size is too large for packet.
1493 * -ENOBUFS if there is currently not enough space in buffer for the event.
1494 * -EIO if data cannot be written into the buffer for any other reason.
1495 */
1496static
1497int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf,
1498 struct channel *chan,
1499 struct switch_offsets *offsets,
1500 struct lib_ring_buffer_ctx *ctx)
1501{
1502 const struct lib_ring_buffer_config *config = chan->backend.config;
1503 unsigned long reserve_commit_diff;
1504
1505 offsets->begin = v_read(config, &buf->offset);
1506 offsets->old = offsets->begin;
1507 offsets->switch_new_start = 0;
1508 offsets->switch_new_end = 0;
1509 offsets->switch_old_end = 0;
1510 offsets->pre_header_padding = 0;
1511
1512 ctx->tsc = config->cb.ring_buffer_clock_read(chan);
1513 if ((int64_t) ctx->tsc == -EIO)
1514 return -EIO;
1515
1516 if (last_tsc_overflow(config, buf, ctx->tsc))
1517 ctx->rflags |= RING_BUFFER_RFLAG_FULL_TSC;
1518
1519 if (unlikely(subbuf_offset(offsets->begin, ctx->chan) == 0)) {
1520 offsets->switch_new_start = 1; /* For offsets->begin */
1521 } else {
1522 offsets->size = config->cb.record_header_size(config, chan,
1523 offsets->begin,
1524 &offsets->pre_header_padding,
1525 ctx);
1526 offsets->size +=
1527 lib_ring_buffer_align(offsets->begin + offsets->size,
1528 ctx->largest_align)
1529 + ctx->data_size;
1530 if (unlikely(subbuf_offset(offsets->begin, chan) +
1531 offsets->size > chan->backend.subbuf_size)) {
1532 offsets->switch_old_end = 1; /* For offsets->old */
1533 offsets->switch_new_start = 1; /* For offsets->begin */
1534 }
1535 }
1536 if (unlikely(offsets->switch_new_start)) {
1537 unsigned long sb_index;
1538
1539 /*
1540 * We are typically not filling the previous buffer completely.
1541 */
1542 if (likely(offsets->switch_old_end))
1543 offsets->begin = subbuf_align(offsets->begin, chan);
1544 offsets->begin = offsets->begin
1545 + config->cb.subbuffer_header_size();
1546 /* Test new buffer integrity */
1547 sb_index = subbuf_index(offsets->begin, chan);
1548 reserve_commit_diff =
1549 (buf_trunc(offsets->begin, chan)
1550 >> chan->backend.num_subbuf_order)
1551 - ((unsigned long) v_read(config,
1552 &buf->commit_cold[sb_index].cc_sb)
1553 & chan->commit_count_mask);
1554 if (likely(reserve_commit_diff == 0)) {
1555 /* Next subbuffer not being written to. */
1556 if (unlikely(config->mode != RING_BUFFER_OVERWRITE &&
1557 subbuf_trunc(offsets->begin, chan)
1558 - subbuf_trunc((unsigned long)
1559 atomic_long_read(&buf->consumed), chan)
1560 >= chan->backend.buf_size)) {
1561 /*
1562 * We do not overwrite non consumed buffers
1563 * and we are full : record is lost.
1564 */
1565 v_inc(config, &buf->records_lost_full);
1566 return -ENOBUFS;
1567 } else {
1568 /*
1569 * Next subbuffer not being written to, and we
1570 * are either in overwrite mode or the buffer is
1571 * not full. It's safe to write in this new
1572 * subbuffer.
1573 */
1574 }
1575 } else {
1576 /*
1577 * Next subbuffer reserve offset does not match the
1578 * commit offset. Drop record in producer-consumer and
1579 * overwrite mode. Caused by either a writer OOPS or too
1580 * many nested writes over a reserve/commit pair.
1581 */
1582 v_inc(config, &buf->records_lost_wrap);
1583 return -EIO;
1584 }
1585 offsets->size =
1586 config->cb.record_header_size(config, chan,
1587 offsets->begin,
1588 &offsets->pre_header_padding,
1589 ctx);
1590 offsets->size +=
1591 lib_ring_buffer_align(offsets->begin + offsets->size,
1592 ctx->largest_align)
1593 + ctx->data_size;
1594 if (unlikely(subbuf_offset(offsets->begin, chan)
1595 + offsets->size > chan->backend.subbuf_size)) {
1596 /*
1597 * Record too big for subbuffers, report error, don't
1598 * complete the sub-buffer switch.
1599 */
1600 v_inc(config, &buf->records_lost_big);
1601 return -ENOSPC;
1602 } else {
1603 /*
1604 * We just made a successful buffer switch and the
1605 * record fits in the new subbuffer. Let's write.
1606 */
1607 }
1608 } else {
1609 /*
1610 * Record fits in the current buffer and we are not on a switch
1611 * boundary. It's safe to write.
1612 */
1613 }
1614 offsets->end = offsets->begin + offsets->size;
1615
1616 if (unlikely(subbuf_offset(offsets->end, chan) == 0)) {
1617 /*
1618 * The offset_end will fall at the very beginning of the next
1619 * subbuffer.
1620 */
1621 offsets->switch_new_end = 1; /* For offsets->begin */
1622 }
1623 return 0;
1624}
1625
1626/**
1627 * lib_ring_buffer_reserve_slow - Atomic slot reservation in a buffer.
1628 * @ctx: ring buffer context.
1629 *
1630 * Return : -ENOBUFS if not enough space, -ENOSPC if event size too large,
1631 * -EIO for other errors, else returns 0.
1632 * It will take care of sub-buffer switching.
1633 */
1634int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx)
1635{
1636 struct channel *chan = ctx->chan;
1637 const struct lib_ring_buffer_config *config = chan->backend.config;
1638 struct lib_ring_buffer *buf;
1639 struct switch_offsets offsets;
1640 int ret;
1641
1642 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
1643 buf = per_cpu_ptr(chan->backend.buf, ctx->cpu);
1644 else
1645 buf = chan->backend.buf;
1646 ctx->buf = buf;
1647
1648 offsets.size = 0;
1649
1650 do {
1651 ret = lib_ring_buffer_try_reserve_slow(buf, chan, &offsets,
1652 ctx);
1653 if (unlikely(ret))
1654 return ret;
1655 } while (unlikely(v_cmpxchg(config, &buf->offset, offsets.old,
1656 offsets.end)
1657 != offsets.old));
1658
1659 /*
1660 * Atomically update last_tsc. This update races against concurrent
1661 * atomic updates, but the race will always cause supplementary full TSC
1662 * records, never the opposite (missing a full TSC record when it would
1663 * be needed).
1664 */
1665 save_last_tsc(config, buf, ctx->tsc);
1666
1667 /*
1668 * Push the reader if necessary
1669 */
1670 lib_ring_buffer_reserve_push_reader(buf, chan, offsets.end - 1);
1671
1672 /*
1673 * Clear noref flag for this subbuffer.
1674 */
1675 lib_ring_buffer_clear_noref(config, &buf->backend,
1676 subbuf_index(offsets.end - 1, chan));
1677
1678 /*
1679 * Switch old subbuffer if needed.
1680 */
1681 if (unlikely(offsets.switch_old_end)) {
1682 lib_ring_buffer_clear_noref(config, &buf->backend,
1683 subbuf_index(offsets.old - 1, chan));
1684 lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->tsc);
1685 }
1686
1687 /*
1688 * Populate new subbuffer.
1689 */
1690 if (unlikely(offsets.switch_new_start))
1691 lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->tsc);
1692
1693 if (unlikely(offsets.switch_new_end))
1694 lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->tsc);
1695
1696 ctx->slot_size = offsets.size;
1697 ctx->pre_offset = offsets.begin;
1698 ctx->buf_offset = offsets.begin + offsets.pre_header_padding;
1699 return 0;
1700}
1701EXPORT_SYMBOL_GPL(lib_ring_buffer_reserve_slow);
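
/*
 * Illustrative writer-side sketch, not part of this file: how the context
 * fields consumed by lib_ring_buffer_reserve_slow() are filled in and what
 * comes back on success. Assumptions: real clients go through the inline
 * reserve/commit fast path declared in the frontend API header instead of
 * calling the slow path directly, preemption is already disabled by the
 * tracing call site, and the 8-byte alignment below is arbitrary. The
 * payload write and the commit are only indicated by a comment, since they
 * are not implemented in this file.
 */
static int example_reserve_slot(struct channel *chan, size_t payload_size)
{
	struct lib_ring_buffer_ctx ctx;
	int ret;

	memset(&ctx, 0, sizeof(ctx));
	ctx.chan = chan;
	ctx.data_size = payload_size;
	ctx.largest_align = sizeof(u64);	/* assumed payload alignment */
	ctx.cpu = smp_processor_id();		/* selects the per-cpu buffer */

	ret = lib_ring_buffer_reserve_slow(&ctx);
	if (ret)
		return ret;	/* -ENOBUFS, -ENOSPC or -EIO, see above */
	/*
	 * On success, ctx.buf is the buffer, ctx.buf_offset the start of the
	 * reserved record, and ctx.slot_size its total size. The client
	 * would now write its record header and payload, then commit.
	 */
	return 0;
}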