Add ring buffer comment about shm
[lttng-ust.git] / libringbuffer / ring_buffer_frontend.c
CommitLineData
852c2936
MD
1/*
2 * ring_buffer_frontend.c
3 *
4 * (C) Copyright 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 *
6 * Ring buffer wait-free buffer synchronization. Producer-consumer and flight
7 * recorder (overwrite) modes. See thesis:
8 *
9 * Desnoyers, Mathieu (2009), "Low-Impact Operating System Tracing", Ph.D.
10 * dissertation, Ecole Polytechnique de Montreal.
11 * http://www.lttng.org/pub/thesis/desnoyers-dissertation-2009-12.pdf
12 *
13 * - Algorithm presentation in Chapter 5:
14 * "Lockless Multi-Core High-Throughput Buffering".
15 * - Algorithm formal verification in Section 8.6:
16 * "Formal verification of LTTng"
17 *
18 * Author:
19 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
20 *
21 * Inspired from LTT and RelayFS:
22 * Karim Yaghmour <karim@opersys.com>
23 * Tom Zanussi <zanussi@us.ibm.com>
24 * Bob Wisniewski <bob@watson.ibm.com>
25 * And from K42 :
26 * Bob Wisniewski <bob@watson.ibm.com>
27 *
28 * Buffer reader semantic :
29 *
30 * - get_subbuf_size
31 * while buffer is not finalized and empty
32 * - get_subbuf
33 * - if return value != 0, continue
34 * - splice one subbuffer worth of data to a pipe
35 * - splice the data from pipe to disk/network
36 * - put_subbuf
37 *
38 * Dual LGPL v2.1/GPL v2 license.
39 */
40
a6352fd4 41#include <sys/types.h>
431d5cf0
MD
42#include <sys/mman.h>
43#include <sys/stat.h>
44#include <fcntl.h>
14641deb 45#include <urcu/compiler.h>
a6352fd4 46#include <urcu/ref.h>
14641deb 47
a6352fd4 48#include "smp.h"
4931a13e
MD
49#include "config.h"
50#include "backend.h"
51#include "frontend.h"
a6352fd4 52#include "shm.h"
852c2936 53
431d5cf0
MD
54#ifndef max
55#define max(a, b) ((a) > (b) ? (a) : (b))
56#endif
57
2432c3c9
MD
58/*
59 * Use POSIX SHM: shm_open(3) and shm_unlink(3).
60 * close(2) to close the fd returned by shm_open.
61 * shm_unlink releases the shared memory object name.
62 * ftruncate(2) sets the size of the memory object.
63 * mmap/munmap maps the shared memory obj to a virtual address in the
64 * calling process (should be done both in libust and consumer).
65 * See shm_overview(7) for details.
66 * Pass file descriptor returned by shm_open(3) to ltt-sessiond through
67 * a UNIX socket.
68 *
69 * Since we don't need to access the object using its name, we can
70 * immediately shm_unlink(3) it, and only keep the handle with its file
71 * descriptor.
72 */
73
852c2936
MD
/*
 * Offsets and flags describing a sub-buffer switch (internal to this file).
 *
 * The four switch_* members are single-bit flags. The exact meaning of each
 * offset is defined by the switch code that fills this structure in.
 */
struct switch_offsets {
	unsigned long begin;
	unsigned long end;
	unsigned long old;
	size_t pre_header_padding;
	size_t size;
	unsigned int switch_new_start:1;
	unsigned int switch_new_end:1;
	unsigned int switch_old_start:1;
	unsigned int switch_old_end:1;
};
83
/*
 * One counter instance per thread (__thread storage).
 * NOTE(review): presumably the ring buffer nesting depth of the current
 * thread; its users are not in this part of the file -- confirm.
 */
__thread unsigned int lib_ring_buffer_nesting;
852c2936
MD
85
/*
 * Forward declaration: lib_ring_buffer_free() below calls this before its
 * definition appears.
 */
static void lib_ring_buffer_print_errors(struct channel *chan,
					 struct lib_ring_buffer *buf,
					 int cpu);
