Commit changes prior to shmp read-only header
[lttng-ust.git] / libringbuffer / ring_buffer_frontend.c
CommitLineData
852c2936
MD
1/*
2 * ring_buffer_frontend.c
3 *
4 * (C) Copyright 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 *
6 * Ring buffer wait-free buffer synchronization. Producer-consumer and flight
7 * recorder (overwrite) modes. See thesis:
8 *
9 * Desnoyers, Mathieu (2009), "Low-Impact Operating System Tracing", Ph.D.
10 * dissertation, Ecole Polytechnique de Montreal.
11 * http://www.lttng.org/pub/thesis/desnoyers-dissertation-2009-12.pdf
12 *
13 * - Algorithm presentation in Chapter 5:
14 * "Lockless Multi-Core High-Throughput Buffering".
15 * - Algorithm formal verification in Section 8.6:
16 * "Formal verification of LTTng"
17 *
18 * Author:
19 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
20 *
21 * Inspired from LTT and RelayFS:
22 * Karim Yaghmour <karim@opersys.com>
23 * Tom Zanussi <zanussi@us.ibm.com>
24 * Bob Wisniewski <bob@watson.ibm.com>
25 * And from K42 :
26 * Bob Wisniewski <bob@watson.ibm.com>
27 *
28 * Buffer reader semantic :
29 *
30 * - get_subbuf_size
31 * while buffer is not finalized and empty
32 * - get_subbuf
33 * - if return value != 0, continue
34 * - splice one subbuffer worth of data to a pipe
35 * - splice the data from pipe to disk/network
36 * - put_subbuf
37 *
38 * Dual LGPL v2.1/GPL v2 license.
39 */
40
a6352fd4
MD
41#include <sys/types.h>
42#include <sys/shm.h>
14641deb 43#include <urcu/compiler.h>
a6352fd4 44#include <urcu/ref.h>
14641deb 45
a6352fd4 46#include "smp.h"
4931a13e
MD
47#include "config.h"
48#include "backend.h"
49#include "frontend.h"
a6352fd4 50#include "shm.h"
852c2936
MD
51
/*
 * Internal bookkeeping for a sub-buffer switch: the offsets involved and
 * one-bit flags naming which switch steps (start/end of the old and of the
 * new sub-buffer) apply.
 */
