1/*
2 * ring_buffer_frontend.c
3 *
4 * Copyright (C) 2005-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; only
9 * version 2.1 of the License.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 *
20 *
21 * Ring buffer wait-free buffer synchronization. Producer-consumer and flight
22 * recorder (overwrite) modes. See thesis:
23 *
24 * Desnoyers, Mathieu (2009), "Low-Impact Operating System Tracing", Ph.D.
25 * dissertation, Ecole Polytechnique de Montreal.
26 * http://www.lttng.org/pub/thesis/desnoyers-dissertation-2009-12.pdf
27 *
28 * - Algorithm presentation in Chapter 5:
29 * "Lockless Multi-Core High-Throughput Buffering".
30 * - Algorithm formal verification in Section 8.6:
31 * "Formal verification of LTTng"
32 *
33 * Author:
34 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
35 *
36 * Inspired from LTT and RelayFS:
37 * Karim Yaghmour <karim@opersys.com>
38 * Tom Zanussi <zanussi@us.ibm.com>
39 * Bob Wisniewski <bob@watson.ibm.com>
40 * And from K42 :
41 * Bob Wisniewski <bob@watson.ibm.com>
42 *
43 * Buffer reader semantic :
44 *
45 * - get_subbuf_size
46 * while buffer is not finalized and empty
47 * - get_subbuf
48 * - if return value != 0, continue
49 * - splice one subbuffer worth of data to a pipe
50 * - splice the data from pipe to disk/network
51 * - put_subbuf
52 */
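/*
 * Illustrative sketch (not part of the upstream file): the reader semantic
 * listed above, expressed with the frontend API defined in this file. Error
 * handling and the actual splice of the sub-buffer payload to disk or to the
 * network are elided; real consumers normally reach these calls through the
 * lttng-modules file operations, and "consumed" is typically obtained from
 * lib_ring_buffer_snapshot().
 *
 *	for (;;) {
 *		if (lib_ring_buffer_get_subbuf(buf, consumed) != 0)
 *			break;		// -EAGAIN: no data yet, -ENODATA: finalized
 *		// splice one sub-buffer worth of data to a pipe, then
 *		// splice the data from the pipe to disk/network
 *		lib_ring_buffer_put_subbuf(buf);
 *		consumed = subbuf_align(consumed, chan);
 *		lib_ring_buffer_move_consumer(buf, consumed);
 *	}
 */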
53
54#include <linux/delay.h>
55#include <linux/module.h>
56#include <linux/percpu.h>
57
58#include "../../wrapper/ringbuffer/config.h"
59#include "../../wrapper/ringbuffer/backend.h"
60#include "../../wrapper/ringbuffer/frontend.h"
61#include "../../wrapper/ringbuffer/iterator.h"
62#include "../../wrapper/ringbuffer/nohz.h"
63#include "../../wrapper/atomic.h"
64#include "../../wrapper/percpu-defs.h"
65
66/*
67 * Internal structure representing offsets to use at a sub-buffer switch.
68 */
69struct switch_offsets {
70 unsigned long begin, end, old;
71 size_t pre_header_padding, size;
72 unsigned int switch_new_start:1, switch_new_end:1, switch_old_start:1,
73 switch_old_end:1;
74};
75
76#ifdef CONFIG_NO_HZ
77enum tick_nohz_val {
78 TICK_NOHZ_STOP,
79 TICK_NOHZ_FLUSH,
80 TICK_NOHZ_RESTART,
81};
82
83static ATOMIC_NOTIFIER_HEAD(tick_nohz_notifier);
84#endif /* CONFIG_NO_HZ */
85
86static DEFINE_PER_CPU(spinlock_t, ring_buffer_nohz_lock);
87
88DEFINE_PER_CPU(unsigned int, lib_ring_buffer_nesting);
89EXPORT_PER_CPU_SYMBOL(lib_ring_buffer_nesting);
90
91static
92void lib_ring_buffer_print_errors(struct channel *chan,
93 struct lib_ring_buffer *buf, int cpu);
94
95/*
96 * Must be called under cpu hotplug protection.
97 */
98void lib_ring_buffer_free(struct lib_ring_buffer *buf)
99{
100 struct channel *chan = buf->backend.chan;
101
102 lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu);
103 kfree(buf->commit_hot);
104 kfree(buf->commit_cold);
105
106 lib_ring_buffer_backend_free(&buf->backend);
107}
108
109/**
110 * lib_ring_buffer_reset - Reset ring buffer to initial values.
111 * @buf: Ring buffer.
112 *
113 * Effectively empty the ring buffer. Should be called when the buffer is not
114 * used for writing. The ring buffer can be opened for reading, but the reader
115 * should not be using the iterator concurrently with reset. The previous
116 * current iterator record is reset.
117 */
118void lib_ring_buffer_reset(struct lib_ring_buffer *buf)
119{
120 struct channel *chan = buf->backend.chan;
121 const struct lib_ring_buffer_config *config = &chan->backend.config;
122 unsigned int i;
123
124 /*
125 * Reset iterator first. It will put the subbuffer if it currently holds
126 * it.
127 */
128 lib_ring_buffer_iterator_reset(buf);
129 v_set(config, &buf->offset, 0);
130 for (i = 0; i < chan->backend.num_subbuf; i++) {
131 v_set(config, &buf->commit_hot[i].cc, 0);
132 v_set(config, &buf->commit_hot[i].seq, 0);
133 v_set(config, &buf->commit_cold[i].cc_sb, 0);
134 }
135 atomic_long_set(&buf->consumed, 0);
136 atomic_set(&buf->record_disabled, 0);
137 v_set(config, &buf->last_tsc, 0);
138 lib_ring_buffer_backend_reset(&buf->backend);
139 /* Don't reset number of active readers */
140 v_set(config, &buf->records_lost_full, 0);
141 v_set(config, &buf->records_lost_wrap, 0);
142 v_set(config, &buf->records_lost_big, 0);
143 v_set(config, &buf->records_count, 0);
144 v_set(config, &buf->records_overrun, 0);
145 buf->finalized = 0;
146}
147EXPORT_SYMBOL_GPL(lib_ring_buffer_reset);
148
149/**
150 * channel_reset - Reset channel to initial values.
151 * @chan: Channel.
152 *
153 * Effectively empty the channel. Should be called when the channel is not used
154 * for writing. The channel can be opened for reading, but the reader should not
155 * be using the iterator concurrently with reset. The previous current iterator
156 * record is reset.
157 */
158void channel_reset(struct channel *chan)
159{
160 /*
161 * Reset iterators first. Will put the subbuffer if held for reading.
162 */
163 channel_iterator_reset(chan);
164 atomic_set(&chan->record_disabled, 0);
165 /* Don't reset commit_count_mask, still valid */
166 channel_backend_reset(&chan->backend);
167 /* Don't reset switch/read timer interval */
168 /* Don't reset notifiers and notifier enable bits */
169 /* Don't reset reader reference count */
170}
171EXPORT_SYMBOL_GPL(channel_reset);
172
173/*
174 * Must be called under cpu hotplug protection.
175 */
176int lib_ring_buffer_create(struct lib_ring_buffer *buf,
177 struct channel_backend *chanb, int cpu)
178{
179 const struct lib_ring_buffer_config *config = &chanb->config;
180 struct channel *chan = container_of(chanb, struct channel, backend);
181 void *priv = chanb->priv;
182 size_t subbuf_header_size;
183 u64 tsc;
184 int ret;
185
186 /* Test for cpu hotplug */
187 if (buf->backend.allocated)
188 return 0;
189
190 /*
191 * Paranoia: per cpu dynamic allocation is not officially documented as
192 * zeroing the memory, so let's do it here too, just in case.
193 */
194 memset(buf, 0, sizeof(*buf));
195
196 ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend, cpu);
197 if (ret)
198 return ret;
199
200 buf->commit_hot =
201 kzalloc_node(ALIGN(sizeof(*buf->commit_hot)
202 * chan->backend.num_subbuf,
203 1 << INTERNODE_CACHE_SHIFT),
204 GFP_KERNEL, cpu_to_node(max(cpu, 0)));
205 if (!buf->commit_hot) {
206 ret = -ENOMEM;
207 goto free_chanbuf;
208 }
209
210 buf->commit_cold =
211 kzalloc_node(ALIGN(sizeof(*buf->commit_cold)
212 * chan->backend.num_subbuf,
213 1 << INTERNODE_CACHE_SHIFT),
214 GFP_KERNEL, cpu_to_node(max(cpu, 0)));
215 if (!buf->commit_cold) {
216 ret = -ENOMEM;
217 goto free_commit;
218 }
219
220 init_waitqueue_head(&buf->read_wait);
221 init_waitqueue_head(&buf->write_wait);
222 raw_spin_lock_init(&buf->raw_tick_nohz_spinlock);
223
224 /*
225 * Write the subbuffer header for first subbuffer so we know the total
226 * duration of data gathering.
227 */
228 subbuf_header_size = config->cb.subbuffer_header_size();
229 v_set(config, &buf->offset, subbuf_header_size);
230 subbuffer_id_clear_noref(config, &buf->backend.buf_wsb[0].id);
231 tsc = config->cb.ring_buffer_clock_read(buf->backend.chan);
232 config->cb.buffer_begin(buf, tsc, 0);
233 v_add(config, subbuf_header_size, &buf->commit_hot[0].cc);
234
235 if (config->cb.buffer_create) {
236 ret = config->cb.buffer_create(buf, priv, cpu, chanb->name);
237 if (ret)
238 goto free_init;
239 }
240
241 /*
242 * Ensure the buffer is ready before setting it to allocated and setting
243 * the cpumask.
244 * Used for cpu hotplug vs cpumask iteration.
245 */
246 smp_wmb();
247 buf->backend.allocated = 1;
248
249 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
250 CHAN_WARN_ON(chan, cpumask_test_cpu(cpu,
251 chan->backend.cpumask));
252 cpumask_set_cpu(cpu, chan->backend.cpumask);
253 }
254
255 return 0;
256
257 /* Error handling */
258free_init:
259 kfree(buf->commit_cold);
260free_commit:
261 kfree(buf->commit_hot);
262free_chanbuf:
263 lib_ring_buffer_backend_free(&buf->backend);
264 return ret;
265}
266
267static void switch_buffer_timer(unsigned long data)
268{
269 struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data;
270 struct channel *chan = buf->backend.chan;
271 const struct lib_ring_buffer_config *config = &chan->backend.config;
272
273 /*
274 * Only flush buffers periodically if readers are active.
275 */
276 if (atomic_long_read(&buf->active_readers))
277 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
278
279 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
280 mod_timer_pinned(&buf->switch_timer,
281 jiffies + chan->switch_timer_interval);
282 else
283 mod_timer(&buf->switch_timer,
284 jiffies + chan->switch_timer_interval);
285}
286
287/*
288 * Called with ring_buffer_nohz_lock held for per-cpu buffers.
289 */
290static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf)
291{
292 struct channel *chan = buf->backend.chan;
293 const struct lib_ring_buffer_config *config = &chan->backend.config;
294
295 if (!chan->switch_timer_interval || buf->switch_timer_enabled)
296 return;
297 init_timer(&buf->switch_timer);
298 buf->switch_timer.function = switch_buffer_timer;
299 buf->switch_timer.expires = jiffies + chan->switch_timer_interval;
300 buf->switch_timer.data = (unsigned long)buf;
301 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
302 add_timer_on(&buf->switch_timer, buf->backend.cpu);
303 else
304 add_timer(&buf->switch_timer);
305 buf->switch_timer_enabled = 1;
306}
307
308/*
309 * Called with ring_buffer_nohz_lock held for per-cpu buffers.
310 */
311static void lib_ring_buffer_stop_switch_timer(struct lib_ring_buffer *buf)
312{
313 struct channel *chan = buf->backend.chan;
314
315 if (!chan->switch_timer_interval || !buf->switch_timer_enabled)
316 return;
317
318 del_timer_sync(&buf->switch_timer);
319 buf->switch_timer_enabled = 0;
320}
321
322/*
323 * Polling timer to check the channels for data.
324 */
325static void read_buffer_timer(unsigned long data)
326{
327 struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data;
328 struct channel *chan = buf->backend.chan;
329 const struct lib_ring_buffer_config *config = &chan->backend.config;
330
331 CHAN_WARN_ON(chan, !buf->backend.allocated);
332
333 if (atomic_long_read(&buf->active_readers)
334 && lib_ring_buffer_poll_deliver(config, buf, chan)) {
335 wake_up_interruptible(&buf->read_wait);
336 wake_up_interruptible(&chan->read_wait);
337 }
338
339 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
340 mod_timer_pinned(&buf->read_timer,
341 jiffies + chan->read_timer_interval);
342 else
343 mod_timer(&buf->read_timer,
344 jiffies + chan->read_timer_interval);
345}
346
347/*
348 * Called with ring_buffer_nohz_lock held for per-cpu buffers.
349 */
350static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf)
351{
352 struct channel *chan = buf->backend.chan;
353 const struct lib_ring_buffer_config *config = &chan->backend.config;
354
355 if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
356 || !chan->read_timer_interval
357 || buf->read_timer_enabled)
358 return;
359
360 init_timer(&buf->read_timer);
361 buf->read_timer.function = read_buffer_timer;
362 buf->read_timer.expires = jiffies + chan->read_timer_interval;
363 buf->read_timer.data = (unsigned long)buf;
364
365 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
366 add_timer_on(&buf->read_timer, buf->backend.cpu);
367 else
368 add_timer(&buf->read_timer);
369 buf->read_timer_enabled = 1;
370}
371
372/*
373 * Called with ring_buffer_nohz_lock held for per-cpu buffers.
374 */
375static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf)
376{
377 struct channel *chan = buf->backend.chan;
378 const struct lib_ring_buffer_config *config = &chan->backend.config;
379
380 if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
381 || !chan->read_timer_interval
382 || !buf->read_timer_enabled)
383 return;
384
385 del_timer_sync(&buf->read_timer);
386 /*
387 * do one more check to catch data that has been written in the last
388 * timer period.
389 */
390 if (lib_ring_buffer_poll_deliver(config, buf, chan)) {
391 wake_up_interruptible(&buf->read_wait);
392 wake_up_interruptible(&chan->read_wait);
393 }
394 buf->read_timer_enabled = 0;
395}
396
397#ifdef CONFIG_HOTPLUG_CPU
398/**
399 * lib_ring_buffer_cpu_hp_callback - CPU hotplug callback
400 * @nb: notifier block
401 * @action: hotplug action to take
402 * @hcpu: CPU number
403 *
404 * Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD)
405 */
406static
407int lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb,
408 unsigned long action,
409 void *hcpu)
410{
411 unsigned int cpu = (unsigned long)hcpu;
412 struct channel *chan = container_of(nb, struct channel,
413 cpu_hp_notifier);
414 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
415 const struct lib_ring_buffer_config *config = &chan->backend.config;
416
417 if (!chan->cpu_hp_enable)
418 return NOTIFY_DONE;
419
420 CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
421
422 switch (action) {
423 case CPU_DOWN_FAILED:
424 case CPU_DOWN_FAILED_FROZEN:
425 case CPU_ONLINE:
426 case CPU_ONLINE_FROZEN:
427 wake_up_interruptible(&chan->hp_wait);
428 lib_ring_buffer_start_switch_timer(buf);
429 lib_ring_buffer_start_read_timer(buf);
430 return NOTIFY_OK;
431
432 case CPU_DOWN_PREPARE:
433 case CPU_DOWN_PREPARE_FROZEN:
434 lib_ring_buffer_stop_switch_timer(buf);
435 lib_ring_buffer_stop_read_timer(buf);
436 return NOTIFY_OK;
437
438 case CPU_DEAD:
439 case CPU_DEAD_FROZEN:
440 /*
441 * Performing a buffer switch on a remote CPU. Performed by
442 * the CPU responsible for doing the hotunplug after the target
443 * CPU stopped running completely. Ensures that all data
444 * from that remote CPU is flushed.
445 */
446 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
447 return NOTIFY_OK;
448
449 default:
450 return NOTIFY_DONE;
451 }
452}
453#endif
454
455#if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER)
456/*
457 * For per-cpu buffers, call the reader wakeups before switching the buffer, so
458 * that wake-up-tracing generated events are flushed before going idle (in
459 * tick_nohz). We test if the spinlock is locked to deal with the race where
460 * readers try to sample the ring buffer before we perform the switch. We let
461 * the readers retry in that case. If there is data in the buffer, the wake up
462 * is going to forbid the CPU running the reader thread from going idle.
463 */
464static int notrace ring_buffer_tick_nohz_callback(struct notifier_block *nb,
465 unsigned long val,
466 void *data)
467{
468 struct channel *chan = container_of(nb, struct channel,
469 tick_nohz_notifier);
470 const struct lib_ring_buffer_config *config = &chan->backend.config;
471 struct lib_ring_buffer *buf;
472 int cpu = smp_processor_id();
473
474 if (config->alloc != RING_BUFFER_ALLOC_PER_CPU) {
475 /*
476 * We don't support keeping the system idle with global buffers
477 * and streaming active. In order to do so, we would need to
478 * sample a non-nohz-cpumask racelessly with the nohz updates
479 * without adding synchronization overhead to nohz. Leave this
480 * use-case out for now.
481 */
482 return 0;
483 }
484
485 buf = channel_get_ring_buffer(config, chan, cpu);
486 switch (val) {
487 case TICK_NOHZ_FLUSH:
488 raw_spin_lock(&buf->raw_tick_nohz_spinlock);
489 if (config->wakeup == RING_BUFFER_WAKEUP_BY_TIMER
490 && chan->read_timer_interval
491 && atomic_long_read(&buf->active_readers)
492 && (lib_ring_buffer_poll_deliver(config, buf, chan)
493 || lib_ring_buffer_pending_data(config, buf, chan))) {
494 wake_up_interruptible(&buf->read_wait);
495 wake_up_interruptible(&chan->read_wait);
496 }
497 if (chan->switch_timer_interval)
498 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
499 raw_spin_unlock(&buf->raw_tick_nohz_spinlock);
500 break;
501 case TICK_NOHZ_STOP:
502 spin_lock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
503 lib_ring_buffer_stop_switch_timer(buf);
504 lib_ring_buffer_stop_read_timer(buf);
505 spin_unlock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
506 break;
507 case TICK_NOHZ_RESTART:
508 spin_lock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
509 lib_ring_buffer_start_read_timer(buf);
510 lib_ring_buffer_start_switch_timer(buf);
511 spin_unlock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
512 break;
513 }
514
515 return 0;
516}
517
518void notrace lib_ring_buffer_tick_nohz_flush(void)
519{
520 atomic_notifier_call_chain(&tick_nohz_notifier, TICK_NOHZ_FLUSH,
521 NULL);
522}
523
524void notrace lib_ring_buffer_tick_nohz_stop(void)
525{
526 atomic_notifier_call_chain(&tick_nohz_notifier, TICK_NOHZ_STOP,
527 NULL);
528}
529
530void notrace lib_ring_buffer_tick_nohz_restart(void)
531{
532 atomic_notifier_call_chain(&tick_nohz_notifier, TICK_NOHZ_RESTART,
533 NULL);
534}
535#endif /* defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) */
536
537/*
538 * Holds CPU hotplug.
539 */
540static void channel_unregister_notifiers(struct channel *chan)
541{
542 const struct lib_ring_buffer_config *config = &chan->backend.config;
543 int cpu;
544
545 channel_iterator_unregister_notifiers(chan);
546 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
547#ifdef CONFIG_NO_HZ
548 /*
549 * Remove the nohz notifier first, so we are certain we stop
550 * the timers.
551 */
552 atomic_notifier_chain_unregister(&tick_nohz_notifier,
553 &chan->tick_nohz_notifier);
554 /*
555 * ring_buffer_nohz_lock will not be needed below, because
556 * we just removed the notifiers, which were the only source of
557 * concurrency.
558 */
559#endif /* CONFIG_NO_HZ */
560#ifdef CONFIG_HOTPLUG_CPU
561 get_online_cpus();
562 chan->cpu_hp_enable = 0;
563 for_each_online_cpu(cpu) {
564 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
565 cpu);
566 lib_ring_buffer_stop_switch_timer(buf);
567 lib_ring_buffer_stop_read_timer(buf);
568 }
569 put_online_cpus();
570 unregister_cpu_notifier(&chan->cpu_hp_notifier);
571#else
572 for_each_possible_cpu(cpu) {
573 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
574 cpu);
575 lib_ring_buffer_stop_switch_timer(buf);
576 lib_ring_buffer_stop_read_timer(buf);
577 }
578#endif
579 } else {
580 struct lib_ring_buffer *buf = chan->backend.buf;
581
582 lib_ring_buffer_stop_switch_timer(buf);
583 lib_ring_buffer_stop_read_timer(buf);
584 }
585 channel_backend_unregister_notifiers(&chan->backend);
586}
587
588static void channel_free(struct channel *chan)
589{
590 if (chan->backend.release_priv_ops) {
591 chan->backend.release_priv_ops(chan->backend.priv_ops);
592 }
593 channel_iterator_free(chan);
594 channel_backend_free(&chan->backend);
595 kfree(chan);
596}
597
598/**
599 * channel_create - Create channel.
600 * @config: ring buffer instance configuration
601 * @name: name of the channel
602 * @priv: ring buffer client private data
603 * @buf_addr: pointer to the beginning of the preallocated buffer contiguous
604 * address mapping. It is used only by RING_BUFFER_STATIC
605 * configuration. It can be set to NULL for other backends.
606 * @subbuf_size: subbuffer size
607 * @num_subbuf: number of subbuffers
608 * @switch_timer_interval: Time interval (in us) to fill sub-buffers with
609 * padding to let readers get those sub-buffers.
610 * Used for live streaming.
611 * @read_timer_interval: Time interval (in us) to wake up pending readers.
612 *
613 * Holds cpu hotplug.
614 * Returns NULL on failure.
615 */
616struct channel *channel_create(const struct lib_ring_buffer_config *config,
617 const char *name, void *priv, void *buf_addr,
618 size_t subbuf_size,
619 size_t num_subbuf, unsigned int switch_timer_interval,
620 unsigned int read_timer_interval)
621{
622 int ret, cpu;
623 struct channel *chan;
624
625 if (lib_ring_buffer_check_config(config, switch_timer_interval,
626 read_timer_interval))
627 return NULL;
628
629 chan = kzalloc(sizeof(struct channel), GFP_KERNEL);
630 if (!chan)
631 return NULL;
632
633 ret = channel_backend_init(&chan->backend, name, config, priv,
634 subbuf_size, num_subbuf);
635 if (ret)
636 goto error;
637
638 ret = channel_iterator_init(chan);
639 if (ret)
640 goto error_free_backend;
641
642 chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order);
643 chan->switch_timer_interval = usecs_to_jiffies(switch_timer_interval);
644 chan->read_timer_interval = usecs_to_jiffies(read_timer_interval);
645 kref_init(&chan->ref);
646 init_waitqueue_head(&chan->read_wait);
647 init_waitqueue_head(&chan->hp_wait);
648
649 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
650#if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER)
651 /* Only benefit from NO_HZ idle with per-cpu buffers for now. */
652 chan->tick_nohz_notifier.notifier_call =
653 ring_buffer_tick_nohz_callback;
654 chan->tick_nohz_notifier.priority = ~0U;
655 atomic_notifier_chain_register(&tick_nohz_notifier,
656 &chan->tick_nohz_notifier);
657#endif /* defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) */
658
659 /*
660 * In case of non-hotplug cpu, if the ring-buffer is allocated
661 * in early initcall, it will not be notified of secondary cpus.
662 * In that case, we need to allocate for all possible cpus.
663 */
664#ifdef CONFIG_HOTPLUG_CPU
665 chan->cpu_hp_notifier.notifier_call =
666 lib_ring_buffer_cpu_hp_callback;
667 chan->cpu_hp_notifier.priority = 6;
668 register_cpu_notifier(&chan->cpu_hp_notifier);
669
670 get_online_cpus();
671 for_each_online_cpu(cpu) {
672 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
673 cpu);
674 spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu));
675 lib_ring_buffer_start_switch_timer(buf);
676 lib_ring_buffer_start_read_timer(buf);
677 spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu));
678 }
679 chan->cpu_hp_enable = 1;
680 put_online_cpus();
681#else
682 for_each_possible_cpu(cpu) {
683 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
684 cpu);
685 spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu));
686 lib_ring_buffer_start_switch_timer(buf);
687 lib_ring_buffer_start_read_timer(buf);
688 spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu));
689 }
690#endif
691 } else {
692 struct lib_ring_buffer *buf = chan->backend.buf;
693
694 lib_ring_buffer_start_switch_timer(buf);
695 lib_ring_buffer_start_read_timer(buf);
696 }
697
698 return chan;
699
700error_free_backend:
701 channel_backend_free(&chan->backend);
702error:
703 kfree(chan);
704 return NULL;
705}
706EXPORT_SYMBOL_GPL(channel_create);
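/*
 * Illustrative sketch (not part of the upstream file): how a hypothetical
 * ring buffer client could bring a channel up and down with the API above.
 * "client_config" stands for a client-provided struct lib_ring_buffer_config,
 * and the sizes/intervals are arbitrary example values (sub-buffer size and
 * count must be powers of two).
 *
 *	struct channel *chan;
 *	void *priv;
 *
 *	chan = channel_create(&client_config, "example-chan", NULL, NULL,
 *			      4096, 8,	// eight 4 kB sub-buffers
 *			      0,	// no periodical sub-buffer switch
 *			      200);	// wake up readers every 200 us
 *	if (!chan)
 *		return -ENOMEM;
 *	// ... tracing runs, writers reserve/commit records ...
 *	priv = channel_destroy(chan);	// finalize, flush, drop the reference
 */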
707
708static
709void channel_release(struct kref *kref)
710{
711 struct channel *chan = container_of(kref, struct channel, ref);
712 channel_free(chan);
713}
714
715/**
716 * channel_destroy - Finalize, wait for q.s. and destroy channel.
717 * @chan: channel to destroy
718 *
719 * Holds cpu hotplug.
720 * Call "destroy" callback, finalize channels, and then decrement the
721 * channel reference count. Note that when readers have completed data
722 * consumption of finalized channels, get_subbuf() will return -ENODATA.
723 * They should release their handle at that point. Returns the private
724 * data pointer.
725 */
726void *channel_destroy(struct channel *chan)
727{
728 int cpu;
729 const struct lib_ring_buffer_config *config = &chan->backend.config;
730 void *priv;
731
732 channel_unregister_notifiers(chan);
733
734 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
735 /*
736 * No need to hold cpu hotplug, because all notifiers have been
737 * unregistered.
738 */
739 for_each_channel_cpu(cpu, chan) {
740 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
741 cpu);
742
743 if (config->cb.buffer_finalize)
744 config->cb.buffer_finalize(buf,
745 chan->backend.priv,
746 cpu);
747 if (buf->backend.allocated)
748 lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
749 /*
750 * Perform flush before writing to finalized.
751 */
752 smp_wmb();
753 ACCESS_ONCE(buf->finalized) = 1;
754 wake_up_interruptible(&buf->read_wait);
755 }
756 } else {
757 struct lib_ring_buffer *buf = chan->backend.buf;
758
759 if (config->cb.buffer_finalize)
760 config->cb.buffer_finalize(buf, chan->backend.priv, -1);
761 if (buf->backend.allocated)
762 lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
763 /*
764 * Perform flush before writing to finalized.
765 */
766 smp_wmb();
767 ACCESS_ONCE(buf->finalized) = 1;
768 wake_up_interruptible(&buf->read_wait);
769 }
770 ACCESS_ONCE(chan->finalized) = 1;
771 wake_up_interruptible(&chan->hp_wait);
772 wake_up_interruptible(&chan->read_wait);
773 priv = chan->backend.priv;
774 kref_put(&chan->ref, channel_release);
775 return priv;
776}
777EXPORT_SYMBOL_GPL(channel_destroy);
778
779struct lib_ring_buffer *channel_get_ring_buffer(
780 const struct lib_ring_buffer_config *config,
781 struct channel *chan, int cpu)
782{
783 if (config->alloc == RING_BUFFER_ALLOC_GLOBAL)
784 return chan->backend.buf;
785 else
786 return per_cpu_ptr(chan->backend.buf, cpu);
787}
788EXPORT_SYMBOL_GPL(channel_get_ring_buffer);
789
790int lib_ring_buffer_open_read(struct lib_ring_buffer *buf)
791{
792 struct channel *chan = buf->backend.chan;
793
794 if (!atomic_long_add_unless(&buf->active_readers, 1, 1))
795 return -EBUSY;
796 kref_get(&chan->ref);
797 lttng_smp_mb__after_atomic();
798 return 0;
799}
800EXPORT_SYMBOL_GPL(lib_ring_buffer_open_read);
801
802void lib_ring_buffer_release_read(struct lib_ring_buffer *buf)
803{
804 struct channel *chan = buf->backend.chan;
805
806 CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1);
807 lttng_smp_mb__before_atomic();
808 atomic_long_dec(&buf->active_readers);
809 kref_put(&chan->ref, channel_release);
810}
811EXPORT_SYMBOL_GPL(lib_ring_buffer_release_read);
812
813/*
814 * Promote compiler barrier to a smp_mb().
815 * For the specific ring buffer case, this IPI call should be removed if the
816 * architecture does not reorder writes. This should eventually be provided by
817 * a separate architecture-specific infrastructure.
818 */
819static void remote_mb(void *info)
820{
821 smp_mb();
822}
823
824/**
825 * lib_ring_buffer_snapshot - save subbuffer position snapshot (for read)
826 * @buf: ring buffer
827 * @consumed: consumed count indicating the position where to read
828 * @produced: produced count, indicates position when to stop reading
829 *
830 * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no
831 * data to read at consumed position, or 0 if the get operation succeeds.
832 * Busy-loop trying to get data if the tick_nohz sequence lock is held.
833 */
834
835int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf,
836 unsigned long *consumed, unsigned long *produced)
837{
838 struct channel *chan = buf->backend.chan;
839 const struct lib_ring_buffer_config *config = &chan->backend.config;
840 unsigned long consumed_cur, write_offset;
841 int finalized;
842
843retry:
844 finalized = ACCESS_ONCE(buf->finalized);
845 /*
846 * Read finalized before counters.
847 */
848 smp_rmb();
849 consumed_cur = atomic_long_read(&buf->consumed);
850 /*
851 * No need to issue a memory barrier between consumed count read and
852 * write offset read, because consumed count can only change
853 * concurrently in overwrite mode, and we keep a sequence counter
854 * identifier derived from the write offset to check we are getting
855 * the same sub-buffer we are expecting (the sub-buffers are atomically
856 * "tagged" upon writes, tags are checked upon read).
857 */
858 write_offset = v_read(config, &buf->offset);
859
860 /*
861 * Check that we are not about to read the same subbuffer in
862 * which the writer head is.
863 */
864 if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan)
865 == 0)
866 goto nodata;
867
868 *consumed = consumed_cur;
869 *produced = subbuf_trunc(write_offset, chan);
870
871 return 0;
872
873nodata:
874 /*
875 * The memory barriers __wait_event()/wake_up_interruptible() take care
876 * of "raw_spin_is_locked" memory ordering.
877 */
878 if (finalized)
879 return -ENODATA;
880 else if (raw_spin_is_locked(&buf->raw_tick_nohz_spinlock))
881 goto retry;
882 else
883 return -EAGAIN;
884}
885EXPORT_SYMBOL_GPL(lib_ring_buffer_snapshot);
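/*
 * Illustrative sketch (not part of the upstream file): a consumer bounding
 * its read with lib_ring_buffer_snapshot(), then walking the snapshot one
 * sub-buffer at a time. Error handling is reduced to the minimum; the actual
 * copy out of the sub-buffer goes through the backend (read/splice/mmap).
 *
 *	unsigned long consumed, produced;
 *
 *	if (lib_ring_buffer_open_read(buf))
 *		return -EBUSY;			// only one active reader at a time
 *	if (!lib_ring_buffer_snapshot(buf, &consumed, &produced)) {
 *		while ((long) (produced - consumed) > 0) {
 *			if (lib_ring_buffer_get_subbuf(buf, consumed))
 *				break;		// writer pushed us; retry later
 *			// ... copy the sub-buffer out through the backend ...
 *			lib_ring_buffer_put_subbuf(buf);
 *			consumed = subbuf_align(consumed, chan);
 *			lib_ring_buffer_move_consumer(buf, consumed);
 *		}
 *	}
 *	lib_ring_buffer_release_read(buf);
 */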
886
887/**
888 * lib_ring_buffer_move_consumer - move consumed counter forward
889 *
890 * Should only be called from consumer context.
891 * @buf: ring buffer
892 * @consumed_new: new consumed count value
893 */
894void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf,
895 unsigned long consumed_new)
896{
897 struct lib_ring_buffer_backend *bufb = &buf->backend;
898 struct channel *chan = bufb->chan;
899 unsigned long consumed;
900
901 CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1);
902
903 /*
904 * Only push the consumed value forward.
905 * If the consumed cmpxchg fails, this is because we have been pushed by
906 * the writer in flight recorder mode.
907 */
908 consumed = atomic_long_read(&buf->consumed);
909 while ((long) consumed - (long) consumed_new < 0)
910 consumed = atomic_long_cmpxchg(&buf->consumed, consumed,
911 consumed_new);
912 /* Wake-up the metadata producer */
913 wake_up_interruptible(&buf->write_wait);
914}
915EXPORT_SYMBOL_GPL(lib_ring_buffer_move_consumer);
916
917/**
918 * lib_ring_buffer_get_subbuf - get exclusive access to subbuffer for reading
919 * @buf: ring buffer
920 * @consumed: consumed count indicating the position where to read
921 *
922 * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no
923 * data to read at consumed position, or 0 if the get operation succeeds.
924 * Busy-loop trying to get data if the tick_nohz sequence lock is held.
925 */
926int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf,
927 unsigned long consumed)
928{
929 struct channel *chan = buf->backend.chan;
930 const struct lib_ring_buffer_config *config = &chan->backend.config;
931 unsigned long consumed_cur, consumed_idx, commit_count, write_offset;
932 int ret;
933 int finalized;
934
935 if (buf->get_subbuf) {
936 /*
937 * Reader is trying to get a subbuffer twice.
938 */
939 CHAN_WARN_ON(chan, 1);
940 return -EBUSY;
941 }
942retry:
943 finalized = ACCESS_ONCE(buf->finalized);
944 /*
945 * Read finalized before counters.
946 */
947 smp_rmb();
948 consumed_cur = atomic_long_read(&buf->consumed);
949 consumed_idx = subbuf_index(consumed, chan);
950 commit_count = v_read(config, &buf->commit_cold[consumed_idx].cc_sb);
951 /*
952 * Make sure we read the commit count before reading the buffer
953 * data and the write offset. Correct consumed offset ordering
954 * wrt commit count is ensured by the use of cmpxchg to update
955 * the consumed offset.
956 * smp_call_function_single can fail if the remote CPU is offline,
957 * this is OK because then there is no wmb to execute there.
958 * If our thread is executing on the same CPU as the one the buffer
959 * belongs to, we don't have to synchronize it at all. If we are
960 * migrated, the scheduler will take care of the memory barriers.
961 * Normally, smp_call_function_single() should ensure program order when
962 * executing the remote function, which implies that it surrounds the
963 * function execution with :
964 * smp_mb()
965 * send IPI
966 * csd_lock_wait
967 * recv IPI
968 * smp_mb()
969 * exec. function
970 * smp_mb()
971 * csd unlock
972 * smp_mb()
973 *
974 * However, smp_call_function_single() does not seem to clearly execute
975 * such barriers. It depends on spinlock semantic to provide the barrier
976 * before executing the IPI and, when busy-looping, csd_lock_wait only
977 * executes smp_mb() when it has to wait for the other CPU.
978 *
979 * I don't trust this code. Therefore, let's add the smp_mb() sequence
980 * required ourself, even if duplicated. It has no performance impact
981 * anyway.
982 *
983 * smp_mb() is needed because smp_rmb() and smp_wmb() only order read vs
984 * read and write vs write. They do not ensure core synchronization. We
985 * really have to ensure total order between the 3 barriers running on
986 * the 2 CPUs.
987 */
988 if (config->ipi == RING_BUFFER_IPI_BARRIER) {
989 if (config->sync == RING_BUFFER_SYNC_PER_CPU
990 && config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
991 if (raw_smp_processor_id() != buf->backend.cpu) {
992 /* Total order with IPI handler smp_mb() */
993 smp_mb();
994 smp_call_function_single(buf->backend.cpu,
995 remote_mb, NULL, 1);
996 /* Total order with IPI handler smp_mb() */
997 smp_mb();
998 }
999 } else {
1000 /* Total order with IPI handler smp_mb() */
1001 smp_mb();
1002 smp_call_function(remote_mb, NULL, 1);
1003 /* Total order with IPI handler smp_mb() */
1004 smp_mb();
1005 }
1006 } else {
1007 /*
1008 * Local rmb to match the remote wmb to read the commit count
1009 * before the buffer data and the write offset.
1010 */
1011 smp_rmb();
1012 }
1013
1014 write_offset = v_read(config, &buf->offset);
1015
1016 /*
1017 * Check that the buffer we are getting is after or at consumed_cur
1018 * position.
1019 */
1020 if ((long) subbuf_trunc(consumed, chan)
1021 - (long) subbuf_trunc(consumed_cur, chan) < 0)
1022 goto nodata;
1023
1024 /*
1025 * Check that the subbuffer we are trying to consume has been
1026 * already fully committed.
1027 */
1028 if (((commit_count - chan->backend.subbuf_size)
1029 & chan->commit_count_mask)
1030 - (buf_trunc(consumed, chan)
1031 >> chan->backend.num_subbuf_order)
1032 != 0)
1033 goto nodata;
1034
1035 /*
1036 * Check that we are not about to read the same subbuffer in
1037 * which the writer head is.
1038 */
1039 if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed, chan)
1040 == 0)
1041 goto nodata;
1042
1043 /*
1044 * Failure to get the subbuffer causes a busy-loop retry without going
1045 * to a wait queue. These are caused by short-lived race windows where
1046 * the writer is getting access to a subbuffer we were trying to get
1047 * access to. Also checks that the "consumed" buffer count we are
1048 * looking for matches the one contained in the subbuffer id.
1049 */
1050 ret = update_read_sb_index(config, &buf->backend, &chan->backend,
1051 consumed_idx, buf_trunc_val(consumed, chan));
1052 if (ret)
1053 goto retry;
1054 subbuffer_id_clear_noref(config, &buf->backend.buf_rsb.id);
1055
1056 buf->get_subbuf_consumed = consumed;
1057 buf->get_subbuf = 1;
1058
1059 return 0;
1060
1061nodata:
1062 /*
1063 * The memory barriers __wait_event()/wake_up_interruptible() take care
1064 * of "raw_spin_is_locked" memory ordering.
1065 */
1066 if (finalized)
1067 return -ENODATA;
1068 else if (raw_spin_is_locked(&buf->raw_tick_nohz_spinlock))
1069 goto retry;
1070 else
1071 return -EAGAIN;
1072}
1073EXPORT_SYMBOL_GPL(lib_ring_buffer_get_subbuf);
1074
1075/**
1076 * lib_ring_buffer_put_subbuf - release exclusive subbuffer access
1077 * @buf: ring buffer
1078 */
1079void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf)
1080{
1081 struct lib_ring_buffer_backend *bufb = &buf->backend;
1082 struct channel *chan = bufb->chan;
1083 const struct lib_ring_buffer_config *config = &chan->backend.config;
1084 unsigned long read_sb_bindex, consumed_idx, consumed;
1085
1086 CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1);
1087
1088 if (!buf->get_subbuf) {
1089 /*
1090 * Reader puts a subbuffer it did not get.
1091 */
1092 CHAN_WARN_ON(chan, 1);
1093 return;
1094 }
1095 consumed = buf->get_subbuf_consumed;
1096 buf->get_subbuf = 0;
1097
1098 /*
1099 * Clear the records_unread counter. (overruns counter)
1100 * Can still be non-zero if a file reader simply grabbed the data
1101 * without using iterators.
1102 * Can be below zero if an iterator is used on a snapshot more than
1103 * once.
1104 */
1105 read_sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
1106 v_add(config, v_read(config,
1107 &bufb->array[read_sb_bindex]->records_unread),
1108 &bufb->records_read);
1109 v_set(config, &bufb->array[read_sb_bindex]->records_unread, 0);
1110 CHAN_WARN_ON(chan, config->mode == RING_BUFFER_OVERWRITE
1111 && subbuffer_id_is_noref(config, bufb->buf_rsb.id));
1112 subbuffer_id_set_noref(config, &bufb->buf_rsb.id);
1113
1114 /*
1115 * Exchange the reader subbuffer with the one we put in its place in the
1116 * writer subbuffer table. Expect the original consumed count. If
1117 * update_read_sb_index fails, this is because the writer updated the
1118 * subbuffer concurrently. We should therefore keep the subbuffer we
1119 * currently have: it has become invalid to try reading this sub-buffer
1120 * consumed count value anyway.
1121 */
1122 consumed_idx = subbuf_index(consumed, chan);
1123 update_read_sb_index(config, &buf->backend, &chan->backend,
1124 consumed_idx, buf_trunc_val(consumed, chan));
1125 /*
1126 * update_read_sb_index return value ignored. Don't exchange sub-buffer
1127 * if the writer concurrently updated it.
1128 */
1129}
1130EXPORT_SYMBOL_GPL(lib_ring_buffer_put_subbuf);
1131
1132/*
1133 * cons_offset is an iterator on all subbuffer offsets between the reader
1134 * position and the writer position. (inclusive)
1135 */
1136static
1137void lib_ring_buffer_print_subbuffer_errors(struct lib_ring_buffer *buf,
1138 struct channel *chan,
1139 unsigned long cons_offset,
1140 int cpu)
1141{
1142 const struct lib_ring_buffer_config *config = &chan->backend.config;
1143 unsigned long cons_idx, commit_count, commit_count_sb;
1144
1145 cons_idx = subbuf_index(cons_offset, chan);
1146 commit_count = v_read(config, &buf->commit_hot[cons_idx].cc);
1147 commit_count_sb = v_read(config, &buf->commit_cold[cons_idx].cc_sb);
1148
1149 if (subbuf_offset(commit_count, chan) != 0)
1150 printk(KERN_WARNING
1151 "ring buffer %s, cpu %d: "
1152 "commit count in subbuffer %lu,\n"
1153 "expecting multiples of %lu bytes\n"
1154 " [ %lu bytes committed, %lu bytes reader-visible ]\n",
1155 chan->backend.name, cpu, cons_idx,
1156 chan->backend.subbuf_size,
1157 commit_count, commit_count_sb);
1158
1159 printk(KERN_DEBUG "ring buffer: %s, cpu %d: %lu bytes committed\n",
1160 chan->backend.name, cpu, commit_count);
1161}
1162
1163static
1164void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf,
1165 struct channel *chan,
1166 void *priv, int cpu)
1167{
1168 const struct lib_ring_buffer_config *config = &chan->backend.config;
1169 unsigned long write_offset, cons_offset;
1170
1171 /*
1172 * No need to order commit_count, write_offset and cons_offset reads
1173 * because we execute at teardown when no more writer nor reader
1174 * references are left.
1175 */
1176 write_offset = v_read(config, &buf->offset);
1177 cons_offset = atomic_long_read(&buf->consumed);
1178 if (write_offset != cons_offset)
1179 printk(KERN_DEBUG
1180 "ring buffer %s, cpu %d: "
1181 "non-consumed data\n"
1182 " [ %lu bytes written, %lu bytes read ]\n",
1183 chan->backend.name, cpu, write_offset, cons_offset);
1184
1185 for (cons_offset = atomic_long_read(&buf->consumed);
1186 (long) (subbuf_trunc((unsigned long) v_read(config, &buf->offset),
1187 chan)
1188 - cons_offset) > 0;
1189 cons_offset = subbuf_align(cons_offset, chan))
1190 lib_ring_buffer_print_subbuffer_errors(buf, chan, cons_offset,
1191 cpu);
1192}
1193
1194static
1195void lib_ring_buffer_print_errors(struct channel *chan,
1196 struct lib_ring_buffer *buf, int cpu)
1197{
1198 const struct lib_ring_buffer_config *config = &chan->backend.config;
1199 void *priv = chan->backend.priv;
1200
1201 if (!strcmp(chan->backend.name, "relay-metadata")) {
1202 printk(KERN_DEBUG "ring buffer %s: %lu records written, "
1203 "%lu records overrun\n",
1204 chan->backend.name,
1205 v_read(config, &buf->records_count),
1206 v_read(config, &buf->records_overrun));
1207 } else {
1208 printk(KERN_DEBUG "ring buffer %s, cpu %d: %lu records written, "
1209 "%lu records overrun\n",
1210 chan->backend.name, cpu,
1211 v_read(config, &buf->records_count),
1212 v_read(config, &buf->records_overrun));
1213
1214 if (v_read(config, &buf->records_lost_full)
1215 || v_read(config, &buf->records_lost_wrap)
1216 || v_read(config, &buf->records_lost_big))
1217 printk(KERN_WARNING
1218 "ring buffer %s, cpu %d: records were lost. Caused by:\n"
1219 " [ %lu buffer full, %lu nest buffer wrap-around, "
1220 "%lu event too big ]\n",
1221 chan->backend.name, cpu,
1222 v_read(config, &buf->records_lost_full),
1223 v_read(config, &buf->records_lost_wrap),
1224 v_read(config, &buf->records_lost_big));
1225 }
1226 lib_ring_buffer_print_buffer_errors(buf, chan, priv, cpu);
1227}
1228
1229/*
1230 * lib_ring_buffer_switch_old_start: Populate old subbuffer header.
1231 *
1232 * Only executed when the buffer is finalized, in SWITCH_FLUSH.
1233 */
1234static
1235void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf,
1236 struct channel *chan,
1237 struct switch_offsets *offsets,
1238 u64 tsc)
1239{
1240 const struct lib_ring_buffer_config *config = &chan->backend.config;
1241 unsigned long oldidx = subbuf_index(offsets->old, chan);
1242 unsigned long commit_count;
1243
1244 config->cb.buffer_begin(buf, tsc, oldidx);
1245
1246 /*
1247 * Order all writes to buffer before the commit count update that will
1248 * determine that the subbuffer is full.
1249 */
1250 if (config->ipi == RING_BUFFER_IPI_BARRIER) {
1251 /*
1252 * Must write slot data before incrementing commit count. This
1253 * compiler barrier is upgraded into a smp_mb() by the IPI sent
1254 * by get_subbuf().
1255 */
1256 barrier();
1257 } else
1258 smp_wmb();
1259 v_add(config, config->cb.subbuffer_header_size(),
1260 &buf->commit_hot[oldidx].cc);
1261 commit_count = v_read(config, &buf->commit_hot[oldidx].cc);
1262 /* Check if the written buffer has to be delivered */
1263 lib_ring_buffer_check_deliver(config, buf, chan, offsets->old,
1264 commit_count, oldidx, tsc);
1265 lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
1266 offsets->old + config->cb.subbuffer_header_size(),
1267 commit_count);
1268}
1269
1270/*
1271 * lib_ring_buffer_switch_old_end: switch old subbuffer
1272 *
1273 * Note : offset_old should never be 0 here. It is ok, because we never perform
1274 * buffer switch on an empty subbuffer in SWITCH_ACTIVE mode. The caller
1275 * increments the offset_old value when doing a SWITCH_FLUSH on an empty
1276 * subbuffer.
1277 */
1278static
1279void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf,
1280 struct channel *chan,
1281 struct switch_offsets *offsets,
1282 u64 tsc)
1283{
1284 const struct lib_ring_buffer_config *config = &chan->backend.config;
1285 unsigned long oldidx = subbuf_index(offsets->old - 1, chan);
1286 unsigned long commit_count, padding_size, data_size;
1287
1288 data_size = subbuf_offset(offsets->old - 1, chan) + 1;
1289 padding_size = chan->backend.subbuf_size - data_size;
1290 subbuffer_set_data_size(config, &buf->backend, oldidx, data_size);
1291
1292 /*
1293 * Order all writes to buffer before the commit count update that will
1294 * determine that the subbuffer is full.
1295 */
1296 if (config->ipi == RING_BUFFER_IPI_BARRIER) {
1297 /*
1298 * Must write slot data before incrementing commit count. This
1299 * compiler barrier is upgraded into a smp_mb() by the IPI sent
1300 * by get_subbuf().
1301 */
1302 barrier();
1303 } else
1304 smp_wmb();
1305 v_add(config, padding_size, &buf->commit_hot[oldidx].cc);
1306 commit_count = v_read(config, &buf->commit_hot[oldidx].cc);
1307 lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1,
1308 commit_count, oldidx, tsc);
1309 lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
1310 offsets->old + padding_size, commit_count);
1311}
1312
1313/*
1314 * lib_ring_buffer_switch_new_start: Populate new subbuffer.
1315 *
1316 * This code can be executed unordered : writers may already have written to the
1317 * sub-buffer before this code gets executed, caution. The commit makes sure
1318 * that this code is executed before the deliver of this sub-buffer.
1319 */
1320static
1321void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf,
1322 struct channel *chan,
1323 struct switch_offsets *offsets,
1324 u64 tsc)
1325{
1326 const struct lib_ring_buffer_config *config = &chan->backend.config;
1327 unsigned long beginidx = subbuf_index(offsets->begin, chan);
1328 unsigned long commit_count;
1329
1330 config->cb.buffer_begin(buf, tsc, beginidx);
1331
1332 /*
1333 * Order all writes to buffer before the commit count update that will
1334 * determine that the subbuffer is full.
1335 */
1336 if (config->ipi == RING_BUFFER_IPI_BARRIER) {
1337 /*
1338 * Must write slot data before incrementing commit count. This
1339 * compiler barrier is upgraded into a smp_mb() by the IPI sent
1340 * by get_subbuf().
1341 */
1342 barrier();
1343 } else
1344 smp_wmb();
1345 v_add(config, config->cb.subbuffer_header_size(),
1346 &buf->commit_hot[beginidx].cc);
1347 commit_count = v_read(config, &buf->commit_hot[beginidx].cc);
1348 /* Check if the written buffer has to be delivered */
1349 lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin,
1350 commit_count, beginidx, tsc);
1351 lib_ring_buffer_write_commit_counter(config, buf, chan, beginidx,
1352 offsets->begin + config->cb.subbuffer_header_size(),
1353 commit_count);
1354}
1355
1356/*
1357 * lib_ring_buffer_switch_new_end: finish switching current subbuffer
1358 *
1359 * Calls subbuffer_set_data_size() to set the data size of the current
1360 * sub-buffer. We do not need to perform check_deliver nor commit here,
1361 * since this task will be done by the "commit" of the event for which
1362 * we are currently doing the space reservation.
1363 */
1364static
1365void lib_ring_buffer_switch_new_end(struct lib_ring_buffer *buf,
1366 struct channel *chan,
1367 struct switch_offsets *offsets,
1368 u64 tsc)
1369{
1370 const struct lib_ring_buffer_config *config = &chan->backend.config;
1371 unsigned long endidx, data_size;
1372
1373 endidx = subbuf_index(offsets->end - 1, chan);
1374 data_size = subbuf_offset(offsets->end - 1, chan) + 1;
1375 subbuffer_set_data_size(config, &buf->backend, endidx, data_size);
1376}
1377
1378/*
1379 * Returns :
1380 * 0 if ok
1381 * !0 if execution must be aborted.
1382 */
1383static
1384int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
1385 struct lib_ring_buffer *buf,
1386 struct channel *chan,
1387 struct switch_offsets *offsets,
1388 u64 *tsc)
1389{
1390 const struct lib_ring_buffer_config *config = &chan->backend.config;
1391 unsigned long off, reserve_commit_diff;
1392
1393 offsets->begin = v_read(config, &buf->offset);
1394 offsets->old = offsets->begin;
1395 offsets->switch_old_start = 0;
1396 off = subbuf_offset(offsets->begin, chan);
1397
1398 *tsc = config->cb.ring_buffer_clock_read(chan);
1399
1400 /*
1401 * Ensure we flush the header of an empty subbuffer when doing the
1402 * finalize (SWITCH_FLUSH). This ensures that we end up knowing the
1403 * total data gathering duration even if there were no records saved
1404 * after the last buffer switch.
1405 * In SWITCH_ACTIVE mode, switch the buffer when it contains events.
1406 * SWITCH_ACTIVE only flushes the current subbuffer, dealing with end of
1407 * subbuffer header as appropriate.
1408 * The next record that reserves space will be responsible for
1409 * populating the following subbuffer header. We choose not to populate
1410 * the next subbuffer header here because we want to be able to use
1411 * SWITCH_ACTIVE for periodical buffer flush and CPU tick_nohz stop
1412 * buffer flush, which must guarantee that all the buffer content
1413 * (records and header timestamps) are visible to the reader. This is
1414 * required for quiescence guarantees for the fusion merge.
1415 */
1416 if (mode != SWITCH_FLUSH && !off)
1417 return -1; /* we do not have to switch : buffer is empty */
1418
1419 if (unlikely(off == 0)) {
1420 unsigned long sb_index, commit_count;
1421
1422 /*
1423 * We are performing a SWITCH_FLUSH. At this stage, there are no
1424 * concurrent writes into the buffer.
1425 *
1426 * The client does not save any header information. Don't
1427 * switch empty subbuffer on finalize, because it is invalid to
1428 * deliver a completely empty subbuffer.
1429 */
1430 if (!config->cb.subbuffer_header_size())
1431 return -1;
1432
1433 /* Test new buffer integrity */
1434 sb_index = subbuf_index(offsets->begin, chan);
1435 commit_count = v_read(config,
1436 &buf->commit_cold[sb_index].cc_sb);
1437 reserve_commit_diff =
1438 (buf_trunc(offsets->begin, chan)
1439 >> chan->backend.num_subbuf_order)
1440 - (commit_count & chan->commit_count_mask);
1441 if (likely(reserve_commit_diff == 0)) {
1442 /* Next subbuffer not being written to. */
1443 if (unlikely(config->mode != RING_BUFFER_OVERWRITE &&
1444 subbuf_trunc(offsets->begin, chan)
1445 - subbuf_trunc((unsigned long)
1446 atomic_long_read(&buf->consumed), chan)
1447 >= chan->backend.buf_size)) {
1448 /*
1449 * We do not overwrite non consumed buffers
1450 * and we are full : don't switch.
1451 */
1452 return -1;
1453 } else {
1454 /*
1455 * Next subbuffer not being written to, and we
1456 * are either in overwrite mode or the buffer is
1457 * not full. It's safe to write in this new
1458 * subbuffer.
1459 */
1460 }
1461 } else {
1462 /*
1463 * Next subbuffer reserve offset does not match the
1464 * commit offset. Don't perform switch in
1465 * producer-consumer and overwrite mode. Caused by
1466 * either a writer OOPS or too many nested writes over a
1467 * reserve/commit pair.
1468 */
1469 return -1;
1470 }
1471
1472 /*
1473 * Need to write the subbuffer start header on finalize.
1474 */
1475 offsets->switch_old_start = 1;
1476 }
1477 offsets->begin = subbuf_align(offsets->begin, chan);
1478 /* Note: old points to the next subbuf at offset 0 */
1479 offsets->end = offsets->begin;
1480 return 0;
1481}
1482
1483/*
1484 * Force a sub-buffer switch. This operation is completely reentrant : can be
1485 * called while tracing is active with absolutely no lock held.
1486 *
1487 * Note, however, that as a v_cmpxchg is used for some atomic
1488 * operations, this function must be called from the CPU which owns the buffer
1489 * for an ACTIVE flush.
1490 */
1491void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode mode)
1492{
1493 struct channel *chan = buf->backend.chan;
1494 const struct lib_ring_buffer_config *config = &chan->backend.config;
1495 struct switch_offsets offsets;
1496 unsigned long oldidx;
1497 u64 tsc;
1498
1499 offsets.size = 0;
1500
1501 /*
1502 * Perform retryable operations.
1503 */
1504 do {
1505 if (lib_ring_buffer_try_switch_slow(mode, buf, chan, &offsets,
1506 &tsc))
1507 return; /* Switch not needed */
1508 } while (v_cmpxchg(config, &buf->offset, offsets.old, offsets.end)
1509 != offsets.old);
1510
1511 /*
1512 * Atomically update last_tsc. This update races against concurrent
1513 * atomic updates, but the race will always cause supplementary full TSC
1514 * records, never the opposite (missing a full TSC record when it would
1515 * be needed).
1516 */
1517 save_last_tsc(config, buf, tsc);
1518
1519 /*
1520 * Push the reader if necessary
1521 */
1522 lib_ring_buffer_reserve_push_reader(buf, chan, offsets.old);
1523
1524 oldidx = subbuf_index(offsets.old, chan);
1525 lib_ring_buffer_clear_noref(config, &buf->backend, oldidx);
1526
1527 /*
1528 * May need to populate header start on SWITCH_FLUSH.
1529 */
1530 if (offsets.switch_old_start) {
1531 lib_ring_buffer_switch_old_start(buf, chan, &offsets, tsc);
1532 offsets.old += config->cb.subbuffer_header_size();
1533 }
1534
1535 /*
1536 * Switch old subbuffer.
1537 */
1538 lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc);
1539}
1540EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_slow);
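/*
 * Illustrative sketch (not part of the upstream file): forcing the current
 * sub-buffer out so readers can see a partially filled packet. As noted
 * above, a SWITCH_ACTIVE flush of a per-cpu buffer must run on the CPU that
 * owns the buffer; lib_ring_buffer_switch_remote() below wraps the cross-CPU
 * case with an IPI.
 *
 *	// on the CPU owning "buf" (or for a global buffer):
 *	lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
 *
 *	// from any CPU, e.g. a periodic flush for live reading:
 *	lib_ring_buffer_switch_remote(buf);
 */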
1541
1542static void remote_switch(void *info)
1543{
1544 struct lib_ring_buffer *buf = info;
1545
1546 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
1547}
1548
1549void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf)
1550{
1551 struct channel *chan = buf->backend.chan;
1552 const struct lib_ring_buffer_config *config = &chan->backend.config;
1553 int ret;
1554
1555 /*
1556 * With global synchronization we don't need to use the IPI scheme.
1557 */
1558 if (config->sync == RING_BUFFER_SYNC_GLOBAL) {
1559 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
1560 return;
1561 }
1562
1563 /*
1564 * Taking lock on CPU hotplug to ensure two things: first, that the
1565 * target cpu is not taken concurrently offline while we are within
1566 * smp_call_function_single() (I don't trust that get_cpu() on the
1567 * _local_ CPU actually inhibits CPU hotplug for the _remote_ CPU (to be
1568 * confirmed)). Secondly, if it happens that the CPU is not online, our
1569 * own call to lib_ring_buffer_switch_slow() needs to be protected from
1570 * CPU hotplug handlers, which can also perform a remote subbuffer
1571 * switch.
1572 */
1573 get_online_cpus();
1574 ret = smp_call_function_single(buf->backend.cpu,
1575 remote_switch, buf, 1);
1576 if (ret) {
1577 /* Remote CPU is offline, do it ourself. */
1578 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
1579 }
1580 put_online_cpus();
1581}
1582EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote);
1583
1584/*
1585 * Returns :
1586 * 0 if ok
1587 * -ENOSPC if event size is too large for packet.
1588 * -ENOBUFS if there is currently not enough space in buffer for the event.
1589 * -EIO if data cannot be written into the buffer for any other reason.
1590 */
1591static
1592int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf,
1593 struct channel *chan,
1594 struct switch_offsets *offsets,
1595 struct lib_ring_buffer_ctx *ctx)
1596{
1597 const struct lib_ring_buffer_config *config = &chan->backend.config;
1598 unsigned long reserve_commit_diff, offset_cmp;
1599
1600retry:
1601 offsets->begin = offset_cmp = v_read(config, &buf->offset);
1602 offsets->old = offsets->begin;
1603 offsets->switch_new_start = 0;
1604 offsets->switch_new_end = 0;
1605 offsets->switch_old_end = 0;
1606 offsets->pre_header_padding = 0;
1607
1608 ctx->tsc = config->cb.ring_buffer_clock_read(chan);
1609 if ((int64_t) ctx->tsc == -EIO)
1610 return -EIO;
1611
1612 if (last_tsc_overflow(config, buf, ctx->tsc))
1613 ctx->rflags |= RING_BUFFER_RFLAG_FULL_TSC;
1614
1615 if (unlikely(subbuf_offset(offsets->begin, ctx->chan) == 0)) {
1616 offsets->switch_new_start = 1; /* For offsets->begin */
1617 } else {
1618 offsets->size = config->cb.record_header_size(config, chan,
1619 offsets->begin,
1620 &offsets->pre_header_padding,
1621 ctx);
1622 offsets->size +=
1623 lib_ring_buffer_align(offsets->begin + offsets->size,
1624 ctx->largest_align)
1625 + ctx->data_size;
1626 if (unlikely(subbuf_offset(offsets->begin, chan) +
1627 offsets->size > chan->backend.subbuf_size)) {
1628 offsets->switch_old_end = 1; /* For offsets->old */
1629 offsets->switch_new_start = 1; /* For offsets->begin */
1630 }
1631 }
1632 if (unlikely(offsets->switch_new_start)) {
1633 unsigned long sb_index, commit_count;
1634
1635 /*
1636 * We are typically not filling the previous buffer completely.
1637 */
1638 if (likely(offsets->switch_old_end))
1639 offsets->begin = subbuf_align(offsets->begin, chan);
1640 offsets->begin = offsets->begin
1641 + config->cb.subbuffer_header_size();
1642 /* Test new buffer integrity */
1643 sb_index = subbuf_index(offsets->begin, chan);
1644 /*
1645 * Read buf->offset before buf->commit_cold[sb_index].cc_sb.
1646 * lib_ring_buffer_check_deliver() has the matching
1647 * memory barriers required around commit_cold cc_sb
1648 * updates to ensure reserve and commit counter updates
1649 * are not seen reordered when updated by another CPU.
1650 */
1651 smp_rmb();
1652 commit_count = v_read(config,
1653 &buf->commit_cold[sb_index].cc_sb);
1654 /* Read buf->commit_cold[sb_index].cc_sb before buf->offset. */
1655 smp_rmb();
1656 if (unlikely(offset_cmp != v_read(config, &buf->offset))) {
1657 /*
1658 * The reserve counter has been concurrently updated
1659 * while we read the commit counter. This means the
1660 * commit counter we read might not match buf->offset
1661 * due to concurrent update. We therefore need to retry.
1662 */
1663 goto retry;
1664 }
1665 reserve_commit_diff =
1666 (buf_trunc(offsets->begin, chan)
1667 >> chan->backend.num_subbuf_order)
1668 - (commit_count & chan->commit_count_mask);
1669 if (likely(reserve_commit_diff == 0)) {
1670 /* Next subbuffer not being written to. */
1671 if (unlikely(config->mode != RING_BUFFER_OVERWRITE &&
1672 subbuf_trunc(offsets->begin, chan)
1673 - subbuf_trunc((unsigned long)
1674 atomic_long_read(&buf->consumed), chan)
1675 >= chan->backend.buf_size)) {
1676 /*
1677 * We do not overwrite non consumed buffers
1678 * and we are full : record is lost.
1679 */
1680 v_inc(config, &buf->records_lost_full);
1681 return -ENOBUFS;
1682 } else {
1683 /*
1684 * Next subbuffer not being written to, and we
1685 * are either in overwrite mode or the buffer is
1686 * not full. It's safe to write in this new
1687 * subbuffer.
1688 */
1689 }
1690 } else {
1691 /*
1692 * Next subbuffer reserve offset does not match the
1693 * commit offset, and this did not involve update to the
1694 * reserve counter. Drop record in producer-consumer and
1695 * overwrite mode. Caused by either a writer OOPS or
1696 * too many nested writes over a reserve/commit pair.
1697 */
1698 v_inc(config, &buf->records_lost_wrap);
1699 return -EIO;
1700 }
1701 offsets->size =
1702 config->cb.record_header_size(config, chan,
1703 offsets->begin,
1704 &offsets->pre_header_padding,
1705 ctx);
1706 offsets->size +=
1707 lib_ring_buffer_align(offsets->begin + offsets->size,
1708 ctx->largest_align)
1709 + ctx->data_size;
1710 if (unlikely(subbuf_offset(offsets->begin, chan)
1711 + offsets->size > chan->backend.subbuf_size)) {
1712 /*
1713 * Record too big for subbuffers, report error, don't
1714 * complete the sub-buffer switch.
1715 */
1716 v_inc(config, &buf->records_lost_big);
1717 return -ENOSPC;
1718 } else {
1719 /*
1720 * We just made a successful buffer switch and the
1721 * record fits in the new subbuffer. Let's write.
1722 */
1723 }
1724 } else {
1725 /*
1726 * Record fits in the current buffer and we are not on a switch
1727 * boundary. It's safe to write.
1728 */
1729 }
1730 offsets->end = offsets->begin + offsets->size;
1731
1732 if (unlikely(subbuf_offset(offsets->end, chan) == 0)) {
1733 /*
1734 * The offset_end will fall at the very beginning of the next
1735 * subbuffer.
1736 */
1737 offsets->switch_new_end = 1; /* For offsets->begin */
1738 }
1739 return 0;
1740}
1741
1742/**
1743 * lib_ring_buffer_reserve_slow - Atomic slot reservation in a buffer.
1744 * @ctx: ring buffer context.
1745 *
1746 * Return : -ENOBUFS if not enough space, -ENOSPC if event size too large,
1747 * -EIO for other errors, else returns 0.
1748 * It will take care of sub-buffer switching.
1749 */
1750int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx)
1751{
1752 struct channel *chan = ctx->chan;
1753 const struct lib_ring_buffer_config *config = &chan->backend.config;
1754 struct lib_ring_buffer *buf;
1755 struct switch_offsets offsets;
1756 int ret;
1757
1758 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
1759 buf = per_cpu_ptr(chan->backend.buf, ctx->cpu);
1760 else
1761 buf = chan->backend.buf;
1762 ctx->buf = buf;
1763
1764 offsets.size = 0;
1765
1766 do {
1767 ret = lib_ring_buffer_try_reserve_slow(buf, chan, &offsets,
1768 ctx);
1769 if (unlikely(ret))
1770 return ret;
1771 } while (unlikely(v_cmpxchg(config, &buf->offset, offsets.old,
1772 offsets.end)
1773 != offsets.old));
1774
1775 /*
1776 * Atomically update last_tsc. This update races against concurrent
1777 * atomic updates, but the race will always cause supplementary full TSC
1778 * records, never the opposite (missing a full TSC record when it would
1779 * be needed).
1780 */
1781 save_last_tsc(config, buf, ctx->tsc);
1782
1783 /*
1784 * Push the reader if necessary
1785 */
1786 lib_ring_buffer_reserve_push_reader(buf, chan, offsets.end - 1);
1787
1788 /*
1789 * Clear noref flag for this subbuffer.
1790 */
1791 lib_ring_buffer_clear_noref(config, &buf->backend,
1792 subbuf_index(offsets.end - 1, chan));
1793
1794 /*
1795 * Switch old subbuffer if needed.
1796 */
1797 if (unlikely(offsets.switch_old_end)) {
1798 lib_ring_buffer_clear_noref(config, &buf->backend,
1799 subbuf_index(offsets.old - 1, chan));
1800 lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->tsc);
1801 }
1802
1803 /*
1804 * Populate new subbuffer.
1805 */
1806 if (unlikely(offsets.switch_new_start))
1807 lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->tsc);
1808
1809 if (unlikely(offsets.switch_new_end))
1810 lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->tsc);
1811
1812 ctx->slot_size = offsets.size;
1813 ctx->pre_offset = offsets.begin;
1814 ctx->buf_offset = offsets.begin + offsets.pre_header_padding;
1815 return 0;
1816}
1817EXPORT_SYMBOL_GPL(lib_ring_buffer_reserve_slow);
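/*
 * Illustrative sketch (not part of the upstream file): this slow path is
 * normally reached through the lib_ring_buffer_reserve() fast path declared
 * in frontend_api.h. A client writing one event does roughly the following;
 * signatures are simplified, see frontend_api.h and backend.h for the exact
 * API.
 *
 *	struct lib_ring_buffer_ctx ctx;
 *
 *	lib_ring_buffer_ctx_init(&ctx, chan, priv, sizeof(payload),
 *				 lttng_alignof(payload), smp_processor_id());
 *	if (lib_ring_buffer_reserve(config, &ctx))
 *		return;		// record lost (buffer full, wrap, or too big)
 *	// write the client event header, then the payload
 *	lib_ring_buffer_write(config, &ctx, &payload, sizeof(payload));
 *	lib_ring_buffer_commit(config, &ctx);
 */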
1818
1819int __init init_lib_ring_buffer_frontend(void)
1820{
1821 int cpu;
1822
1823 for_each_possible_cpu(cpu)
1824 spin_lock_init(&per_cpu(ring_buffer_nohz_lock, cpu));
1825 return 0;
1826}
1827
1828module_init(init_lib_ring_buffer_frontend);
1829
1830void __exit exit_lib_ring_buffer_frontend(void)
1831{
1832}
1833
1834module_exit(exit_lib_ring_buffer_frontend);