89
90/*
91 * Must be called under cpu hotplug protection.
92 */
93void lib_ring_buffer_free(struct lib_ring_buffer *buf)
94{
a6352fd4 95 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
96
97 lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu);
431d5cf0
MD
98 /* buf->commit_hot will be freed by shm teardown */
99 /* buf->commit_cold will be freed by shm teardown */
852c2936
MD
100
101 lib_ring_buffer_backend_free(&buf->backend);
102}
103
104/**
105 * lib_ring_buffer_reset - Reset ring buffer to initial values.
106 * @buf: Ring buffer.
107 *
108 * Effectively empty the ring buffer. Should be called when the buffer is not
109 * used for writing. The ring buffer can be opened for reading, but the reader
110 * should not be using the iterator concurrently with reset. The previous
111 * current iterator record is reset.
112 */
113void lib_ring_buffer_reset(struct lib_ring_buffer *buf)
114{
a6352fd4 115 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
116 const struct lib_ring_buffer_config *config = chan->backend.config;
117 unsigned int i;
118
119 /*
120 * Reset iterator first. It will put the subbuffer if it currently holds
121 * it.
122 */
852c2936
MD
123 v_set(config, &buf->offset, 0);
124 for (i = 0; i < chan->backend.num_subbuf; i++) {
a6352fd4
MD
125 v_set(config, &shmp(buf->commit_hot)[i].cc, 0);
126 v_set(config, &shmp(buf->commit_hot)[i].seq, 0);
127 v_set(config, &shmp(buf->commit_cold)[i].cc_sb, 0);
852c2936 128 }
a6352fd4
MD
129 uatomic_set(&buf->consumed, 0);
130 uatomic_set(&buf->record_disabled, 0);
852c2936
MD
131 v_set(config, &buf->last_tsc, 0);
132 lib_ring_buffer_backend_reset(&buf->backend);
133 /* Don't reset number of active readers */
134 v_set(config, &buf->records_lost_full, 0);
135 v_set(config, &buf->records_lost_wrap, 0);
136 v_set(config, &buf->records_lost_big, 0);
137 v_set(config, &buf->records_count, 0);
138 v_set(config, &buf->records_overrun, 0);
139 buf->finalized = 0;
140}
852c2936
MD
141
142/**
143 * channel_reset - Reset channel to initial values.
144 * @chan: Channel.
145 *
146 * Effectively empty the channel. Should be called when the channel is not used
147 * for writing. The channel can be opened for reading, but the reader should not
148 * be using the iterator concurrently with reset. The previous current iterator
149 * record is reset.
150 */
151void channel_reset(struct channel *chan)
152{
153 /*
154 * Reset iterators first. Will put the subbuffer if held for reading.
155 */
a6352fd4 156 uatomic_set(&chan->record_disabled, 0);
852c2936
MD
157 /* Don't reset commit_count_mask, still valid */
158 channel_backend_reset(&chan->backend);
159 /* Don't reset switch/read timer interval */
160 /* Don't reset notifiers and notifier enable bits */
161 /* Don't reset reader reference count */
162}
852c2936
MD
163
164/*
165 * Must be called under cpu hotplug protection.
166 */
167int lib_ring_buffer_create(struct lib_ring_buffer *buf,
a6352fd4
MD
168 struct channel_backend *chanb, int cpu,
169 struct shm_header *shm_header)
852c2936
MD
170{
171 const struct lib_ring_buffer_config *config = chanb->config;
14641deb 172 struct channel *chan = caa_container_of(chanb, struct channel, backend);
852c2936
MD
173 void *priv = chanb->priv;
174 unsigned int num_subbuf;
175 size_t subbuf_header_size;
176 u64 tsc;
177 int ret;
178
179 /* Test for cpu hotplug */
180 if (buf->backend.allocated)
181 return 0;
182
a6352fd4
MD
183 ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend,
184 cpu, shm_header);
852c2936
MD
185 if (ret)
186 return ret;
187
431d5cf0
MD
188 align_shm(shm_header,
189 max(__alignof__(struct commit_counters_hot),
190 __alignof__(struct commit_counters_cold)));
a6352fd4
MD
191 set_shmp(&buf->commit_hot,
192 zalloc_shm(shm_header,
193 sizeof(*buf->commit_hot) * chan->backend.num_subbuf));
194 if (!shmp(buf->commit_hot)) {
852c2936
MD
195 ret = -ENOMEM;
196 goto free_chanbuf;
197 }
198
431d5cf0 199 align_shm(shm_header, __alignof__(struct commit_counters_cold));
a6352fd4
MD
200 set_shmp(&buf->commit_cold,
201 zalloc_shm(shm_header,
202 sizeof(*buf->commit_cold) * chan->backend.num_subbuf));
203 if (!shmp(buf->commit_cold)) {
852c2936
MD
204 ret = -ENOMEM;
205 goto free_commit;
206 }
207
208 num_subbuf = chan->backend.num_subbuf;
a6352fd4 209 //init_waitqueue_head(&buf->read_wait);
852c2936
MD
210
211 /*
212 * Write the subbuffer header for first subbuffer so we know the total
213 * duration of data gathering.
214 */
215 subbuf_header_size = config->cb.subbuffer_header_size();
216 v_set(config, &buf->offset, subbuf_header_size);
a6352fd4
MD
217 subbuffer_id_clear_noref(config, &shmp(buf->backend.buf_wsb)[0].id);
218 tsc = config->cb.ring_buffer_clock_read(shmp(buf->backend.chan));
852c2936 219 config->cb.buffer_begin(buf, tsc, 0);
a6352fd4 220 v_add(config, subbuf_header_size, &shmp(buf->commit_hot)[0].cc);
852c2936
MD
221
222 if (config->cb.buffer_create) {
223 ret = config->cb.buffer_create(buf, priv, cpu, chanb->name);
224 if (ret)
225 goto free_init;
226 }
852c2936 227 buf->backend.allocated = 1;
852c2936
MD
228 return 0;
229
230 /* Error handling */
231free_init:
a6352fd4 232 /* commit_cold will be freed by shm teardown */
852c2936 233free_commit:
a6352fd4 234 /* commit_hot will be freed by shm teardown */
852c2936
MD
235free_chanbuf:
236 lib_ring_buffer_backend_free(&buf->backend);
237 return ret;
238}
239
240static void switch_buffer_timer(unsigned long data)
241{
242 struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data;
a6352fd4 243 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
244 const struct lib_ring_buffer_config *config = chan->backend.config;
245
246 /*
247 * Only flush buffers periodically if readers are active.
248 */
a6352fd4 249 if (uatomic_read(&buf->active_readers))
852c2936
MD
250 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
251
a6352fd4
MD
252 //TODO timers
253 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
254 // mod_timer_pinned(&buf->switch_timer,
255 // jiffies + chan->switch_timer_interval);
256 //else
257 // mod_timer(&buf->switch_timer,
258 // jiffies + chan->switch_timer_interval);
852c2936
MD
259}
260
852c2936
MD
261static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf)
262{
a6352fd4 263 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
264 const struct lib_ring_buffer_config *config = chan->backend.config;
265
266 if (!chan->switch_timer_interval || buf->switch_timer_enabled)
267 return;
a6352fd4
MD
268 //TODO
269 //init_timer(&buf->switch_timer);
270 //buf->switch_timer.function = switch_buffer_timer;
271 //buf->switch_timer.expires = jiffies + chan->switch_timer_interval;
272 //buf->switch_timer.data = (unsigned long)buf;
273 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
274 // add_timer_on(&buf->switch_timer, buf->backend.cpu);
275 //else
276 // add_timer(&buf->switch_timer);
852c2936
MD
277 buf->switch_timer_enabled = 1;
278}
279
852c2936
MD
280static void lib_ring_buffer_stop_switch_timer(struct lib_ring_buffer *buf)
281{
a6352fd4 282 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
283
284 if (!chan->switch_timer_interval || !buf->switch_timer_enabled)
285 return;
286
a6352fd4
MD
287 //TODO
288 //del_timer_sync(&buf->switch_timer);
852c2936
MD
289 buf->switch_timer_enabled = 0;
290}
291
292/*
293 * Polling timer to check the channels for data.
294 */
295static void read_buffer_timer(unsigned long data)
296{
297 struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data;
a6352fd4 298 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
299 const struct lib_ring_buffer_config *config = chan->backend.config;
300
301 CHAN_WARN_ON(chan, !buf->backend.allocated);
302
a6352fd4 303 if (uatomic_read(&buf->active_readers)
852c2936 304 && lib_ring_buffer_poll_deliver(config, buf, chan)) {
a6352fd4
MD
305 //TODO
306 //wake_up_interruptible(&buf->read_wait);
307 //wake_up_interruptible(&chan->read_wait);
852c2936
MD
308 }
309
a6352fd4
MD
310 //TODO
311 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
312 // mod_timer_pinned(&buf->read_timer,
313 // jiffies + chan->read_timer_interval);
314 //else
315 // mod_timer(&buf->read_timer,
316 // jiffies + chan->read_timer_interval);
852c2936
MD
317}
318
852c2936
MD
319static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf)
320{
a6352fd4 321 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
322 const struct lib_ring_buffer_config *config = chan->backend.config;
323
324 if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
325 || !chan->read_timer_interval
326 || buf->read_timer_enabled)
327 return;
328
a6352fd4
MD
329 //TODO
330 //init_timer(&buf->read_timer);
331 //buf->read_timer.function = read_buffer_timer;
332 //buf->read_timer.expires = jiffies + chan->read_timer_interval;
333 //buf->read_timer.data = (unsigned long)buf;
852c2936 334
a6352fd4
MD
335 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
336 // add_timer_on(&buf->read_timer, buf->backend.cpu);
337 //else
338 // add_timer(&buf->read_timer);
852c2936
MD
339 buf->read_timer_enabled = 1;
340}
341
852c2936
MD
342static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf)
343{
a6352fd4 344 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
345 const struct lib_ring_buffer_config *config = chan->backend.config;
346
347 if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
348 || !chan->read_timer_interval
349 || !buf->read_timer_enabled)
350 return;
351
a6352fd4
MD
352 //TODO
353 //del_timer_sync(&buf->read_timer);
852c2936
MD
354 /*
355 * do one more check to catch data that has been written in the last
356 * timer period.
357 */
358 if (lib_ring_buffer_poll_deliver(config, buf, chan)) {
a6352fd4
MD
359 //TODO
360 //wake_up_interruptible(&buf->read_wait);
361 //wake_up_interruptible(&chan->read_wait);
852c2936
MD
362 }
363 buf->read_timer_enabled = 0;
364}
365
852c2936
MD
366static void channel_unregister_notifiers(struct channel *chan)
367{
368 const struct lib_ring_buffer_config *config = chan->backend.config;
369 int cpu;
370
852c2936 371 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
852c2936 372 for_each_possible_cpu(cpu) {
a6352fd4
MD
373 struct lib_ring_buffer *buf = &shmp(chan->backend.buf)[cpu];
374
852c2936
MD
375 lib_ring_buffer_stop_switch_timer(buf);
376 lib_ring_buffer_stop_read_timer(buf);
377 }
852c2936 378 } else {
a6352fd4 379 struct lib_ring_buffer *buf = shmp(chan->backend.buf);
852c2936
MD
380
381 lib_ring_buffer_stop_switch_timer(buf);
382 lib_ring_buffer_stop_read_timer(buf);
383 }
384 channel_backend_unregister_notifiers(&chan->backend);
385}
386
431d5cf0 387static void channel_free(struct shm_handle *handle)
852c2936 388{
431d5cf0
MD
389 struct shm_header *header = handle->header;
390 struct channel *chan = shmp(header->chan);
391 int ret;
392
852c2936 393 channel_backend_free(&chan->backend);
431d5cf0
MD
394 /* chan is freed by shm teardown */
395 ret = munmap(header, header->shm_size);
396 if (ret) {
397 PERROR("umnmap");
398 assert(0);
399 }
400 ret = close(handle->shmfd);
401 if (ret) {
402 PERROR("close");
403 assert(0);
404 }
852c2936
MD
405}
406
407/**
408 * channel_create - Create channel.
409 * @config: ring buffer instance configuration
410 * @name: name of the channel
411 * @priv: ring buffer client private data
412 * @buf_addr: pointer the the beginning of the preallocated buffer contiguous
413 * address mapping. It is used only by RING_BUFFER_STATIC
414 * configuration. It can be set to NULL for other backends.
415 * @subbuf_size: subbuffer size
416 * @num_subbuf: number of subbuffers
417 * @switch_timer_interval: Time interval (in us) to fill sub-buffers with
418 * padding to let readers get those sub-buffers.
419 * Used for live streaming.
420 * @read_timer_interval: Time interval (in us) to wake up pending readers.
431d5cf0
MD
421 * @shmfd: shared memory file descriptor (output, needs to be closed by
422 * the caller)
852c2936
MD
423 *
424 * Holds cpu hotplug.
425 * Returns NULL on failure.
426 */
431d5cf0 427struct shm_handle *channel_create(const struct lib_ring_buffer_config *config,
852c2936
MD
428 const char *name, void *priv, void *buf_addr,
429 size_t subbuf_size,
430 size_t num_subbuf, unsigned int switch_timer_interval,
431d5cf0 431 unsigned int read_timer_interval)
852c2936 432{
431d5cf0 433 int ret, cpu, shmfd;
852c2936 434 struct channel *chan;
431d5cf0 435 size_t shmsize, bufshmsize, bufshmalign;
a6352fd4
MD
436 struct shm_header *shm_header;
437 unsigned long num_subbuf_alloc;
431d5cf0 438 struct shm_handle *handle;
852c2936
MD
439
440 if (lib_ring_buffer_check_config(config, switch_timer_interval,
441 read_timer_interval))
442 return NULL;
443
431d5cf0
MD
444 handle = zmalloc(sizeof(struct shm_handle));
445 if (!handle)
446 return NULL;
447
a6352fd4
MD
448 /* Calculate the shm allocation layout */
449 shmsize = sizeof(struct shm_header);
431d5cf0 450 shmsize += offset_align(shmsize, __alignof__(struct channel));
a6352fd4
MD
451 shmsize += sizeof(struct channel);
452
453 /* Per-cpu buffer size: control (prior to backend) */
431d5cf0 454 shmsize += offset_align(shmsize, __alignof__(struct lib_ring_buffer));
a6352fd4
MD
455 bufshmsize = sizeof(struct lib_ring_buffer);
456 shmsize += bufshmsize * num_possible_cpus();
457
458 /* Per-cpu buffer size: backend */
431d5cf0 459 shmsize += offset_align(shmsize, PAGE_SIZE);
a6352fd4
MD
460 /* num_subbuf + 1 is the worse case */
461 num_subbuf_alloc = num_subbuf + 1;
462 bufshmsize = sizeof(struct lib_ring_buffer_backend_pages *) * num_subbuf_alloc;
431d5cf0
MD
463 bufshmsize += offset_align(bufshmsize, PAGE_SIZE);
464 bufshmsize += subbuf_size * num_subbuf_alloc;
465 bufshmsize += offset_align(bufshmsize, __alignof__(struct lib_ring_buffer_backend_pages));
466 bufshmsize += sizeof(struct lib_ring_buffer_backend_pages) * num_subbuf_alloc;
467 bufshmsize += offset_align(bufshmsize, __alignof__(struct lib_ring_buffer_backend_subbuffer));
a6352fd4 468 bufshmsize += sizeof(struct lib_ring_buffer_backend_subbuffer) * num_subbuf;
431d5cf0 469 bufshmsize += offset_align(bufshmsize, PAGE_SIZE);
a6352fd4
MD
470 shmsize += bufshmsize * num_possible_cpus();
471
472 /* Per-cpu buffer size: control (after backend) */
431d5cf0
MD
473 shmsize += offset_align(shmsize,
474 max(__alignof__(struct commit_counters_hot),
475 __alignof__(struct commit_counters_cold)));
476 bufshmsize = sizeof(struct commit_counters_hot) * num_subbuf;
477 bufshmsize += offset_align(bufshmsize, __alignof__(struct commit_counters_cold));
a6352fd4 478 bufshmsize += sizeof(struct commit_counters_cold) * num_subbuf;
431d5cf0 479 shmsize += bufshmsize * num_possible_cpus();
a6352fd4 480
431d5cf0
MD
481 /*
482 * Allocate shm, and immediately unlink its shm oject, keeping
483 * only the file descriptor as a reference to the object. If it
484 * already exists (caused by short race window during which the
485 * global object exists in a concurrent shm_open), simply retry.
486 */
487 do {
488 shmfd = shm_open("/ust-shm-tmp",
489 O_CREAT | O_EXCL | O_RDWR, 0700);
490 } while (shmfd < 0 && errno == EEXIST);
491 if (shmfd < 0) {
492 PERROR("shm_open");
493 goto error_shm_open;
a6352fd4 494 }
431d5cf0
MD
495 ret = shm_unlink("/ust-shm-tmp");
496 if (ret) {
497 PERROR("shm_unlink");
498 goto error_unlink;
499 }
500 ret = ftruncate(shmfd, shmsize);
501 if (ret) {
502 PERROR("ftruncate");
503 goto error_ftruncate;
a6352fd4 504 }
852c2936 505
431d5cf0
MD
506 shm_header = mmap(NULL, shmsize, PROT_READ | PROT_WRITE,
507 MAP_SHARED, shmfd, 0);
508 if (shm_header == MAP_FAILED) {
509 PERROR("mmap");
510 goto error_mmap;
a6352fd4
MD
511 }
512
513 shm_header->magic = SHM_MAGIC;
514 shm_header->major = SHM_MAJOR;
515 shm_header->major = SHM_MINOR;
516 shm_header->bits_per_long = CAA_BITS_PER_LONG;
517 shm_header->shm_size = shmsize;
518 shm_header->shm_allocated = sizeof(struct shm_header);
519
431d5cf0 520 align_shm(shm_header, __alignof__(struct channel));
a6352fd4
MD
521 chan = zalloc_shm(shm_header, sizeof(struct channel));
522 if (!chan)
523 goto destroy_shmem;
524 set_shmp(shm_header->chan, chan);
525
526 ret = channel_backend_init(&chan->backend, name, config, priv,
527 subbuf_size, num_subbuf, shm_header);
852c2936 528 if (ret)
a6352fd4 529 goto destroy_shmem;
852c2936
MD
530
531 chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order);
a6352fd4
MD
532 //TODO
533 //chan->switch_timer_interval = usecs_to_jiffies(switch_timer_interval);
534 //chan->read_timer_interval = usecs_to_jiffies(read_timer_interval);
a6352fd4
MD
535 //TODO
536 //init_waitqueue_head(&chan->read_wait);
537 //init_waitqueue_head(&chan->hp_wait);
852c2936
MD
538
539 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
852c2936
MD
540 /*
541 * In case of non-hotplug cpu, if the ring-buffer is allocated
542 * in early initcall, it will not be notified of secondary cpus.
543 * In that off case, we need to allocate for all possible cpus.
544 */
852c2936 545 for_each_possible_cpu(cpu) {
a6352fd4 546 struct lib_ring_buffer *buf = &shmp(chan->backend.buf)[cpu];
852c2936
MD
547 lib_ring_buffer_start_switch_timer(buf);
548 lib_ring_buffer_start_read_timer(buf);
852c2936 549 }
852c2936 550 } else {
a6352fd4 551 struct lib_ring_buffer *buf = shmp(chan->backend.buf);
852c2936
MD
552
553 lib_ring_buffer_start_switch_timer(buf);
554 lib_ring_buffer_start_read_timer(buf);
555 }
556
431d5cf0
MD
557 handle->header = shm_header;
558 handle->shmfd = shmfd;
559 return handle;
852c2936 560
a6352fd4 561destroy_shmem:
431d5cf0
MD
562 ret = munmap(shm_header, shmsize);
563 if (ret) {
564 PERROR("umnmap");
565 assert(0);
a6352fd4 566 }
431d5cf0
MD
567error_mmap:
568error_ftruncate:
569error_unlink:
570 ret = close(shmfd);
571 if (ret) {
572 PERROR("close");
573 assert(0);
574 }
575error_shm_open:
576 free(handle);
852c2936
MD
577 return NULL;
578}
852c2936
MD
579
/*
 * channel_release - release a channel shm handle.
 * @handle: shm handle of the channel
 *
 * Thin wrapper around channel_free().
 */