struct switch_offsets {
	unsigned long begin;
	unsigned long end;
	unsigned long old;
	size_t pre_header_padding;
	size_t size;
	unsigned int switch_new_start:1;
	unsigned int switch_new_end:1;
	unsigned int switch_old_start:1;
	unsigned int switch_old_end:1;
};
61
a6352fd4 62__thread unsigned int lib_ring_buffer_nesting;
852c2936
MD
63
64static
65void lib_ring_buffer_print_errors(struct channel *chan,
66 struct lib_ring_buffer *buf, int cpu);
67
68/*
69 * Must be called under cpu hotplug protection.
70 */
71void lib_ring_buffer_free(struct lib_ring_buffer *buf)
72{
a6352fd4 73 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
74
75 lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu);
a6352fd4
MD
76 free(shmp(buf->commit_hot));
77 free(shmp(buf->commit_cold));
852c2936
MD
78
79 lib_ring_buffer_backend_free(&buf->backend);
80}
81
82/**
83 * lib_ring_buffer_reset - Reset ring buffer to initial values.
84 * @buf: Ring buffer.
85 *
86 * Effectively empty the ring buffer. Should be called when the buffer is not
87 * used for writing. The ring buffer can be opened for reading, but the reader
88 * should not be using the iterator concurrently with reset. The previous
89 * current iterator record is reset.
90 */
91void lib_ring_buffer_reset(struct lib_ring_buffer *buf)
92{
a6352fd4 93 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
94 const struct lib_ring_buffer_config *config = chan->backend.config;
95 unsigned int i;
96
97 /*
98 * Reset iterator first. It will put the subbuffer if it currently holds
99 * it.
100 */
852c2936
MD
101 v_set(config, &buf->offset, 0);
102 for (i = 0; i < chan->backend.num_subbuf; i++) {
a6352fd4
MD
103 v_set(config, &shmp(buf->commit_hot)[i].cc, 0);
104 v_set(config, &shmp(buf->commit_hot)[i].seq, 0);
105 v_set(config, &shmp(buf->commit_cold)[i].cc_sb, 0);
852c2936 106 }
a6352fd4
MD
107 uatomic_set(&buf->consumed, 0);
108 uatomic_set(&buf->record_disabled, 0);
852c2936
MD
109 v_set(config, &buf->last_tsc, 0);
110 lib_ring_buffer_backend_reset(&buf->backend);
111 /* Don't reset number of active readers */
112 v_set(config, &buf->records_lost_full, 0);
113 v_set(config, &buf->records_lost_wrap, 0);
114 v_set(config, &buf->records_lost_big, 0);
115 v_set(config, &buf->records_count, 0);
116 v_set(config, &buf->records_overrun, 0);
117 buf->finalized = 0;
118}
852c2936
MD
119
120/**
121 * channel_reset - Reset channel to initial values.
122 * @chan: Channel.
123 *
124 * Effectively empty the channel. Should be called when the channel is not used
125 * for writing. The channel can be opened for reading, but the reader should not
126 * be using the iterator concurrently with reset. The previous current iterator
127 * record is reset.
128 */
129void channel_reset(struct channel *chan)
130{
131 /*
132 * Reset iterators first. Will put the subbuffer if held for reading.
133 */
a6352fd4 134 uatomic_set(&chan->record_disabled, 0);
852c2936
MD
135 /* Don't reset commit_count_mask, still valid */
136 channel_backend_reset(&chan->backend);
137 /* Don't reset switch/read timer interval */
138 /* Don't reset notifiers and notifier enable bits */
139 /* Don't reset reader reference count */
140}
852c2936
MD
141
142/*
143 * Must be called under cpu hotplug protection.
144 */
145int lib_ring_buffer_create(struct lib_ring_buffer *buf,
a6352fd4
MD
146 struct channel_backend *chanb, int cpu,
147 struct shm_header *shm_header)
852c2936
MD
148{
149 const struct lib_ring_buffer_config *config = chanb->config;
14641deb 150 struct channel *chan = caa_container_of(chanb, struct channel, backend);
852c2936
MD
151 void *priv = chanb->priv;
152 unsigned int num_subbuf;
153 size_t subbuf_header_size;
154 u64 tsc;
155 int ret;
156
157 /* Test for cpu hotplug */
158 if (buf->backend.allocated)
159 return 0;
160
a6352fd4
MD
161 ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend,
162 cpu, shm_header);
852c2936
MD
163 if (ret)
164 return ret;
165
a6352fd4
MD
166 set_shmp(&buf->commit_hot,
167 zalloc_shm(shm_header,
168 sizeof(*buf->commit_hot) * chan->backend.num_subbuf));
169 if (!shmp(buf->commit_hot)) {
852c2936
MD
170 ret = -ENOMEM;
171 goto free_chanbuf;
172 }
173
a6352fd4
MD
174 set_shmp(&buf->commit_cold,
175 zalloc_shm(shm_header,
176 sizeof(*buf->commit_cold) * chan->backend.num_subbuf));
177 if (!shmp(buf->commit_cold)) {
852c2936
MD
178 ret = -ENOMEM;
179 goto free_commit;
180 }
181
182 num_subbuf = chan->backend.num_subbuf;
a6352fd4 183 //init_waitqueue_head(&buf->read_wait);
852c2936
MD
184
185 /*
186 * Write the subbuffer header for first subbuffer so we know the total
187 * duration of data gathering.
188 */
189 subbuf_header_size = config->cb.subbuffer_header_size();
190 v_set(config, &buf->offset, subbuf_header_size);
a6352fd4
MD
191 subbuffer_id_clear_noref(config, &shmp(buf->backend.buf_wsb)[0].id);
192 tsc = config->cb.ring_buffer_clock_read(shmp(buf->backend.chan));
852c2936 193 config->cb.buffer_begin(buf, tsc, 0);
a6352fd4 194 v_add(config, subbuf_header_size, &shmp(buf->commit_hot)[0].cc);
852c2936
MD
195
196 if (config->cb.buffer_create) {
197 ret = config->cb.buffer_create(buf, priv, cpu, chanb->name);
198 if (ret)
199 goto free_init;
200 }
852c2936 201 buf->backend.allocated = 1;
852c2936
MD
202 return 0;
203
204 /* Error handling */
205free_init:
a6352fd4 206 /* commit_cold will be freed by shm teardown */
852c2936 207free_commit:
a6352fd4 208 /* commit_hot will be freed by shm teardown */
852c2936
MD
209free_chanbuf:
210 lib_ring_buffer_backend_free(&buf->backend);
211 return ret;
212}
213
214static void switch_buffer_timer(unsigned long data)
215{
216 struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data;
a6352fd4 217 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
218 const struct lib_ring_buffer_config *config = chan->backend.config;
219
220 /*
221 * Only flush buffers periodically if readers are active.
222 */
a6352fd4 223 if (uatomic_read(&buf->active_readers))
852c2936
MD
224 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
225
a6352fd4
MD
226 //TODO timers
227 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
228 // mod_timer_pinned(&buf->switch_timer,
229 // jiffies + chan->switch_timer_interval);
230 //else
231 // mod_timer(&buf->switch_timer,
232 // jiffies + chan->switch_timer_interval);
852c2936
MD
233}
234
852c2936
MD
235static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf)
236{
a6352fd4 237 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
238 const struct lib_ring_buffer_config *config = chan->backend.config;
239
240 if (!chan->switch_timer_interval || buf->switch_timer_enabled)
241 return;
a6352fd4
MD
242 //TODO
243 //init_timer(&buf->switch_timer);
244 //buf->switch_timer.function = switch_buffer_timer;
245 //buf->switch_timer.expires = jiffies + chan->switch_timer_interval;
246 //buf->switch_timer.data = (unsigned long)buf;
247 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
248 // add_timer_on(&buf->switch_timer, buf->backend.cpu);
249 //else
250 // add_timer(&buf->switch_timer);
852c2936
MD
251 buf->switch_timer_enabled = 1;
252}
253
852c2936
MD
254static void lib_ring_buffer_stop_switch_timer(struct lib_ring_buffer *buf)
255{
a6352fd4 256 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
257
258 if (!chan->switch_timer_interval || !buf->switch_timer_enabled)
259 return;
260
a6352fd4
MD
261 //TODO
262 //del_timer_sync(&buf->switch_timer);
852c2936
MD
263 buf->switch_timer_enabled = 0;
264}
265
266/*
267 * Polling timer to check the channels for data.
268 */
269static void read_buffer_timer(unsigned long data)
270{
271 struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data;
a6352fd4 272 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
273 const struct lib_ring_buffer_config *config = chan->backend.config;
274
275 CHAN_WARN_ON(chan, !buf->backend.allocated);
276
a6352fd4 277 if (uatomic_read(&buf->active_readers)
852c2936 278 && lib_ring_buffer_poll_deliver(config, buf, chan)) {
a6352fd4
MD
279 //TODO
280 //wake_up_interruptible(&buf->read_wait);
281 //wake_up_interruptible(&chan->read_wait);
852c2936
MD
282 }
283
a6352fd4
MD
284 //TODO
285 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
286 // mod_timer_pinned(&buf->read_timer,
287 // jiffies + chan->read_timer_interval);
288 //else
289 // mod_timer(&buf->read_timer,
290 // jiffies + chan->read_timer_interval);
852c2936
MD
291}
292
852c2936
MD
293static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf)
294{
a6352fd4 295 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
296 const struct lib_ring_buffer_config *config = chan->backend.config;
297
298 if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
299 || !chan->read_timer_interval
300 || buf->read_timer_enabled)
301 return;
302
a6352fd4
MD
303 //TODO
304 //init_timer(&buf->read_timer);
305 //buf->read_timer.function = read_buffer_timer;
306 //buf->read_timer.expires = jiffies + chan->read_timer_interval;
307 //buf->read_timer.data = (unsigned long)buf;
852c2936 308
a6352fd4
MD
309 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
310 // add_timer_on(&buf->read_timer, buf->backend.cpu);
311 //else
312 // add_timer(&buf->read_timer);
852c2936
MD
313 buf->read_timer_enabled = 1;
314}
315
852c2936
MD
316static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf)
317{
a6352fd4 318 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
319 const struct lib_ring_buffer_config *config = chan->backend.config;
320
321 if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
322 || !chan->read_timer_interval
323 || !buf->read_timer_enabled)
324 return;
325
a6352fd4
MD
326 //TODO
327 //del_timer_sync(&buf->read_timer);
852c2936
MD
328 /*
329 * do one more check to catch data that has been written in the last
330 * timer period.
331 */
332 if (lib_ring_buffer_poll_deliver(config, buf, chan)) {
a6352fd4
MD
333 //TODO
334 //wake_up_interruptible(&buf->read_wait);
335 //wake_up_interruptible(&chan->read_wait);
852c2936
MD
336 }
337 buf->read_timer_enabled = 0;
338}
339
852c2936
MD
340static void channel_unregister_notifiers(struct channel *chan)
341{
342 const struct lib_ring_buffer_config *config = chan->backend.config;
343 int cpu;
344
852c2936 345 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
852c2936 346 for_each_possible_cpu(cpu) {
a6352fd4
MD
347 struct lib_ring_buffer *buf = &shmp(chan->backend.buf)[cpu];
348
852c2936
MD
349 lib_ring_buffer_stop_switch_timer(buf);
350 lib_ring_buffer_stop_read_timer(buf);
351 }
852c2936 352 } else {
a6352fd4 353 struct lib_ring_buffer *buf = shmp(chan->backend.buf);
852c2936
MD
354
355 lib_ring_buffer_stop_switch_timer(buf);
356 lib_ring_buffer_stop_read_timer(buf);
357 }
358 channel_backend_unregister_notifiers(&chan->backend);
359}
360
361static void channel_free(struct channel *chan)
362{
852c2936 363 channel_backend_free(&chan->backend);
a6352fd4 364 free(chan);
852c2936
MD
365}
366
367/**
368 * channel_create - Create channel.
369 * @config: ring buffer instance configuration
370 * @name: name of the channel
371 * @priv: ring buffer client private data
372 * @buf_addr: pointer the the beginning of the preallocated buffer contiguous
373 * address mapping. It is used only by RING_BUFFER_STATIC
374 * configuration. It can be set to NULL for other backends.
375 * @subbuf_size: subbuffer size
376 * @num_subbuf: number of subbuffers
377 * @switch_timer_interval: Time interval (in us) to fill sub-buffers with
378 * padding to let readers get those sub-buffers.
379 * Used for live streaming.
380 * @read_timer_interval: Time interval (in us) to wake up pending readers.
a6352fd4 381 * @shmid: shared memory ID (output)
852c2936
MD
382 *
383 * Holds cpu hotplug.
384 * Returns NULL on failure.
385 */
386struct channel *channel_create(const struct lib_ring_buffer_config *config,
387 const char *name, void *priv, void *buf_addr,
388 size_t subbuf_size,
389 size_t num_subbuf, unsigned int switch_timer_interval,
a6352fd4
MD
390 unsigned int read_timer_interval,
391 int *shmid)
852c2936
MD
392{
393 int ret, cpu;
394 struct channel *chan;
a6352fd4
MD
395 size_t shmsize, bufshmsize;
396 struct shm_header *shm_header;
397 unsigned long num_subbuf_alloc;
852c2936
MD
398
399 if (lib_ring_buffer_check_config(config, switch_timer_interval,
400 read_timer_interval))
401 return NULL;
402
a6352fd4
MD
403 /* Calculate the shm allocation layout */
404 shmsize = sizeof(struct shm_header);
405 shmsize += sizeof(struct channel);
406
407 /* Per-cpu buffer size: control (prior to backend) */
408 bufshmsize = sizeof(struct lib_ring_buffer);
409 shmsize += bufshmsize * num_possible_cpus();
410
411 /* Per-cpu buffer size: backend */
412 /* num_subbuf + 1 is the worse case */
413 num_subbuf_alloc = num_subbuf + 1;
414 bufshmsize = sizeof(struct lib_ring_buffer_backend_pages *) * num_subbuf_alloc;
415 bufshmsize += subbuf_size * (num_subbuf_alloc);
416 bufshmsize += (sizeof(struct lib_ring_buffer_backend_pages) + subbuf_size) * num_subbuf_alloc;
417 bufshmsize += sizeof(struct lib_ring_buffer_backend_subbuffer) * num_subbuf;
418 shmsize += bufshmsize * num_possible_cpus();
419
420 /* Per-cpu buffer size: control (after backend) */
421 bufshmsize += sizeof(struct commit_counters_hot) * num_subbuf;
422 bufshmsize += sizeof(struct commit_counters_cold) * num_subbuf;
423
424 /* Allocate shm */
425 *shmid = shmget(getpid(), shmsize, IPC_CREAT | IPC_EXCL | 0700);
426 if (*shmid < 0) {
427 if (errno == EINVAL)
428 ERR("shmget() returned EINVAL; maybe /proc/sys/kernel/shmmax should be increased.");
429 else
430 PERROR("shmget");
852c2936 431 return NULL;
a6352fd4 432 }
852c2936 433
a6352fd4
MD
434 shm_header = shmat(*shmid, NULL, 0);
435 if (shm_header == (void *) -1) {
436 perror("shmat");
437 goto destroy_shmem;
438 }
852c2936 439
a6352fd4
MD
440 /* Already mark the shared memory for destruction. This will occur only
441 * when all users have detached.
442 */
443 ret = shmctl(*shmid, IPC_RMID, NULL);
444 if (ret == -1) {
445 perror("shmctl");
446 goto destroy_shmem;
447 }
448
449 shm_header->magic = SHM_MAGIC;
450 shm_header->major = SHM_MAJOR;
451 shm_header->major = SHM_MINOR;
452 shm_header->bits_per_long = CAA_BITS_PER_LONG;
453 shm_header->shm_size = shmsize;
454 shm_header->shm_allocated = sizeof(struct shm_header);
455
456 chan = zalloc_shm(shm_header, sizeof(struct channel));
457 if (!chan)
458 goto destroy_shmem;
459 set_shmp(shm_header->chan, chan);
460
461 ret = channel_backend_init(&chan->backend, name, config, priv,
462 subbuf_size, num_subbuf, shm_header);
852c2936 463 if (ret)
a6352fd4 464 goto destroy_shmem;
852c2936
MD
465
466 chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order);
a6352fd4
MD
467 //TODO
468 //chan->switch_timer_interval = usecs_to_jiffies(switch_timer_interval);
469 //chan->read_timer_interval = usecs_to_jiffies(read_timer_interval);
470 urcu_ref_init(&chan->ref);
471 //TODO
472 //init_waitqueue_head(&chan->read_wait);
473 //init_waitqueue_head(&chan->hp_wait);
852c2936
MD
474
475 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
852c2936
MD
476 /*
477 * In case of non-hotplug cpu, if the ring-buffer is allocated
478 * in early initcall, it will not be notified of secondary cpus.
479 * In that off case, we need to allocate for all possible cpus.
480 */
852c2936 481 for_each_possible_cpu(cpu) {
a6352fd4 482 struct lib_ring_buffer *buf = &shmp(chan->backend.buf)[cpu];
852c2936
MD
483 lib_ring_buffer_start_switch_timer(buf);
484 lib_ring_buffer_start_read_timer(buf);
852c2936 485 }
852c2936 486 } else {
a6352fd4 487 struct lib_ring_buffer *buf = shmp(chan->backend.buf);
852c2936
MD
488
489 lib_ring_buffer_start_switch_timer(buf);
490 lib_ring_buffer_start_read_timer(buf);
491 }
492
493 return chan;
494
a6352fd4
MD
495destroy_shmem:
496 ret = shmctl(*shmid, IPC_RMID, NULL);
497 if (ret == -1) {
498 perror("shmctl");
499 }
852c2936
MD
500 return NULL;
501}
852c2936
MD
502
503static
a6352fd4 504void channel_release(struct urcu_ref *ref)
852c2936 505{
a6352fd4 506 struct channel *chan = caa_container_of(ref, struct channel, ref);
852c2936
MD
507 channel_free(chan);
508}
509
510/**
511 * channel_destroy - Finalize, wait for q.s. and destroy channel.
512 * @chan: channel to destroy
513 *
514 * Holds cpu hotplug.
515 * Call "destroy" callback, finalize channels, wait for readers to release their
516 * reference, then destroy ring buffer data. Note that when readers have
517 * completed data consumption of finalized channels, get_subbuf() will return
518 * -ENODATA. They should release their handle at that point.
519 * Returns the private data pointer.
520 */
521void *channel_destroy(struct channel *chan)
522{
523 int cpu;
524 const struct lib_ring_buffer_config *config = chan->backend.config;
525 void *priv;
526
527 channel_unregister_notifiers(chan);
528
529 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
852c2936 530 for_each_channel_cpu(cpu, chan) {
a6352fd4 531 struct lib_ring_buffer *buf = &shmp(chan->backend.buf)[cpu];
852c2936
MD
532
533 if (config->cb.buffer_finalize)
534 config->cb.buffer_finalize(buf,
535 chan->backend.priv,
536 cpu);
537 if (buf->backend.allocated)
538 lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
539 /*
540 * Perform flush before writing to finalized.
541 */
a6352fd4 542 cmm_smp_wmb();
14641deb 543 CMM_ACCESS_ONCE(buf->finalized) = 1;
a6352fd4 544 //wake_up_interruptible(&buf->read_wait);
852c2936
MD
545 }
546 } else {
a6352fd4 547 struct lib_ring_buffer *buf = shmp(chan->backend.buf);
852c2936
MD
548
549 if (config->cb.buffer_finalize)
550 config->cb.buffer_finalize(buf, chan->backend.priv, -1);
551 if (buf->backend.allocated)
552 lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
553 /*
554 * Perform flush before writing to finalized.
555 */
a6352fd4 556 cmm_smp_wmb();
14641deb 557 CMM_ACCESS_ONCE(buf->finalized) = 1;
a6352fd4 558 //wake_up_interruptible(&buf->read_wait);
852c2936 559 }
14641deb 560 CMM_ACCESS_ONCE(chan->finalized) = 1;
a6352fd4
MD
561 //wake_up_interruptible(&chan->hp_wait);
562 //wake_up_interruptible(&chan->read_wait);
563 urcu_ref_put(&chan->ref, channel_release);
852c2936
MD
564 priv = chan->backend.priv;
565 return priv;
566}
852c2936
MD
567
568struct lib_ring_buffer *channel_get_ring_buffer(
569 const struct lib_ring_buffer_config *config,
570 struct channel *chan, int cpu)
571{
572 if (config->alloc == RING_BUFFER_ALLOC_GLOBAL)
a6352fd4 573 return shmp(chan->backend.buf);
852c2936 574 else
a6352fd4 575 return &shmp(chan->backend.buf)[cpu];
852c2936 576}
852c2936
MD
577
578int lib_ring_buffer_open_read(struct lib_ring_buffer *buf)
579{
a6352fd4 580 struct channel *chan = shmp(buf->backend.chan);
852c2936 581
a6352fd4 582 if (uatomic_cmpxchg(&buf->active_readers, 0, 1) != 0)
852c2936 583 return -EBUSY;
a6352fd4
MD
584 urcu_ref_get(&chan->ref);
585 cmm_smp_mb();
852c2936
MD
586 return 0;
587}
852c2936
MD
588
589void lib_ring_buffer_release_read(struct lib_ring_buffer *buf)
590{
a6352fd4 591 struct channel *chan = shmp(buf->backend.chan);
852c2936 592
a6352fd4
MD
593 CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
594 cmm_smp_mb();
595 uatomic_dec(&buf->active_readers);
596 urcu_ref_put(&chan->ref, channel_release);
852c2936
MD
597}
598
599/**
600 * lib_ring_buffer_snapshot - save subbuffer position snapshot (for read)
601 * @buf: ring buffer
602 * @consumed: consumed count indicating the position where to read
603 * @produced: produced count, indicates position when to stop reading
604 *
605 * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no
606 * data to read at consumed position, or 0 if the get operation succeeds.
852c2936
MD
607 */
608
609int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf,
610 unsigned long *consumed, unsigned long *produced)
611{
a6352fd4 612 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
613 const struct lib_ring_buffer_config *config = chan->backend.config;
614 unsigned long consumed_cur, write_offset;
615 int finalized;
616
14641deb 617 finalized = CMM_ACCESS_ONCE(buf->finalized);
852c2936
MD
618 /*
619 * Read finalized before counters.
620 */
a6352fd4
MD
621 cmm_smp_rmb();
622 consumed_cur = uatomic_read(&buf->consumed);
852c2936
MD
623 /*
624 * No need to issue a memory barrier between consumed count read and
625 * write offset read, because consumed count can only change
626 * concurrently in overwrite mode, and we keep a sequence counter
627 * identifier derived from the write offset to check we are getting
628 * the same sub-buffer we are expecting (the sub-buffers are atomically
629 * "tagged" upon writes, tags are checked upon read).
630 */
631 write_offset = v_read(config, &buf->offset);
632
633 /*
634 * Check that we are not about to read the same subbuffer in
635 * which the writer head is.
636 */
637 if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan)
638 == 0)
639 goto nodata;
640
641 *consumed = consumed_cur;
642 *produced = subbuf_trunc(write_offset, chan);
643
644 return 0;
645
646nodata:
647 /*
648 * The memory barriers __wait_event()/wake_up_interruptible() take care
649 * of "raw_spin_is_locked" memory ordering.
650 */
651 if (finalized)
652 return -ENODATA;
852c2936
MD
653 else
654 return -EAGAIN;
655}
852c2936
MD
656
657/**
658 * lib_ring_buffer_put_snapshot - move consumed counter forward
659 * @buf: ring buffer
660 * @consumed_new: new consumed count value
661 */
662void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf,
663 unsigned long consumed_new)
664{
665 struct lib_ring_buffer_backend *bufb = &buf->backend;
a6352fd4 666 struct channel *chan = shmp(bufb->chan);
852c2936
MD
667 unsigned long consumed;
668
a6352fd4 669 CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
852c2936
MD
670
671 /*
672 * Only push the consumed value forward.
673 * If the consumed cmpxchg fails, this is because we have been pushed by
674 * the writer in flight recorder mode.
675 */
a6352fd4 676 consumed = uatomic_read(&buf->consumed);
852c2936 677 while ((long) consumed - (long) consumed_new < 0)
a6352fd4
MD
678 consumed = uatomic_cmpxchg(&buf->consumed, consumed,
679 consumed_new);
852c2936 680}
852c2936
MD
681
682/**
683 * lib_ring_buffer_get_subbuf - get exclusive access to subbuffer for reading
684 * @buf: ring buffer
685 * @consumed: consumed count indicating the position where to read
686 *
687 * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no
688 * data to read at consumed position, or 0 if the get operation succeeds.
852c2936
MD
689 */
690int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf,
691 unsigned long consumed)
692{
a6352fd4 693 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
694 const struct lib_ring_buffer_config *config = chan->backend.config;
695 unsigned long consumed_cur, consumed_idx, commit_count, write_offset;
696 int ret;
697 int finalized;
698
699retry:
14641deb 700 finalized = CMM_ACCESS_ONCE(buf->finalized);
852c2936
MD
701 /*
702 * Read finalized before counters.
703 */
a6352fd4
MD
704 cmm_smp_rmb();
705 consumed_cur = uatomic_read(&buf->consumed);
852c2936 706 consumed_idx = subbuf_index(consumed, chan);
a6352fd4 707 commit_count = v_read(config, &shmp(buf->commit_cold)[consumed_idx].cc_sb);
852c2936
MD
708 /*
709 * Make sure we read the commit count before reading the buffer
710 * data and the write offset. Correct consumed offset ordering
711 * wrt commit count is insured by the use of cmpxchg to update
712 * the consumed offset.
852c2936 713 */
a6352fd4
MD
714 /*
715 * Local rmb to match the remote wmb to read the commit count
716 * before the buffer data and the write offset.
717 */
718 cmm_smp_rmb();
852c2936
MD
719
720 write_offset = v_read(config, &buf->offset);
721
722 /*
723 * Check that the buffer we are getting is after or at consumed_cur
724 * position.
725 */
726 if ((long) subbuf_trunc(consumed, chan)
727 - (long) subbuf_trunc(consumed_cur, chan) < 0)
728 goto nodata;
729
730 /*
731 * Check that the subbuffer we are trying to consume has been
732 * already fully committed.
733 */
734 if (((commit_count - chan->backend.subbuf_size)
735 & chan->commit_count_mask)
736 - (buf_trunc(consumed_cur, chan)
737 >> chan->backend.num_subbuf_order)
738 != 0)
739 goto nodata;
740
741 /*
742 * Check that we are not about to read the same subbuffer in
743 * which the writer head is.
744 */
745 if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan)
746 == 0)
747 goto nodata;
748
749 /*
750 * Failure to get the subbuffer causes a busy-loop retry without going
751 * to a wait queue. These are caused by short-lived race windows where
752 * the writer is getting access to a subbuffer we were trying to get
753 * access to. Also checks that the "consumed" buffer count we are
754 * looking for matches the one contained in the subbuffer id.
755 */
756 ret = update_read_sb_index(config, &buf->backend, &chan->backend,
757 consumed_idx, buf_trunc_val(consumed, chan));
758 if (ret)
759 goto retry;
760 subbuffer_id_clear_noref(config, &buf->backend.buf_rsb.id);
761
762 buf->get_subbuf_consumed = consumed;
763 buf->get_subbuf = 1;
764
765 return 0;
766
767nodata:
768 /*
769 * The memory barriers __wait_event()/wake_up_interruptible() take care
770 * of "raw_spin_is_locked" memory ordering.
771 */
772 if (finalized)
773 return -ENODATA;
852c2936
MD
774 else
775 return -EAGAIN;
776}
852c2936
MD
777
778/**
779 * lib_ring_buffer_put_subbuf - release exclusive subbuffer access
780 * @buf: ring buffer
781 */
782void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf)
783{
784 struct lib_ring_buffer_backend *bufb = &buf->backend;
a6352fd4 785 struct channel *chan = shmp(bufb->chan);
852c2936
MD
786 const struct lib_ring_buffer_config *config = chan->backend.config;
787 unsigned long read_sb_bindex, consumed_idx, consumed;
788
a6352fd4 789 CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
852c2936
MD
790
791 if (!buf->get_subbuf) {
792 /*
793 * Reader puts a subbuffer it did not get.
794 */
795 CHAN_WARN_ON(chan, 1);
796 return;
797 }
798 consumed = buf->get_subbuf_consumed;
799 buf->get_subbuf = 0;
800
801 /*
802 * Clear the records_unread counter. (overruns counter)
803 * Can still be non-zero if a file reader simply grabbed the data
804 * without using iterators.
805 * Can be below zero if an iterator is used on a snapshot more than
806 * once.
807 */
808 read_sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
809 v_add(config, v_read(config,
a6352fd4 810 &shmp(bufb->array)[read_sb_bindex]->records_unread),
852c2936 811 &bufb->records_read);
a6352fd4 812 v_set(config, &shmp(bufb->array)[read_sb_bindex]->records_unread, 0);
852c2936
MD
813 CHAN_WARN_ON(chan, config->mode == RING_BUFFER_OVERWRITE
814 && subbuffer_id_is_noref(config, bufb->buf_rsb.id));
815 subbuffer_id_set_noref(config, &bufb->buf_rsb.id);
816
817 /*
818 * Exchange the reader subbuffer with the one we put in its place in the
819 * writer subbuffer table. Expect the original consumed count. If
820 * update_read_sb_index fails, this is because the writer updated the
821 * subbuffer concurrently. We should therefore keep the subbuffer we
822 * currently have: it has become invalid to try reading this sub-buffer
823 * consumed count value anyway.
824 */
825 consumed_idx = subbuf_index(consumed, chan);
826 update_read_sb_index(config, &buf->backend, &chan->backend,
827 consumed_idx, buf_trunc_val(consumed, chan));
828 /*
829 * update_read_sb_index return value ignored. Don't exchange sub-buffer
830 * if the writer concurrently updated it.
831 */
832}
852c2936
MD
833
834/*
835 * cons_offset is an iterator on all subbuffer offsets between the reader
836 * position and the writer position. (inclusive)
837 */
838static
839void lib_ring_buffer_print_subbuffer_errors(struct lib_ring_buffer *buf,
840 struct channel *chan,
841 unsigned long cons_offset,
842 int cpu)
843{
844 const struct lib_ring_buffer_config *config = chan->backend.config;
845 unsigned long cons_idx, commit_count, commit_count_sb;
846
847 cons_idx = subbuf_index(cons_offset, chan);
a6352fd4
MD
848 commit_count = v_read(config, &shmp(buf->commit_hot)[cons_idx].cc);
849 commit_count_sb = v_read(config, &shmp(buf->commit_cold)[cons_idx].cc_sb);
852c2936
MD
850
851 if (subbuf_offset(commit_count, chan) != 0)
a6352fd4 852 ERRMSG("ring buffer %s, cpu %d: "
852c2936
MD
853 "commit count in subbuffer %lu,\n"
854 "expecting multiples of %lu bytes\n"
855 " [ %lu bytes committed, %lu bytes reader-visible ]\n",
856 chan->backend.name, cpu, cons_idx,
857 chan->backend.subbuf_size,
858 commit_count, commit_count_sb);
859
a6352fd4 860 ERRMSG("ring buffer: %s, cpu %d: %lu bytes committed\n",
852c2936
MD
861 chan->backend.name, cpu, commit_count);
862}
863
864static
865void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf,
866 struct channel *chan,
867 void *priv, int cpu)
868{
869 const struct lib_ring_buffer_config *config = chan->backend.config;
870 unsigned long write_offset, cons_offset;
871
872 /*
873 * Can be called in the error path of allocation when
874 * trans_channel_data is not yet set.
875 */
876 if (!chan)
877 return;
878 /*
879 * No need to order commit_count, write_offset and cons_offset reads
880 * because we execute at teardown when no more writer nor reader
881 * references are left.
882 */
883 write_offset = v_read(config, &buf->offset);
a6352fd4 884 cons_offset = uatomic_read(&buf->consumed);
852c2936 885 if (write_offset != cons_offset)
a6352fd4 886 ERRMSG("ring buffer %s, cpu %d: "
852c2936
MD
887 "non-consumed data\n"
888 " [ %lu bytes written, %lu bytes read ]\n",
889 chan->backend.name, cpu, write_offset, cons_offset);
890
a6352fd4 891 for (cons_offset = uatomic_read(&buf->consumed);
852c2936
MD
892 (long) (subbuf_trunc((unsigned long) v_read(config, &buf->offset),
893 chan)
894 - cons_offset) > 0;
895 cons_offset = subbuf_align(cons_offset, chan))
896 lib_ring_buffer_print_subbuffer_errors(buf, chan, cons_offset,
897 cpu);
898}
899
900static
901void lib_ring_buffer_print_errors(struct channel *chan,
902 struct lib_ring_buffer *buf, int cpu)
903{
904 const struct lib_ring_buffer_config *config = chan->backend.config;
905 void *priv = chan->backend.priv;
906
a6352fd4 907 ERRMSG("ring buffer %s, cpu %d: %lu records written, "
852c2936
MD
908 "%lu records overrun\n",
909 chan->backend.name, cpu,
910 v_read(config, &buf->records_count),
911 v_read(config, &buf->records_overrun));
912
913 if (v_read(config, &buf->records_lost_full)
914 || v_read(config, &buf->records_lost_wrap)
915 || v_read(config, &buf->records_lost_big))
a6352fd4 916 ERRMSG("ring buffer %s, cpu %d: records were lost. Caused by:\n"
852c2936
MD
917 " [ %lu buffer full, %lu nest buffer wrap-around, "
918 "%lu event too big ]\n",
919 chan->backend.name, cpu,
920 v_read(config, &buf->records_lost_full),
921 v_read(config, &buf->records_lost_wrap),
922 v_read(config, &buf->records_lost_big));
923
924 lib_ring_buffer_print_buffer_errors(buf, chan, priv, cpu);
925}
926
927/*
928 * lib_ring_buffer_switch_old_start: Populate old subbuffer header.
929 *
930 * Only executed when the buffer is finalized, in SWITCH_FLUSH.
931 */
932static
933void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf,
934 struct channel *chan,
935 struct switch_offsets *offsets,
936 u64 tsc)
937{
938 const struct lib_ring_buffer_config *config = chan->backend.config;
939 unsigned long oldidx = subbuf_index(offsets->old, chan);
940 unsigned long commit_count;
941
942 config->cb.buffer_begin(buf, tsc, oldidx);
943
944 /*
945 * Order all writes to buffer before the commit count update that will
946 * determine that the subbuffer is full.
947 */
a6352fd4 948 cmm_smp_wmb();
852c2936 949 v_add(config, config->cb.subbuffer_header_size(),
a6352fd4
MD
950 &shmp(buf->commit_hot)[oldidx].cc);
951 commit_count = v_read(config, &shmp(buf->commit_hot)[oldidx].cc);
852c2936
MD
952 /* Check if the written buffer has to be delivered */
953 lib_ring_buffer_check_deliver(config, buf, chan, offsets->old,
954 commit_count, oldidx);
955 lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
956 offsets->old, commit_count,
957 config->cb.subbuffer_header_size());
958}
959
960/*
961 * lib_ring_buffer_switch_old_end: switch old subbuffer
962 *
963 * Note : offset_old should never be 0 here. It is ok, because we never perform
964 * buffer switch on an empty subbuffer in SWITCH_ACTIVE mode. The caller
965 * increments the offset_old value when doing a SWITCH_FLUSH on an empty
966 * subbuffer.
967 */
968static
969void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf,
970 struct channel *chan,
971 struct switch_offsets *offsets,
972 u64 tsc)
973{
974 const struct lib_ring_buffer_config *config = chan->backend.config;
975 unsigned long oldidx = subbuf_index(offsets->old - 1, chan);
976 unsigned long commit_count, padding_size, data_size;
977
978 data_size = subbuf_offset(offsets->old - 1, chan) + 1;
979 padding_size = chan->backend.subbuf_size - data_size;
980 subbuffer_set_data_size(config, &buf->backend, oldidx, data_size);
981
982 /*
983 * Order all writes to buffer before the commit count update that will
984 * determine that the subbuffer is full.
985 */
a6352fd4
MD
986 cmm_smp_wmb();
987 v_add(config, padding_size, &shmp(buf->commit_hot)[oldidx].cc);
988 commit_count = v_read(config, &shmp(buf->commit_hot)[oldidx].cc);
852c2936
MD
989 lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1,
990 commit_count, oldidx);
991 lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
992 offsets->old, commit_count,
993 padding_size);
994}
995
996/*
997 * lib_ring_buffer_switch_new_start: Populate new subbuffer.
998 *
999 * This code can be executed unordered : writers may already have written to the
1000 * sub-buffer before this code gets executed, caution. The commit makes sure
1001 * that this code is executed before the deliver of this sub-buffer.
1002 */
1003static
1004void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf,
1005 struct channel *chan,
1006 struct switch_offsets *offsets,
1007 u64 tsc)
1008{
1009 const struct lib_ring_buffer_config *config = chan->backend.config;
1010 unsigned long beginidx = subbuf_index(offsets->begin, chan);
1011 unsigned long commit_count;
1012
1013 config->cb.buffer_begin(buf, tsc, beginidx);
1014
1015 /*
1016 * Order all writes to buffer before the commit count update that will
1017 * determine that the subbuffer is full.
1018 */
a6352fd4 1019 cmm_smp_wmb();
852c2936 1020 v_add(config, config->cb.subbuffer_header_size(),
a6352fd4
MD
1021 &shmp(buf->commit_hot)[beginidx].cc);
1022 commit_count = v_read(config, &shmp(buf->commit_hot)[beginidx].cc);
852c2936
MD
1023 /* Check if the written buffer has to be delivered */
1024 lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin,
1025 commit_count, beginidx);
1026 lib_ring_buffer_write_commit_counter(config, buf, chan, beginidx,
1027 offsets->begin, commit_count,
1028 config->cb.subbuffer_header_size());
1029}
1030
1031/*
1032 * lib_ring_buffer_switch_new_end: finish switching current subbuffer
1033 *
1034 * The only remaining threads could be the ones with pending commits. They will
1035 * have to do the deliver themselves.
1036 */
1037static
1038void lib_ring_buffer_switch_new_end(struct lib_ring_buffer *buf,
1039 struct channel *chan,
1040 struct switch_offsets *offsets,
1041 u64 tsc)
1042{
1043 const struct lib_ring_buffer_config *config = chan->backend.config;
1044 unsigned long endidx = subbuf_index(offsets->end - 1, chan);
1045 unsigned long commit_count, padding_size, data_size;
1046
1047 data_size = subbuf_offset(offsets->end - 1, chan) + 1;
1048 padding_size = chan->backend.subbuf_size - data_size;
1049 subbuffer_set_data_size(config, &buf->backend, endidx, data_size);
1050
1051 /*
1052 * Order all writes to buffer before the commit count update that will
1053 * determine that the subbuffer is full.
1054 */
a6352fd4
MD
1055 cmm_smp_wmb();
1056 v_add(config, padding_size, &shmp(buf->commit_hot)[endidx].cc);
1057 commit_count = v_read(config, &shmp(buf->commit_hot)[endidx].cc);
852c2936
MD
1058 lib_ring_buffer_check_deliver(config, buf, chan, offsets->end - 1,
1059 commit_count, endidx);
1060 lib_ring_buffer_write_commit_counter(config, buf, chan, endidx,
1061 offsets->end, commit_count,
1062 padding_size);
1063}
1064
1065/*
1066 * Returns :
1067 * 0 if ok
1068 * !0 if execution must be aborted.
1069 */
1070static
1071int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
1072 struct lib_ring_buffer *buf,
1073 struct channel *chan,
1074 struct switch_offsets *offsets,
1075 u64 *tsc)
1076{
1077 const struct lib_ring_buffer_config *config = chan->backend.config;
1078 unsigned long off;
1079
1080 offsets->begin = v_read(config, &buf->offset);
1081 offsets->old = offsets->begin;
1082 offsets->switch_old_start = 0;
1083 off = subbuf_offset(offsets->begin, chan);
1084
1085 *tsc = config->cb.ring_buffer_clock_read(chan);
1086
1087 /*
1088 * Ensure we flush the header of an empty subbuffer when doing the
1089 * finalize (SWITCH_FLUSH). This ensures that we end up knowing the
1090 * total data gathering duration even if there were no records saved
1091 * after the last buffer switch.
1092 * In SWITCH_ACTIVE mode, switch the buffer when it contains events.
1093 * SWITCH_ACTIVE only flushes the current subbuffer, dealing with end of
1094 * subbuffer header as appropriate.
1095 * The next record that reserves space will be responsible for
1096 * populating the following subbuffer header. We choose not to populate
1097 * the next subbuffer header here because we want to be able to use
a6352fd4
MD
1098 * SWITCH_ACTIVE for periodical buffer flush, which must
1099 * guarantee that all the buffer content (records and header
1100 * timestamps) are visible to the reader. This is required for
1101 * quiescence guarantees for the fusion merge.
852c2936
MD
1102 */
1103 if (mode == SWITCH_FLUSH || off > 0) {
1104 if (unlikely(off == 0)) {
1105 /*
1106 * The client does not save any header information.
1107 * Don't switch empty subbuffer on finalize, because it
1108 * is invalid to deliver a completely empty subbuffer.
1109 */
1110 if (!config->cb.subbuffer_header_size())
1111 return -1;
1112 /*
1113 * Need to write the subbuffer start header on finalize.
1114 */
1115 offsets->switch_old_start = 1;
1116 }
1117 offsets->begin = subbuf_align(offsets->begin, chan);
1118 } else
1119 return -1; /* we do not have to switch : buffer is empty */
1120 /* Note: old points to the next subbuf at offset 0 */
1121 offsets->end = offsets->begin;
1122 return 0;
1123}
1124
1125/*
1126 * Force a sub-buffer switch. This operation is completely reentrant : can be
1127 * called while tracing is active with absolutely no lock held.
1128 *
1129 * Note, however, that as a v_cmpxchg is used for some atomic
1130 * operations, this function must be called from the CPU which owns the buffer
1131 * for a ACTIVE flush.
1132 */
1133void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode mode)
1134{
a6352fd4 1135 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
1136 const struct lib_ring_buffer_config *config = chan->backend.config;
1137 struct switch_offsets offsets;
1138 unsigned long oldidx;
1139 u64 tsc;
1140
1141 offsets.size = 0;
1142
1143 /*
1144 * Perform retryable operations.
1145 */
1146 do {
1147 if (lib_ring_buffer_try_switch_slow(mode, buf, chan, &offsets,
1148 &tsc))
1149 return; /* Switch not needed */
1150 } while (v_cmpxchg(config, &buf->offset, offsets.old, offsets.end)
1151 != offsets.old);
1152
1153 /*
1154 * Atomically update last_tsc. This update races against concurrent
1155 * atomic updates, but the race will always cause supplementary full TSC
1156 * records, never the opposite (missing a full TSC record when it would
1157 * be needed).
1158 */
1159 save_last_tsc(config, buf, tsc);
1160
1161 /*
1162 * Push the reader if necessary
1163 */
1164 lib_ring_buffer_reserve_push_reader(buf, chan, offsets.old);
1165
1166 oldidx = subbuf_index(offsets.old, chan);
1167 lib_ring_buffer_clear_noref(config, &buf->backend, oldidx);
1168
1169 /*
1170 * May need to populate header start on SWITCH_FLUSH.
1171 */
1172 if (offsets.switch_old_start) {
1173 lib_ring_buffer_switch_old_start(buf, chan, &offsets, tsc);
1174 offsets.old += config->cb.subbuffer_header_size();
1175 }
1176
1177 /*
1178 * Switch old subbuffer.
1179 */
1180 lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc);
1181}
852c2936
MD
1182
1183/*
1184 * Returns :
1185 * 0 if ok
1186 * -ENOSPC if event size is too large for packet.
1187 * -ENOBUFS if there is currently not enough space in buffer for the event.
1188 * -EIO if data cannot be written into the buffer for any other reason.
1189 */
1190static
1191int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf,
1192 struct channel *chan,
1193 struct switch_offsets *offsets,
1194 struct lib_ring_buffer_ctx *ctx)
1195{
1196 const struct lib_ring_buffer_config *config = chan->backend.config;
1197 unsigned long reserve_commit_diff;
1198
1199 offsets->begin = v_read(config, &buf->offset);
1200 offsets->old = offsets->begin;
1201 offsets->switch_new_start = 0;
1202 offsets->switch_new_end = 0;
1203 offsets->switch_old_end = 0;
1204 offsets->pre_header_padding = 0;
1205
1206 ctx->tsc = config->cb.ring_buffer_clock_read(chan);
1207 if ((int64_t) ctx->tsc == -EIO)
1208 return -EIO;
1209
1210 if (last_tsc_overflow(config, buf, ctx->tsc))
1211 ctx->rflags |= RING_BUFFER_RFLAG_FULL_TSC;
1212
1213 if (unlikely(subbuf_offset(offsets->begin, ctx->chan) == 0)) {
1214 offsets->switch_new_start = 1; /* For offsets->begin */
1215 } else {
1216 offsets->size = config->cb.record_header_size(config, chan,
1217 offsets->begin,
1218 &offsets->pre_header_padding,
1219 ctx);
1220 offsets->size +=
1221 lib_ring_buffer_align(offsets->begin + offsets->size,
1222 ctx->largest_align)
1223 + ctx->data_size;
1224 if (unlikely(subbuf_offset(offsets->begin, chan) +
1225 offsets->size > chan->backend.subbuf_size)) {
1226 offsets->switch_old_end = 1; /* For offsets->old */
1227 offsets->switch_new_start = 1; /* For offsets->begin */
1228 }
1229 }
1230 if (unlikely(offsets->switch_new_start)) {
1231 unsigned long sb_index;
1232
1233 /*
1234 * We are typically not filling the previous buffer completely.
1235 */
1236 if (likely(offsets->switch_old_end))
1237 offsets->begin = subbuf_align(offsets->begin, chan);
1238 offsets->begin = offsets->begin
1239 + config->cb.subbuffer_header_size();
1240 /* Test new buffer integrity */
1241 sb_index = subbuf_index(offsets->begin, chan);
1242 reserve_commit_diff =
1243 (buf_trunc(offsets->begin, chan)
1244 >> chan->backend.num_subbuf_order)
1245 - ((unsigned long) v_read(config,
a6352fd4 1246 &shmp(buf->commit_cold)[sb_index].cc_sb)
852c2936
MD
1247 & chan->commit_count_mask);
1248 if (likely(reserve_commit_diff == 0)) {
1249 /* Next subbuffer not being written to. */
1250 if (unlikely(config->mode != RING_BUFFER_OVERWRITE &&
1251 subbuf_trunc(offsets->begin, chan)
1252 - subbuf_trunc((unsigned long)
a6352fd4 1253 uatomic_read(&buf->consumed), chan)
852c2936
MD
1254 >= chan->backend.buf_size)) {
1255 /*
1256 * We do not overwrite non consumed buffers
1257 * and we are full : record is lost.
1258 */
1259 v_inc(config, &buf->records_lost_full);
1260 return -ENOBUFS;
1261 } else {
1262 /*
1263 * Next subbuffer not being written to, and we
1264 * are either in overwrite mode or the buffer is
1265 * not full. It's safe to write in this new
1266 * subbuffer.
1267 */
1268 }
1269 } else {
1270 /*
1271 * Next subbuffer reserve offset does not match the
1272 * commit offset. Drop record in producer-consumer and
1273 * overwrite mode. Caused by either a writer OOPS or too
1274 * many nested writes over a reserve/commit pair.
1275 */
1276 v_inc(config, &buf->records_lost_wrap);
1277 return -EIO;
1278 }
1279 offsets->size =
1280 config->cb.record_header_size(config, chan,
1281 offsets->begin,
1282 &offsets->pre_header_padding,
1283 ctx);
1284 offsets->size +=
1285 lib_ring_buffer_align(offsets->begin + offsets->size,
1286 ctx->largest_align)
1287 + ctx->data_size;
1288 if (unlikely(subbuf_offset(offsets->begin, chan)
1289 + offsets->size > chan->backend.subbuf_size)) {
1290 /*
1291 * Record too big for subbuffers, report error, don't
1292 * complete the sub-buffer switch.
1293 */
1294 v_inc(config, &buf->records_lost_big);
1295 return -ENOSPC;
1296 } else {
1297 /*
1298 * We just made a successful buffer switch and the
1299 * record fits in the new subbuffer. Let's write.
1300 */
1301 }
1302 } else {
1303 /*
1304 * Record fits in the current buffer and we are not on a switch
1305 * boundary. It's safe to write.
1306 */
1307 }
1308 offsets->end = offsets->begin + offsets->size;
1309
1310 if (unlikely(subbuf_offset(offsets->end, chan) == 0)) {
1311 /*
1312 * The offset_end will fall at the very beginning of the next
1313 * subbuffer.
1314 */
1315 offsets->switch_new_end = 1; /* For offsets->begin */
1316 }
1317 return 0;
1318}
1319
1320/**
1321 * lib_ring_buffer_reserve_slow - Atomic slot reservation in a buffer.
1322 * @ctx: ring buffer context.
1323 *
1324 * Return : -NOBUFS if not enough space, -ENOSPC if event size too large,
1325 * -EIO for other errors, else returns 0.
1326 * It will take care of sub-buffer switching.
1327 */
1328int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx)
1329{
1330 struct channel *chan = ctx->chan;
1331 const struct lib_ring_buffer_config *config = chan->backend.config;
1332 struct lib_ring_buffer *buf;
1333 struct switch_offsets offsets;
1334 int ret;
1335
1336 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
a6352fd4 1337 buf = &shmp(chan->backend.buf)[ctx->cpu];
852c2936 1338 else
a6352fd4 1339 buf = shmp(chan->backend.buf);
852c2936
MD
1340 ctx->buf = buf;
1341
1342 offsets.size = 0;
1343
1344 do {
1345 ret = lib_ring_buffer_try_reserve_slow(buf, chan, &offsets,
1346 ctx);
1347 if (unlikely(ret))
1348 return ret;
1349 } while (unlikely(v_cmpxchg(config, &buf->offset, offsets.old,
1350 offsets.end)
1351 != offsets.old));
1352
1353 /*
1354 * Atomically update last_tsc. This update races against concurrent
1355 * atomic updates, but the race will always cause supplementary full TSC
1356 * records, never the opposite (missing a full TSC record when it would
1357 * be needed).
1358 */
1359 save_last_tsc(config, buf, ctx->tsc);
1360
1361 /*
1362 * Push the reader if necessary
1363 */
1364 lib_ring_buffer_reserve_push_reader(buf, chan, offsets.end - 1);
1365
1366 /*
1367 * Clear noref flag for this subbuffer.
1368 */
1369 lib_ring_buffer_clear_noref(config, &buf->backend,
1370 subbuf_index(offsets.end - 1, chan));
1371
1372 /*
1373 * Switch old subbuffer if needed.
1374 */
1375 if (unlikely(offsets.switch_old_end)) {
1376 lib_ring_buffer_clear_noref(config, &buf->backend,
1377 subbuf_index(offsets.old - 1, chan));
1378 lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->tsc);
1379 }
1380
1381 /*
1382 * Populate new subbuffer.
1383 */
1384 if (unlikely(offsets.switch_new_start))
1385 lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->tsc);
1386
1387 if (unlikely(offsets.switch_new_end))
1388 lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->tsc);
1389
1390 ctx->slot_size = offsets.size;
1391 ctx->pre_offset = offsets.begin;
1392 ctx->buf_offset = offsets.begin + offsets.pre_header_padding;
1393 return 0;
1394}
This page took 0.080165 seconds and 4 git commands to generate.