static void channel_release(struct shm_handle *handle)
{
	channel_free(handle);
}
585
586/**
587 * channel_destroy - Finalize, wait for q.s. and destroy channel.
588 * @chan: channel to destroy
589 *
590 * Holds cpu hotplug.
431d5cf0
MD
591 * Call "destroy" callback, finalize channels, decrement the channel
592 * reference count. Note that when readers have completed data
593 * consumption of finalized channels, get_subbuf() will return -ENODATA.
594 * They should release their handle at that point. Returns the private
595 * data pointer.
852c2936 596 */
431d5cf0 597void *channel_destroy(struct shm_handle *handle)
852c2936 598{
431d5cf0
MD
599 struct shm_header *header = handle->header;
600 struct channel *chan = shmp(header->chan);
852c2936
MD
601 const struct lib_ring_buffer_config *config = chan->backend.config;
602 void *priv;
431d5cf0 603 int cpu;
852c2936
MD
604
605 channel_unregister_notifiers(chan);
606
607 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
852c2936 608 for_each_channel_cpu(cpu, chan) {
a6352fd4 609 struct lib_ring_buffer *buf = &shmp(chan->backend.buf)[cpu];
852c2936
MD
610
611 if (config->cb.buffer_finalize)
612 config->cb.buffer_finalize(buf,
613 chan->backend.priv,
614 cpu);
615 if (buf->backend.allocated)
616 lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
617 /*
618 * Perform flush before writing to finalized.
619 */
a6352fd4 620 cmm_smp_wmb();
14641deb 621 CMM_ACCESS_ONCE(buf->finalized) = 1;
a6352fd4 622 //wake_up_interruptible(&buf->read_wait);
852c2936
MD
623 }
624 } else {
a6352fd4 625 struct lib_ring_buffer *buf = shmp(chan->backend.buf);
852c2936
MD
626
627 if (config->cb.buffer_finalize)
628 config->cb.buffer_finalize(buf, chan->backend.priv, -1);
629 if (buf->backend.allocated)
630 lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
631 /*
632 * Perform flush before writing to finalized.
633 */
a6352fd4 634 cmm_smp_wmb();
14641deb 635 CMM_ACCESS_ONCE(buf->finalized) = 1;
a6352fd4 636 //wake_up_interruptible(&buf->read_wait);
852c2936 637 }
14641deb 638 CMM_ACCESS_ONCE(chan->finalized) = 1;
a6352fd4
MD
639 //wake_up_interruptible(&chan->hp_wait);
640 //wake_up_interruptible(&chan->read_wait);
431d5cf0
MD
641 /*
642 * sessiond/consumer are keeping a reference on the shm file
643 * descriptor directly. No need to refcount.
644 */
645 channel_release(handle);
852c2936
MD
646 priv = chan->backend.priv;
647 return priv;
648}
852c2936
MD
649
650struct lib_ring_buffer *channel_get_ring_buffer(
651 const struct lib_ring_buffer_config *config,
652 struct channel *chan, int cpu)
653{
654 if (config->alloc == RING_BUFFER_ALLOC_GLOBAL)
a6352fd4 655 return shmp(chan->backend.buf);
852c2936 656 else
a6352fd4 657 return &shmp(chan->backend.buf)[cpu];
852c2936 658}
852c2936
MD
659
660int lib_ring_buffer_open_read(struct lib_ring_buffer *buf)
661{
a6352fd4 662 struct channel *chan = shmp(buf->backend.chan);
852c2936 663
a6352fd4 664 if (uatomic_cmpxchg(&buf->active_readers, 0, 1) != 0)
852c2936 665 return -EBUSY;
a6352fd4 666 cmm_smp_mb();
852c2936
MD
667 return 0;
668}
852c2936
MD
669
670void lib_ring_buffer_release_read(struct lib_ring_buffer *buf)
671{
a6352fd4 672 struct channel *chan = shmp(buf->backend.chan);
852c2936 673
a6352fd4
MD
674 CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
675 cmm_smp_mb();
676 uatomic_dec(&buf->active_readers);
852c2936
MD
677}
678
679/**
680 * lib_ring_buffer_snapshot - save subbuffer position snapshot (for read)
681 * @buf: ring buffer
682 * @consumed: consumed count indicating the position where to read
683 * @produced: produced count, indicates position when to stop reading
684 *
685 * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no
686 * data to read at consumed position, or 0 if the get operation succeeds.
852c2936
MD
687 */
688
689int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf,
690 unsigned long *consumed, unsigned long *produced)
691{
a6352fd4 692 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
693 const struct lib_ring_buffer_config *config = chan->backend.config;
694 unsigned long consumed_cur, write_offset;
695 int finalized;
696
14641deb 697 finalized = CMM_ACCESS_ONCE(buf->finalized);
852c2936
MD
698 /*
699 * Read finalized before counters.
700 */
a6352fd4
MD
701 cmm_smp_rmb();
702 consumed_cur = uatomic_read(&buf->consumed);
852c2936
MD
703 /*
704 * No need to issue a memory barrier between consumed count read and
705 * write offset read, because consumed count can only change
706 * concurrently in overwrite mode, and we keep a sequence counter
707 * identifier derived from the write offset to check we are getting
708 * the same sub-buffer we are expecting (the sub-buffers are atomically
709 * "tagged" upon writes, tags are checked upon read).
710 */
711 write_offset = v_read(config, &buf->offset);
712
713 /*
714 * Check that we are not about to read the same subbuffer in
715 * which the writer head is.
716 */
717 if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan)
718 == 0)
719 goto nodata;
720
721 *consumed = consumed_cur;
722 *produced = subbuf_trunc(write_offset, chan);
723
724 return 0;
725
726nodata:
727 /*
728 * The memory barriers __wait_event()/wake_up_interruptible() take care
729 * of "raw_spin_is_locked" memory ordering.
730 */
731 if (finalized)
732 return -ENODATA;
852c2936
MD
733 else
734 return -EAGAIN;
735}
852c2936
MD
736
737/**
738 * lib_ring_buffer_put_snapshot - move consumed counter forward
739 * @buf: ring buffer
740 * @consumed_new: new consumed count value
741 */
742void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf,
743 unsigned long consumed_new)
744{
745 struct lib_ring_buffer_backend *bufb = &buf->backend;
a6352fd4 746 struct channel *chan = shmp(bufb->chan);
852c2936
MD
747 unsigned long consumed;
748
a6352fd4 749 CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
852c2936
MD
750
751 /*
752 * Only push the consumed value forward.
753 * If the consumed cmpxchg fails, this is because we have been pushed by
754 * the writer in flight recorder mode.
755 */
a6352fd4 756 consumed = uatomic_read(&buf->consumed);
852c2936 757 while ((long) consumed - (long) consumed_new < 0)
a6352fd4
MD
758 consumed = uatomic_cmpxchg(&buf->consumed, consumed,
759 consumed_new);
852c2936 760}
852c2936
MD
761
762/**
763 * lib_ring_buffer_get_subbuf - get exclusive access to subbuffer for reading
764 * @buf: ring buffer
765 * @consumed: consumed count indicating the position where to read
766 *
767 * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no
768 * data to read at consumed position, or 0 if the get operation succeeds.
852c2936
MD
769 */
770int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf,
771 unsigned long consumed)
772{
a6352fd4 773 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
774 const struct lib_ring_buffer_config *config = chan->backend.config;
775 unsigned long consumed_cur, consumed_idx, commit_count, write_offset;
776 int ret;
777 int finalized;
778
779retry:
14641deb 780 finalized = CMM_ACCESS_ONCE(buf->finalized);
852c2936
MD
781 /*
782 * Read finalized before counters.
783 */
a6352fd4
MD
784 cmm_smp_rmb();
785 consumed_cur = uatomic_read(&buf->consumed);
852c2936 786 consumed_idx = subbuf_index(consumed, chan);
a6352fd4 787 commit_count = v_read(config, &shmp(buf->commit_cold)[consumed_idx].cc_sb);
852c2936
MD
788 /*
789 * Make sure we read the commit count before reading the buffer
790 * data and the write offset. Correct consumed offset ordering
791 * wrt commit count is insured by the use of cmpxchg to update
792 * the consumed offset.
852c2936 793 */
a6352fd4
MD
794 /*
795 * Local rmb to match the remote wmb to read the commit count
796 * before the buffer data and the write offset.
797 */
798 cmm_smp_rmb();
852c2936
MD
799
800 write_offset = v_read(config, &buf->offset);
801
802 /*
803 * Check that the buffer we are getting is after or at consumed_cur
804 * position.
805 */
806 if ((long) subbuf_trunc(consumed, chan)
807 - (long) subbuf_trunc(consumed_cur, chan) < 0)
808 goto nodata;
809
810 /*
811 * Check that the subbuffer we are trying to consume has been
812 * already fully committed.
813 */
814 if (((commit_count - chan->backend.subbuf_size)
815 & chan->commit_count_mask)
816 - (buf_trunc(consumed_cur, chan)
817 >> chan->backend.num_subbuf_order)
818 != 0)
819 goto nodata;
820
821 /*
822 * Check that we are not about to read the same subbuffer in
823 * which the writer head is.
824 */
825 if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan)
826 == 0)
827 goto nodata;
828
829 /*
830 * Failure to get the subbuffer causes a busy-loop retry without going
831 * to a wait queue. These are caused by short-lived race windows where
832 * the writer is getting access to a subbuffer we were trying to get
833 * access to. Also checks that the "consumed" buffer count we are
834 * looking for matches the one contained in the subbuffer id.
835 */
836 ret = update_read_sb_index(config, &buf->backend, &chan->backend,
837 consumed_idx, buf_trunc_val(consumed, chan));
838 if (ret)
839 goto retry;
840 subbuffer_id_clear_noref(config, &buf->backend.buf_rsb.id);
841
842 buf->get_subbuf_consumed = consumed;
843 buf->get_subbuf = 1;
844
845 return 0;
846
847nodata:
848 /*
849 * The memory barriers __wait_event()/wake_up_interruptible() take care
850 * of "raw_spin_is_locked" memory ordering.
851 */
852 if (finalized)
853 return -ENODATA;
852c2936
MD
854 else
855 return -EAGAIN;
856}
852c2936
MD
857
858/**
859 * lib_ring_buffer_put_subbuf - release exclusive subbuffer access
860 * @buf: ring buffer
861 */
862void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf)
863{
864 struct lib_ring_buffer_backend *bufb = &buf->backend;
a6352fd4 865 struct channel *chan = shmp(bufb->chan);
852c2936
MD
866 const struct lib_ring_buffer_config *config = chan->backend.config;
867 unsigned long read_sb_bindex, consumed_idx, consumed;
868
a6352fd4 869 CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
852c2936
MD
870
871 if (!buf->get_subbuf) {
872 /*
873 * Reader puts a subbuffer it did not get.
874 */
875 CHAN_WARN_ON(chan, 1);
876 return;
877 }
878 consumed = buf->get_subbuf_consumed;
879 buf->get_subbuf = 0;
880
881 /*
882 * Clear the records_unread counter. (overruns counter)
883 * Can still be non-zero if a file reader simply grabbed the data
884 * without using iterators.
885 * Can be below zero if an iterator is used on a snapshot more than
886 * once.
887 */
888 read_sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
889 v_add(config, v_read(config,
a6352fd4 890 &shmp(bufb->array)[read_sb_bindex]->records_unread),
852c2936 891 &bufb->records_read);
a6352fd4 892 v_set(config, &shmp(bufb->array)[read_sb_bindex]->records_unread, 0);
852c2936
MD
893 CHAN_WARN_ON(chan, config->mode == RING_BUFFER_OVERWRITE
894 && subbuffer_id_is_noref(config, bufb->buf_rsb.id));
895 subbuffer_id_set_noref(config, &bufb->buf_rsb.id);
896
897 /*
898 * Exchange the reader subbuffer with the one we put in its place in the
899 * writer subbuffer table. Expect the original consumed count. If
900 * update_read_sb_index fails, this is because the writer updated the
901 * subbuffer concurrently. We should therefore keep the subbuffer we
902 * currently have: it has become invalid to try reading this sub-buffer
903 * consumed count value anyway.
904 */
905 consumed_idx = subbuf_index(consumed, chan);
906 update_read_sb_index(config, &buf->backend, &chan->backend,
907 consumed_idx, buf_trunc_val(consumed, chan));
908 /*
909 * update_read_sb_index return value ignored. Don't exchange sub-buffer
910 * if the writer concurrently updated it.
911 */
912}
852c2936
MD
913
914/*
915 * cons_offset is an iterator on all subbuffer offsets between the reader
916 * position and the writer position. (inclusive)
917 */
918static
919void lib_ring_buffer_print_subbuffer_errors(struct lib_ring_buffer *buf,
920 struct channel *chan,
921 unsigned long cons_offset,
922 int cpu)
923{
924 const struct lib_ring_buffer_config *config = chan->backend.config;
925 unsigned long cons_idx, commit_count, commit_count_sb;
926
927 cons_idx = subbuf_index(cons_offset, chan);
a6352fd4
MD
928 commit_count = v_read(config, &shmp(buf->commit_hot)[cons_idx].cc);
929 commit_count_sb = v_read(config, &shmp(buf->commit_cold)[cons_idx].cc_sb);
852c2936
MD
930
931 if (subbuf_offset(commit_count, chan) != 0)
a6352fd4 932 ERRMSG("ring buffer %s, cpu %d: "
852c2936
MD
933 "commit count in subbuffer %lu,\n"
934 "expecting multiples of %lu bytes\n"
935 " [ %lu bytes committed, %lu bytes reader-visible ]\n",
936 chan->backend.name, cpu, cons_idx,
937 chan->backend.subbuf_size,
938 commit_count, commit_count_sb);
939
a6352fd4 940 ERRMSG("ring buffer: %s, cpu %d: %lu bytes committed\n",
852c2936
MD
941 chan->backend.name, cpu, commit_count);
942}
943
944static
945void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf,
946 struct channel *chan,
947 void *priv, int cpu)
948{
949 const struct lib_ring_buffer_config *config = chan->backend.config;
950 unsigned long write_offset, cons_offset;
951
952 /*
953 * Can be called in the error path of allocation when
954 * trans_channel_data is not yet set.
955 */
956 if (!chan)
957 return;
958 /*
959 * No need to order commit_count, write_offset and cons_offset reads
960 * because we execute at teardown when no more writer nor reader
961 * references are left.
962 */
963 write_offset = v_read(config, &buf->offset);
a6352fd4 964 cons_offset = uatomic_read(&buf->consumed);
852c2936 965 if (write_offset != cons_offset)
a6352fd4 966 ERRMSG("ring buffer %s, cpu %d: "
852c2936
MD
967 "non-consumed data\n"
968 " [ %lu bytes written, %lu bytes read ]\n",
969 chan->backend.name, cpu, write_offset, cons_offset);
970
a6352fd4 971 for (cons_offset = uatomic_read(&buf->consumed);
852c2936
MD
972 (long) (subbuf_trunc((unsigned long) v_read(config, &buf->offset),
973 chan)
974 - cons_offset) > 0;
975 cons_offset = subbuf_align(cons_offset, chan))
976 lib_ring_buffer_print_subbuffer_errors(buf, chan, cons_offset,
977 cpu);
978}
979
980static
981void lib_ring_buffer_print_errors(struct channel *chan,
982 struct lib_ring_buffer *buf, int cpu)
983{
984 const struct lib_ring_buffer_config *config = chan->backend.config;
985 void *priv = chan->backend.priv;
986
a6352fd4 987 ERRMSG("ring buffer %s, cpu %d: %lu records written, "
852c2936
MD
988 "%lu records overrun\n",
989 chan->backend.name, cpu,
990 v_read(config, &buf->records_count),
991 v_read(config, &buf->records_overrun));
992
993 if (v_read(config, &buf->records_lost_full)
994 || v_read(config, &buf->records_lost_wrap)
995 || v_read(config, &buf->records_lost_big))
a6352fd4 996 ERRMSG("ring buffer %s, cpu %d: records were lost. Caused by:\n"
852c2936
MD
997 " [ %lu buffer full, %lu nest buffer wrap-around, "
998 "%lu event too big ]\n",
999 chan->backend.name, cpu,
1000 v_read(config, &buf->records_lost_full),
1001 v_read(config, &buf->records_lost_wrap),
1002 v_read(config, &buf->records_lost_big));
1003
1004 lib_ring_buffer_print_buffer_errors(buf, chan, priv, cpu);
1005}
1006
1007/*
1008 * lib_ring_buffer_switch_old_start: Populate old subbuffer header.
1009 *
1010 * Only executed when the buffer is finalized, in SWITCH_FLUSH.
1011 */
1012static
1013void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf,
1014 struct channel *chan,
1015 struct switch_offsets *offsets,
1016 u64 tsc)
1017{
1018 const struct lib_ring_buffer_config *config = chan->backend.config;
1019 unsigned long oldidx = subbuf_index(offsets->old, chan);
1020 unsigned long commit_count;
1021
1022 config->cb.buffer_begin(buf, tsc, oldidx);
1023
1024 /*
1025 * Order all writes to buffer before the commit count update that will
1026 * determine that the subbuffer is full.
1027 */
a6352fd4 1028 cmm_smp_wmb();
852c2936 1029 v_add(config, config->cb.subbuffer_header_size(),
a6352fd4
MD
1030 &shmp(buf->commit_hot)[oldidx].cc);
1031 commit_count = v_read(config, &shmp(buf->commit_hot)[oldidx].cc);
852c2936
MD
1032 /* Check if the written buffer has to be delivered */
1033 lib_ring_buffer_check_deliver(config, buf, chan, offsets->old,
1034 commit_count, oldidx);
1035 lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
1036 offsets->old, commit_count,
1037 config->cb.subbuffer_header_size());
1038}
1039
1040/*
1041 * lib_ring_buffer_switch_old_end: switch old subbuffer
1042 *
1043 * Note : offset_old should never be 0 here. It is ok, because we never perform
1044 * buffer switch on an empty subbuffer in SWITCH_ACTIVE mode. The caller
1045 * increments the offset_old value when doing a SWITCH_FLUSH on an empty
1046 * subbuffer.
1047 */
1048static
1049void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf,
1050 struct channel *chan,
1051 struct switch_offsets *offsets,
1052 u64 tsc)
1053{
1054 const struct lib_ring_buffer_config *config = chan->backend.config;
1055 unsigned long oldidx = subbuf_index(offsets->old - 1, chan);
1056 unsigned long commit_count, padding_size, data_size;
1057
1058 data_size = subbuf_offset(offsets->old - 1, chan) + 1;
1059 padding_size = chan->backend.subbuf_size - data_size;
1060 subbuffer_set_data_size(config, &buf->backend, oldidx, data_size);
1061
1062 /*
1063 * Order all writes to buffer before the commit count update that will
1064 * determine that the subbuffer is full.
1065 */
a6352fd4
MD
1066 cmm_smp_wmb();
1067 v_add(config, padding_size, &shmp(buf->commit_hot)[oldidx].cc);
1068 commit_count = v_read(config, &shmp(buf->commit_hot)[oldidx].cc);
852c2936
MD
1069 lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1,
1070 commit_count, oldidx);
1071 lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
1072 offsets->old, commit_count,
1073 padding_size);
1074}
1075
1076/*
1077 * lib_ring_buffer_switch_new_start: Populate new subbuffer.
1078 *
1079 * This code can be executed unordered : writers may already have written to the
1080 * sub-buffer before this code gets executed, caution. The commit makes sure
1081 * that this code is executed before the deliver of this sub-buffer.
1082 */
1083static
1084void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf,
1085 struct channel *chan,
1086 struct switch_offsets *offsets,
1087 u64 tsc)
1088{
1089 const struct lib_ring_buffer_config *config = chan->backend.config;
1090 unsigned long beginidx = subbuf_index(offsets->begin, chan);
1091 unsigned long commit_count;
1092
1093 config->cb.buffer_begin(buf, tsc, beginidx);
1094
1095 /*
1096 * Order all writes to buffer before the commit count update that will
1097 * determine that the subbuffer is full.
1098 */
a6352fd4 1099 cmm_smp_wmb();
852c2936 1100 v_add(config, config->cb.subbuffer_header_size(),
a6352fd4
MD
1101 &shmp(buf->commit_hot)[beginidx].cc);
1102 commit_count = v_read(config, &shmp(buf->commit_hot)[beginidx].cc);
852c2936
MD
1103 /* Check if the written buffer has to be delivered */
1104 lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin,
1105 commit_count, beginidx);
1106 lib_ring_buffer_write_commit_counter(config, buf, chan, beginidx,
1107 offsets->begin, commit_count,
1108 config->cb.subbuffer_header_size());
1109}
1110
1111/*
1112 * lib_ring_buffer_switch_new_end: finish switching current subbuffer
1113 *
1114 * The only remaining threads could be the ones with pending commits. They will
1115 * have to do the deliver themselves.
1116 */
1117static
1118void lib_ring_buffer_switch_new_end(struct lib_ring_buffer *buf,
1119 struct channel *chan,
1120 struct switch_offsets *offsets,
1121 u64 tsc)
1122{
1123 const struct lib_ring_buffer_config *config = chan->backend.config;
1124 unsigned long endidx = subbuf_index(offsets->end - 1, chan);
1125 unsigned long commit_count, padding_size, data_size;
1126
1127 data_size = subbuf_offset(offsets->end - 1, chan) + 1;
1128 padding_size = chan->backend.subbuf_size - data_size;
1129 subbuffer_set_data_size(config, &buf->backend, endidx, data_size);
1130
1131 /*
1132 * Order all writes to buffer before the commit count update that will
1133 * determine that the subbuffer is full.
1134 */
a6352fd4
MD
1135 cmm_smp_wmb();
1136 v_add(config, padding_size, &shmp(buf->commit_hot)[endidx].cc);
1137 commit_count = v_read(config, &shmp(buf->commit_hot)[endidx].cc);
852c2936
MD
1138 lib_ring_buffer_check_deliver(config, buf, chan, offsets->end - 1,
1139 commit_count, endidx);
1140 lib_ring_buffer_write_commit_counter(config, buf, chan, endidx,
1141 offsets->end, commit_count,
1142 padding_size);
1143}
1144
1145/*
1146 * Returns :
1147 * 0 if ok
1148 * !0 if execution must be aborted.
1149 */
1150static
1151int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
1152 struct lib_ring_buffer *buf,
1153 struct channel *chan,
1154 struct switch_offsets *offsets,
1155 u64 *tsc)
1156{
1157 const struct lib_ring_buffer_config *config = chan->backend.config;
1158 unsigned long off;
1159
1160 offsets->begin = v_read(config, &buf->offset);
1161 offsets->old = offsets->begin;
1162 offsets->switch_old_start = 0;
1163 off = subbuf_offset(offsets->begin, chan);
1164
1165 *tsc = config->cb.ring_buffer_clock_read(chan);
1166
1167 /*
1168 * Ensure we flush the header of an empty subbuffer when doing the
1169 * finalize (SWITCH_FLUSH). This ensures that we end up knowing the
1170 * total data gathering duration even if there were no records saved
1171 * after the last buffer switch.
1172 * In SWITCH_ACTIVE mode, switch the buffer when it contains events.
1173 * SWITCH_ACTIVE only flushes the current subbuffer, dealing with end of
1174 * subbuffer header as appropriate.
1175 * The next record that reserves space will be responsible for
1176 * populating the following subbuffer header. We choose not to populate
1177 * the next subbuffer header here because we want to be able to use
a6352fd4
MD
1178 * SWITCH_ACTIVE for periodical buffer flush, which must
1179 * guarantee that all the buffer content (records and header
1180 * timestamps) are visible to the reader. This is required for
1181 * quiescence guarantees for the fusion merge.
852c2936
MD
1182 */
1183 if (mode == SWITCH_FLUSH || off > 0) {
1184 if (unlikely(off == 0)) {
1185 /*
1186 * The client does not save any header information.
1187 * Don't switch empty subbuffer on finalize, because it
1188 * is invalid to deliver a completely empty subbuffer.
1189 */
1190 if (!config->cb.subbuffer_header_size())
1191 return -1;
1192 /*
1193 * Need to write the subbuffer start header on finalize.
1194 */
1195 offsets->switch_old_start = 1;
1196 }
1197 offsets->begin = subbuf_align(offsets->begin, chan);
1198 } else
1199 return -1; /* we do not have to switch : buffer is empty */
1200 /* Note: old points to the next subbuf at offset 0 */
1201 offsets->end = offsets->begin;
1202 return 0;
1203}
1204
1205/*
1206 * Force a sub-buffer switch. This operation is completely reentrant : can be
1207 * called while tracing is active with absolutely no lock held.
1208 *
1209 * Note, however, that as a v_cmpxchg is used for some atomic
1210 * operations, this function must be called from the CPU which owns the buffer
1211 * for a ACTIVE flush.
1212 */
1213void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode mode)
1214{
a6352fd4 1215 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
1216 const struct lib_ring_buffer_config *config = chan->backend.config;
1217 struct switch_offsets offsets;
1218 unsigned long oldidx;
1219 u64 tsc;
1220
1221 offsets.size = 0;
1222
1223 /*
1224 * Perform retryable operations.
1225 */
1226 do {
1227 if (lib_ring_buffer_try_switch_slow(mode, buf, chan, &offsets,
1228 &tsc))
1229 return; /* Switch not needed */
1230 } while (v_cmpxchg(config, &buf->offset, offsets.old, offsets.end)
1231 != offsets.old);
1232
1233 /*
1234 * Atomically update last_tsc. This update races against concurrent
1235 * atomic updates, but the race will always cause supplementary full TSC
1236 * records, never the opposite (missing a full TSC record when it would
1237 * be needed).
1238 */
1239 save_last_tsc(config, buf, tsc);
1240
1241 /*
1242 * Push the reader if necessary
1243 */
1244 lib_ring_buffer_reserve_push_reader(buf, chan, offsets.old);
1245
1246 oldidx = subbuf_index(offsets.old, chan);
1247 lib_ring_buffer_clear_noref(config, &buf->backend, oldidx);
1248
1249 /*
1250 * May need to populate header start on SWITCH_FLUSH.
1251 */
1252 if (offsets.switch_old_start) {
1253 lib_ring_buffer_switch_old_start(buf, chan, &offsets, tsc);
1254 offsets.old += config->cb.subbuffer_header_size();
1255 }
1256
1257 /*
1258 * Switch old subbuffer.
1259 */
1260 lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc);
1261}
852c2936
MD
1262
1263/*
1264 * Returns :
1265 * 0 if ok
1266 * -ENOSPC if event size is too large for packet.
1267 * -ENOBUFS if there is currently not enough space in buffer for the event.
1268 * -EIO if data cannot be written into the buffer for any other reason.
1269 */
1270static
1271int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf,
1272 struct channel *chan,
1273 struct switch_offsets *offsets,
1274 struct lib_ring_buffer_ctx *ctx)
1275{
1276 const struct lib_ring_buffer_config *config = chan->backend.config;
1277 unsigned long reserve_commit_diff;
1278
1279 offsets->begin = v_read(config, &buf->offset);
1280 offsets->old = offsets->begin;
1281 offsets->switch_new_start = 0;
1282 offsets->switch_new_end = 0;
1283 offsets->switch_old_end = 0;
1284 offsets->pre_header_padding = 0;
1285
1286 ctx->tsc = config->cb.ring_buffer_clock_read(chan);
1287 if ((int64_t) ctx->tsc == -EIO)
1288 return -EIO;
1289
1290 if (last_tsc_overflow(config, buf, ctx->tsc))
1291 ctx->rflags |= RING_BUFFER_RFLAG_FULL_TSC;
1292
1293 if (unlikely(subbuf_offset(offsets->begin, ctx->chan) == 0)) {
1294 offsets->switch_new_start = 1; /* For offsets->begin */
1295 } else {
1296 offsets->size = config->cb.record_header_size(config, chan,
1297 offsets->begin,
1298 &offsets->pre_header_padding,
1299 ctx);
1300 offsets->size +=
1301 lib_ring_buffer_align(offsets->begin + offsets->size,
1302 ctx->largest_align)
1303 + ctx->data_size;
1304 if (unlikely(subbuf_offset(offsets->begin, chan) +
1305 offsets->size > chan->backend.subbuf_size)) {
1306 offsets->switch_old_end = 1; /* For offsets->old */
1307 offsets->switch_new_start = 1; /* For offsets->begin */
1308 }
1309 }
1310 if (unlikely(offsets->switch_new_start)) {
1311 unsigned long sb_index;
1312
1313 /*
1314 * We are typically not filling the previous buffer completely.
1315 */
1316 if (likely(offsets->switch_old_end))
1317 offsets->begin = subbuf_align(offsets->begin, chan);
1318 offsets->begin = offsets->begin
1319 + config->cb.subbuffer_header_size();
1320 /* Test new buffer integrity */
1321 sb_index = subbuf_index(offsets->begin, chan);
1322 reserve_commit_diff =
1323 (buf_trunc(offsets->begin, chan)
1324 >> chan->backend.num_subbuf_order)
1325 - ((unsigned long) v_read(config,
a6352fd4 1326 &shmp(buf->commit_cold)[sb_index].cc_sb)
852c2936
MD
1327 & chan->commit_count_mask);
1328 if (likely(reserve_commit_diff == 0)) {
1329 /* Next subbuffer not being written to. */
1330 if (unlikely(config->mode != RING_BUFFER_OVERWRITE &&
1331 subbuf_trunc(offsets->begin, chan)
1332 - subbuf_trunc((unsigned long)
a6352fd4 1333 uatomic_read(&buf->consumed), chan)
852c2936
MD
1334 >= chan->backend.buf_size)) {
1335 /*
1336 * We do not overwrite non consumed buffers
1337 * and we are full : record is lost.
1338 */
1339 v_inc(config, &buf->records_lost_full);
1340 return -ENOBUFS;
1341 } else {
1342 /*
1343 * Next subbuffer not being written to, and we
1344 * are either in overwrite mode or the buffer is
1345 * not full. It's safe to write in this new
1346 * subbuffer.
1347 */
1348 }
1349 } else {
1350 /*
1351 * Next subbuffer reserve offset does not match the
1352 * commit offset. Drop record in producer-consumer and
1353 * overwrite mode. Caused by either a writer OOPS or too
1354 * many nested writes over a reserve/commit pair.
1355 */
1356 v_inc(config, &buf->records_lost_wrap);
1357 return -EIO;
1358 }
1359 offsets->size =
1360 config->cb.record_header_size(config, chan,
1361 offsets->begin,
1362 &offsets->pre_header_padding,
1363 ctx);
1364 offsets->size +=
1365 lib_ring_buffer_align(offsets->begin + offsets->size,
1366 ctx->largest_align)
1367 + ctx->data_size;
1368 if (unlikely(subbuf_offset(offsets->begin, chan)
1369 + offsets->size > chan->backend.subbuf_size)) {
1370 /*
1371 * Record too big for subbuffers, report error, don't
1372 * complete the sub-buffer switch.
1373 */
1374 v_inc(config, &buf->records_lost_big);
1375 return -ENOSPC;
1376 } else {
1377 /*
1378 * We just made a successful buffer switch and the
1379 * record fits in the new subbuffer. Let's write.
1380 */
1381 }
1382 } else {
1383 /*
1384 * Record fits in the current buffer and we are not on a switch
1385 * boundary. It's safe to write.
1386 */
1387 }
1388 offsets->end = offsets->begin + offsets->size;
1389
1390 if (unlikely(subbuf_offset(offsets->end, chan) == 0)) {
1391 /*
1392 * The offset_end will fall at the very beginning of the next
1393 * subbuffer.
1394 */
1395 offsets->switch_new_end = 1; /* For offsets->begin */
1396 }
1397 return 0;
1398}
1399
1400/**
1401 * lib_ring_buffer_reserve_slow - Atomic slot reservation in a buffer.
1402 * @ctx: ring buffer context.
1403 *
1404 * Return : -NOBUFS if not enough space, -ENOSPC if event size too large,
1405 * -EIO for other errors, else returns 0.
1406 * It will take care of sub-buffer switching.
1407 */
1408int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx)
1409{
1410 struct channel *chan = ctx->chan;
1411 const struct lib_ring_buffer_config *config = chan->backend.config;
1412 struct lib_ring_buffer *buf;
1413 struct switch_offsets offsets;
1414 int ret;
1415
1416 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
a6352fd4 1417 buf = &shmp(chan->backend.buf)[ctx->cpu];
852c2936 1418 else
a6352fd4 1419 buf = shmp(chan->backend.buf);
852c2936
MD
1420 ctx->buf = buf;
1421
1422 offsets.size = 0;
1423
1424 do {
1425 ret = lib_ring_buffer_try_reserve_slow(buf, chan, &offsets,
1426 ctx);
1427 if (unlikely(ret))
1428 return ret;
1429 } while (unlikely(v_cmpxchg(config, &buf->offset, offsets.old,
1430 offsets.end)
1431 != offsets.old));
1432
1433 /*
1434 * Atomically update last_tsc. This update races against concurrent
1435 * atomic updates, but the race will always cause supplementary full TSC
1436 * records, never the opposite (missing a full TSC record when it would
1437 * be needed).
1438 */
1439 save_last_tsc(config, buf, ctx->tsc);
1440
1441 /*
1442 * Push the reader if necessary
1443 */
1444 lib_ring_buffer_reserve_push_reader(buf, chan, offsets.end - 1);
1445
1446 /*
1447 * Clear noref flag for this subbuffer.
1448 */
1449 lib_ring_buffer_clear_noref(config, &buf->backend,
1450 subbuf_index(offsets.end - 1, chan));
1451
1452 /*
1453 * Switch old subbuffer if needed.
1454 */
1455 if (unlikely(offsets.switch_old_end)) {
1456 lib_ring_buffer_clear_noref(config, &buf->backend,
1457 subbuf_index(offsets.old - 1, chan));
1458 lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->tsc);
1459 }
1460
1461 /*
1462 * Populate new subbuffer.
1463 */
1464 if (unlikely(offsets.switch_new_start))
1465 lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->tsc);
1466
1467 if (unlikely(offsets.switch_new_end))
1468 lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->tsc);
1469
1470 ctx->slot_size = offsets.size;
1471 ctx->pre_offset = offsets.begin;
1472 ctx->buf_offset = offsets.begin + offsets.pre_header_padding;
1473 return 0;
1474}
This page took 0.082589 seconds and 4 git commands to generate.