Use shm handle, fix allocation space, take care of alignment
[lttng-ust.git] / libringbuffer / ring_buffer_frontend.c
CommitLineData
852c2936
MD
1/*
2 * ring_buffer_frontend.c
3 *
4 * (C) Copyright 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 *
6 * Ring buffer wait-free buffer synchronization. Producer-consumer and flight
7 * recorder (overwrite) modes. See thesis:
8 *
9 * Desnoyers, Mathieu (2009), "Low-Impact Operating System Tracing", Ph.D.
10 * dissertation, Ecole Polytechnique de Montreal.
11 * http://www.lttng.org/pub/thesis/desnoyers-dissertation-2009-12.pdf
12 *
13 * - Algorithm presentation in Chapter 5:
14 * "Lockless Multi-Core High-Throughput Buffering".
15 * - Algorithm formal verification in Section 8.6:
16 * "Formal verification of LTTng"
17 *
18 * Author:
19 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
20 *
21 * Inspired from LTT and RelayFS:
22 * Karim Yaghmour <karim@opersys.com>
23 * Tom Zanussi <zanussi@us.ibm.com>
24 * Bob Wisniewski <bob@watson.ibm.com>
25 * And from K42 :
26 * Bob Wisniewski <bob@watson.ibm.com>
27 *
28 * Buffer reader semantic :
29 *
30 * - get_subbuf_size
31 * while buffer is not finalized and empty
32 * - get_subbuf
33 * - if return value != 0, continue
34 * - splice one subbuffer worth of data to a pipe
35 * - splice the data from pipe to disk/network
36 * - put_subbuf
37 *
38 * Dual LGPL v2.1/GPL v2 license.
39 */
40
a6352fd4 41#include <sys/types.h>
431d5cf0
MD
42#include <sys/mman.h>
43#include <sys/stat.h>
44#include <fcntl.h>
14641deb 45#include <urcu/compiler.h>
a6352fd4 46#include <urcu/ref.h>
14641deb 47
a6352fd4 48#include "smp.h"
4931a13e
MD
49#include "config.h"
50#include "backend.h"
51#include "frontend.h"
a6352fd4 52#include "shm.h"
852c2936 53
431d5cf0
MD
54#ifndef max
55#define max(a, b) ((a) > (b) ? (a) : (b))
56#endif
57
852c2936
MD
/*
 * Internal bookkeeping for a sub-buffer switch: the offsets involved in
 * the switch and one flag per boundary action to perform.
 */
struct switch_offsets {
	unsigned long begin;
	unsigned long end;
	unsigned long old;
	size_t pre_header_padding;
	size_t size;
	unsigned int switch_new_start:1;
	unsigned int switch_new_end:1;
	unsigned int switch_old_start:1;
	unsigned int switch_old_end:1;
};
67
/*
 * Per-thread counter (one instance per thread via __thread).
 * NOTE(review): presumably tracks ring buffer nesting depth on the
 * current thread; it is not used in this part of the file -- confirm
 * against the fast-path headers.
 */
__thread unsigned int lib_ring_buffer_nesting;

/* Defined later in this file; needed by lib_ring_buffer_free(). */
static
void lib_ring_buffer_print_errors(struct channel *chan,
		struct lib_ring_buffer *buf, int cpu);
73
74/*
75 * Must be called under cpu hotplug protection.
76 */
77void lib_ring_buffer_free(struct lib_ring_buffer *buf)
78{
a6352fd4 79 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
80
81 lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu);
431d5cf0
MD
82 /* buf->commit_hot will be freed by shm teardown */
83 /* buf->commit_cold will be freed by shm teardown */
852c2936
MD
84
85 lib_ring_buffer_backend_free(&buf->backend);
86}
87
88/**
89 * lib_ring_buffer_reset - Reset ring buffer to initial values.
90 * @buf: Ring buffer.
91 *
92 * Effectively empty the ring buffer. Should be called when the buffer is not
93 * used for writing. The ring buffer can be opened for reading, but the reader
94 * should not be using the iterator concurrently with reset. The previous
95 * current iterator record is reset.
96 */
97void lib_ring_buffer_reset(struct lib_ring_buffer *buf)
98{
a6352fd4 99 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
100 const struct lib_ring_buffer_config *config = chan->backend.config;
101 unsigned int i;
102
103 /*
104 * Reset iterator first. It will put the subbuffer if it currently holds
105 * it.
106 */
852c2936
MD
107 v_set(config, &buf->offset, 0);
108 for (i = 0; i < chan->backend.num_subbuf; i++) {
a6352fd4
MD
109 v_set(config, &shmp(buf->commit_hot)[i].cc, 0);
110 v_set(config, &shmp(buf->commit_hot)[i].seq, 0);
111 v_set(config, &shmp(buf->commit_cold)[i].cc_sb, 0);
852c2936 112 }
a6352fd4
MD
113 uatomic_set(&buf->consumed, 0);
114 uatomic_set(&buf->record_disabled, 0);
852c2936
MD
115 v_set(config, &buf->last_tsc, 0);
116 lib_ring_buffer_backend_reset(&buf->backend);
117 /* Don't reset number of active readers */
118 v_set(config, &buf->records_lost_full, 0);
119 v_set(config, &buf->records_lost_wrap, 0);
120 v_set(config, &buf->records_lost_big, 0);
121 v_set(config, &buf->records_count, 0);
122 v_set(config, &buf->records_overrun, 0);
123 buf->finalized = 0;
124}
852c2936
MD
125
126/**
127 * channel_reset - Reset channel to initial values.
128 * @chan: Channel.
129 *
130 * Effectively empty the channel. Should be called when the channel is not used
131 * for writing. The channel can be opened for reading, but the reader should not
132 * be using the iterator concurrently with reset. The previous current iterator
133 * record is reset.
134 */
135void channel_reset(struct channel *chan)
136{
137 /*
138 * Reset iterators first. Will put the subbuffer if held for reading.
139 */
a6352fd4 140 uatomic_set(&chan->record_disabled, 0);
852c2936
MD
141 /* Don't reset commit_count_mask, still valid */
142 channel_backend_reset(&chan->backend);
143 /* Don't reset switch/read timer interval */
144 /* Don't reset notifiers and notifier enable bits */
145 /* Don't reset reader reference count */
146}
852c2936
MD
147
148/*
149 * Must be called under cpu hotplug protection.
150 */
151int lib_ring_buffer_create(struct lib_ring_buffer *buf,
a6352fd4
MD
152 struct channel_backend *chanb, int cpu,
153 struct shm_header *shm_header)
852c2936
MD
154{
155 const struct lib_ring_buffer_config *config = chanb->config;
14641deb 156 struct channel *chan = caa_container_of(chanb, struct channel, backend);
852c2936
MD
157 void *priv = chanb->priv;
158 unsigned int num_subbuf;
159 size_t subbuf_header_size;
160 u64 tsc;
161 int ret;
162
163 /* Test for cpu hotplug */
164 if (buf->backend.allocated)
165 return 0;
166
a6352fd4
MD
167 ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend,
168 cpu, shm_header);
852c2936
MD
169 if (ret)
170 return ret;
171
431d5cf0
MD
172 align_shm(shm_header,
173 max(__alignof__(struct commit_counters_hot),
174 __alignof__(struct commit_counters_cold)));
a6352fd4
MD
175 set_shmp(&buf->commit_hot,
176 zalloc_shm(shm_header,
177 sizeof(*buf->commit_hot) * chan->backend.num_subbuf));
178 if (!shmp(buf->commit_hot)) {
852c2936
MD
179 ret = -ENOMEM;
180 goto free_chanbuf;
181 }
182
431d5cf0 183 align_shm(shm_header, __alignof__(struct commit_counters_cold));
a6352fd4
MD
184 set_shmp(&buf->commit_cold,
185 zalloc_shm(shm_header,
186 sizeof(*buf->commit_cold) * chan->backend.num_subbuf));
187 if (!shmp(buf->commit_cold)) {
852c2936
MD
188 ret = -ENOMEM;
189 goto free_commit;
190 }
191
192 num_subbuf = chan->backend.num_subbuf;
a6352fd4 193 //init_waitqueue_head(&buf->read_wait);
852c2936
MD
194
195 /*
196 * Write the subbuffer header for first subbuffer so we know the total
197 * duration of data gathering.
198 */
199 subbuf_header_size = config->cb.subbuffer_header_size();
200 v_set(config, &buf->offset, subbuf_header_size);
a6352fd4
MD
201 subbuffer_id_clear_noref(config, &shmp(buf->backend.buf_wsb)[0].id);
202 tsc = config->cb.ring_buffer_clock_read(shmp(buf->backend.chan));
852c2936 203 config->cb.buffer_begin(buf, tsc, 0);
a6352fd4 204 v_add(config, subbuf_header_size, &shmp(buf->commit_hot)[0].cc);
852c2936
MD
205
206 if (config->cb.buffer_create) {
207 ret = config->cb.buffer_create(buf, priv, cpu, chanb->name);
208 if (ret)
209 goto free_init;
210 }
852c2936 211 buf->backend.allocated = 1;
852c2936
MD
212 return 0;
213
214 /* Error handling */
215free_init:
a6352fd4 216 /* commit_cold will be freed by shm teardown */
852c2936 217free_commit:
a6352fd4 218 /* commit_hot will be freed by shm teardown */
852c2936
MD
219free_chanbuf:
220 lib_ring_buffer_backend_free(&buf->backend);
221 return ret;
222}
223
224static void switch_buffer_timer(unsigned long data)
225{
226 struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data;
a6352fd4 227 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
228 const struct lib_ring_buffer_config *config = chan->backend.config;
229
230 /*
231 * Only flush buffers periodically if readers are active.
232 */
a6352fd4 233 if (uatomic_read(&buf->active_readers))
852c2936
MD
234 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
235
a6352fd4
MD
236 //TODO timers
237 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
238 // mod_timer_pinned(&buf->switch_timer,
239 // jiffies + chan->switch_timer_interval);
240 //else
241 // mod_timer(&buf->switch_timer,
242 // jiffies + chan->switch_timer_interval);
852c2936
MD
243}
244
852c2936
MD
245static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf)
246{
a6352fd4 247 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
248 const struct lib_ring_buffer_config *config = chan->backend.config;
249
250 if (!chan->switch_timer_interval || buf->switch_timer_enabled)
251 return;
a6352fd4
MD
252 //TODO
253 //init_timer(&buf->switch_timer);
254 //buf->switch_timer.function = switch_buffer_timer;
255 //buf->switch_timer.expires = jiffies + chan->switch_timer_interval;
256 //buf->switch_timer.data = (unsigned long)buf;
257 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
258 // add_timer_on(&buf->switch_timer, buf->backend.cpu);
259 //else
260 // add_timer(&buf->switch_timer);
852c2936
MD
261 buf->switch_timer_enabled = 1;
262}
263
852c2936
MD
264static void lib_ring_buffer_stop_switch_timer(struct lib_ring_buffer *buf)
265{
a6352fd4 266 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
267
268 if (!chan->switch_timer_interval || !buf->switch_timer_enabled)
269 return;
270
a6352fd4
MD
271 //TODO
272 //del_timer_sync(&buf->switch_timer);
852c2936
MD
273 buf->switch_timer_enabled = 0;
274}
275
276/*
277 * Polling timer to check the channels for data.
278 */
279static void read_buffer_timer(unsigned long data)
280{
281 struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data;
a6352fd4 282 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
283 const struct lib_ring_buffer_config *config = chan->backend.config;
284
285 CHAN_WARN_ON(chan, !buf->backend.allocated);
286
a6352fd4 287 if (uatomic_read(&buf->active_readers)
852c2936 288 && lib_ring_buffer_poll_deliver(config, buf, chan)) {
a6352fd4
MD
289 //TODO
290 //wake_up_interruptible(&buf->read_wait);
291 //wake_up_interruptible(&chan->read_wait);
852c2936
MD
292 }
293
a6352fd4
MD
294 //TODO
295 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
296 // mod_timer_pinned(&buf->read_timer,
297 // jiffies + chan->read_timer_interval);
298 //else
299 // mod_timer(&buf->read_timer,
300 // jiffies + chan->read_timer_interval);
852c2936
MD
301}
302
852c2936
MD
303static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf)
304{
a6352fd4 305 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
306 const struct lib_ring_buffer_config *config = chan->backend.config;
307
308 if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
309 || !chan->read_timer_interval
310 || buf->read_timer_enabled)
311 return;
312
a6352fd4
MD
313 //TODO
314 //init_timer(&buf->read_timer);
315 //buf->read_timer.function = read_buffer_timer;
316 //buf->read_timer.expires = jiffies + chan->read_timer_interval;
317 //buf->read_timer.data = (unsigned long)buf;
852c2936 318
a6352fd4
MD
319 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
320 // add_timer_on(&buf->read_timer, buf->backend.cpu);
321 //else
322 // add_timer(&buf->read_timer);
852c2936
MD
323 buf->read_timer_enabled = 1;
324}
325
852c2936
MD
326static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf)
327{
a6352fd4 328 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
329 const struct lib_ring_buffer_config *config = chan->backend.config;
330
331 if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
332 || !chan->read_timer_interval
333 || !buf->read_timer_enabled)
334 return;
335
a6352fd4
MD
336 //TODO
337 //del_timer_sync(&buf->read_timer);
852c2936
MD
338 /*
339 * do one more check to catch data that has been written in the last
340 * timer period.
341 */
342 if (lib_ring_buffer_poll_deliver(config, buf, chan)) {
a6352fd4
MD
343 //TODO
344 //wake_up_interruptible(&buf->read_wait);
345 //wake_up_interruptible(&chan->read_wait);
852c2936
MD
346 }
347 buf->read_timer_enabled = 0;
348}
349
852c2936
MD
350static void channel_unregister_notifiers(struct channel *chan)
351{
352 const struct lib_ring_buffer_config *config = chan->backend.config;
353 int cpu;
354
852c2936 355 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
852c2936 356 for_each_possible_cpu(cpu) {
a6352fd4
MD
357 struct lib_ring_buffer *buf = &shmp(chan->backend.buf)[cpu];
358
852c2936
MD
359 lib_ring_buffer_stop_switch_timer(buf);
360 lib_ring_buffer_stop_read_timer(buf);
361 }
852c2936 362 } else {
a6352fd4 363 struct lib_ring_buffer *buf = shmp(chan->backend.buf);
852c2936
MD
364
365 lib_ring_buffer_stop_switch_timer(buf);
366 lib_ring_buffer_stop_read_timer(buf);
367 }
368 channel_backend_unregister_notifiers(&chan->backend);
369}
370
431d5cf0 371static void channel_free(struct shm_handle *handle)
852c2936 372{
431d5cf0
MD
373 struct shm_header *header = handle->header;
374 struct channel *chan = shmp(header->chan);
375 int ret;
376
852c2936 377 channel_backend_free(&chan->backend);
431d5cf0
MD
378 /* chan is freed by shm teardown */
379 ret = munmap(header, header->shm_size);
380 if (ret) {
381 PERROR("umnmap");
382 assert(0);
383 }
384 ret = close(handle->shmfd);
385 if (ret) {
386 PERROR("close");
387 assert(0);
388 }
852c2936
MD
389}
390
391/**
392 * channel_create - Create channel.
393 * @config: ring buffer instance configuration
394 * @name: name of the channel
395 * @priv: ring buffer client private data
396 * @buf_addr: pointer the the beginning of the preallocated buffer contiguous
397 * address mapping. It is used only by RING_BUFFER_STATIC
398 * configuration. It can be set to NULL for other backends.
399 * @subbuf_size: subbuffer size
400 * @num_subbuf: number of subbuffers
401 * @switch_timer_interval: Time interval (in us) to fill sub-buffers with
402 * padding to let readers get those sub-buffers.
403 * Used for live streaming.
404 * @read_timer_interval: Time interval (in us) to wake up pending readers.
431d5cf0
MD
405 * @shmfd: shared memory file descriptor (output, needs to be closed by
406 * the caller)
852c2936
MD
407 *
408 * Holds cpu hotplug.
409 * Returns NULL on failure.
410 */
431d5cf0 411struct shm_handle *channel_create(const struct lib_ring_buffer_config *config,
852c2936
MD
412 const char *name, void *priv, void *buf_addr,
413 size_t subbuf_size,
414 size_t num_subbuf, unsigned int switch_timer_interval,
431d5cf0 415 unsigned int read_timer_interval)
852c2936 416{
431d5cf0 417 int ret, cpu, shmfd;
852c2936 418 struct channel *chan;
431d5cf0 419 size_t shmsize, bufshmsize, bufshmalign;
a6352fd4
MD
420 struct shm_header *shm_header;
421 unsigned long num_subbuf_alloc;
431d5cf0 422 struct shm_handle *handle;
852c2936
MD
423
424 if (lib_ring_buffer_check_config(config, switch_timer_interval,
425 read_timer_interval))
426 return NULL;
427
431d5cf0
MD
428 handle = zmalloc(sizeof(struct shm_handle));
429 if (!handle)
430 return NULL;
431
a6352fd4
MD
432 /* Calculate the shm allocation layout */
433 shmsize = sizeof(struct shm_header);
431d5cf0 434 shmsize += offset_align(shmsize, __alignof__(struct channel));
a6352fd4
MD
435 shmsize += sizeof(struct channel);
436
437 /* Per-cpu buffer size: control (prior to backend) */
431d5cf0 438 shmsize += offset_align(shmsize, __alignof__(struct lib_ring_buffer));
a6352fd4
MD
439 bufshmsize = sizeof(struct lib_ring_buffer);
440 shmsize += bufshmsize * num_possible_cpus();
441
442 /* Per-cpu buffer size: backend */
431d5cf0 443 shmsize += offset_align(shmsize, PAGE_SIZE);
a6352fd4
MD
444 /* num_subbuf + 1 is the worse case */
445 num_subbuf_alloc = num_subbuf + 1;
446 bufshmsize = sizeof(struct lib_ring_buffer_backend_pages *) * num_subbuf_alloc;
431d5cf0
MD
447 bufshmsize += offset_align(bufshmsize, PAGE_SIZE);
448 bufshmsize += subbuf_size * num_subbuf_alloc;
449 bufshmsize += offset_align(bufshmsize, __alignof__(struct lib_ring_buffer_backend_pages));
450 bufshmsize += sizeof(struct lib_ring_buffer_backend_pages) * num_subbuf_alloc;
451 bufshmsize += offset_align(bufshmsize, __alignof__(struct lib_ring_buffer_backend_subbuffer));
a6352fd4 452 bufshmsize += sizeof(struct lib_ring_buffer_backend_subbuffer) * num_subbuf;
431d5cf0 453 bufshmsize += offset_align(bufshmsize, PAGE_SIZE);
a6352fd4
MD
454 shmsize += bufshmsize * num_possible_cpus();
455
456 /* Per-cpu buffer size: control (after backend) */
431d5cf0
MD
457 shmsize += offset_align(shmsize,
458 max(__alignof__(struct commit_counters_hot),
459 __alignof__(struct commit_counters_cold)));
460 bufshmsize = sizeof(struct commit_counters_hot) * num_subbuf;
461 bufshmsize += offset_align(bufshmsize, __alignof__(struct commit_counters_cold));
a6352fd4 462 bufshmsize += sizeof(struct commit_counters_cold) * num_subbuf;
431d5cf0 463 shmsize += bufshmsize * num_possible_cpus();
a6352fd4 464
431d5cf0
MD
465 /*
466 * Allocate shm, and immediately unlink its shm oject, keeping
467 * only the file descriptor as a reference to the object. If it
468 * already exists (caused by short race window during which the
469 * global object exists in a concurrent shm_open), simply retry.
470 */
471 do {
472 shmfd = shm_open("/ust-shm-tmp",
473 O_CREAT | O_EXCL | O_RDWR, 0700);
474 } while (shmfd < 0 && errno == EEXIST);
475 if (shmfd < 0) {
476 PERROR("shm_open");
477 goto error_shm_open;
a6352fd4 478 }
431d5cf0
MD
479 ret = shm_unlink("/ust-shm-tmp");
480 if (ret) {
481 PERROR("shm_unlink");
482 goto error_unlink;
483 }
484 ret = ftruncate(shmfd, shmsize);
485 if (ret) {
486 PERROR("ftruncate");
487 goto error_ftruncate;
a6352fd4 488 }
852c2936 489
431d5cf0
MD
490 shm_header = mmap(NULL, shmsize, PROT_READ | PROT_WRITE,
491 MAP_SHARED, shmfd, 0);
492 if (shm_header == MAP_FAILED) {
493 PERROR("mmap");
494 goto error_mmap;
a6352fd4
MD
495 }
496
497 shm_header->magic = SHM_MAGIC;
498 shm_header->major = SHM_MAJOR;
499 shm_header->major = SHM_MINOR;
500 shm_header->bits_per_long = CAA_BITS_PER_LONG;
501 shm_header->shm_size = shmsize;
502 shm_header->shm_allocated = sizeof(struct shm_header);
503
431d5cf0 504 align_shm(shm_header, __alignof__(struct channel));
a6352fd4
MD
505 chan = zalloc_shm(shm_header, sizeof(struct channel));
506 if (!chan)
507 goto destroy_shmem;
508 set_shmp(shm_header->chan, chan);
509
510 ret = channel_backend_init(&chan->backend, name, config, priv,
511 subbuf_size, num_subbuf, shm_header);
852c2936 512 if (ret)
a6352fd4 513 goto destroy_shmem;
852c2936
MD
514
515 chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order);
a6352fd4
MD
516 //TODO
517 //chan->switch_timer_interval = usecs_to_jiffies(switch_timer_interval);
518 //chan->read_timer_interval = usecs_to_jiffies(read_timer_interval);
a6352fd4
MD
519 //TODO
520 //init_waitqueue_head(&chan->read_wait);
521 //init_waitqueue_head(&chan->hp_wait);
852c2936
MD
522
523 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
852c2936
MD
524 /*
525 * In case of non-hotplug cpu, if the ring-buffer is allocated
526 * in early initcall, it will not be notified of secondary cpus.
527 * In that off case, we need to allocate for all possible cpus.
528 */
852c2936 529 for_each_possible_cpu(cpu) {
a6352fd4 530 struct lib_ring_buffer *buf = &shmp(chan->backend.buf)[cpu];
852c2936
MD
531 lib_ring_buffer_start_switch_timer(buf);
532 lib_ring_buffer_start_read_timer(buf);
852c2936 533 }
852c2936 534 } else {
a6352fd4 535 struct lib_ring_buffer *buf = shmp(chan->backend.buf);
852c2936
MD
536
537 lib_ring_buffer_start_switch_timer(buf);
538 lib_ring_buffer_start_read_timer(buf);
539 }
540
431d5cf0
MD
541 handle->header = shm_header;
542 handle->shmfd = shmfd;
543 return handle;
852c2936 544
a6352fd4 545destroy_shmem:
431d5cf0
MD
546 ret = munmap(shm_header, shmsize);
547 if (ret) {
548 PERROR("umnmap");
549 assert(0);
a6352fd4 550 }
431d5cf0
MD
551error_mmap:
552error_ftruncate:
553error_unlink:
554 ret = close(shmfd);
555 if (ret) {
556 PERROR("close");
557 assert(0);
558 }
559error_shm_open:
560 free(handle);
852c2936
MD
561 return NULL;
562}
852c2936
MD
563
/*
 * Release a channel: thin wrapper that frees the channel and its shm
 * through channel_free().
 */
static
void channel_release(struct shm_handle *handle)
{
	channel_free(handle);
}
569
570/**
571 * channel_destroy - Finalize, wait for q.s. and destroy channel.
572 * @chan: channel to destroy
573 *
574 * Holds cpu hotplug.
431d5cf0
MD
575 * Call "destroy" callback, finalize channels, decrement the channel
576 * reference count. Note that when readers have completed data
577 * consumption of finalized channels, get_subbuf() will return -ENODATA.
578 * They should release their handle at that point. Returns the private
579 * data pointer.
852c2936 580 */
431d5cf0 581void *channel_destroy(struct shm_handle *handle)
852c2936 582{
431d5cf0
MD
583 struct shm_header *header = handle->header;
584 struct channel *chan = shmp(header->chan);
852c2936
MD
585 const struct lib_ring_buffer_config *config = chan->backend.config;
586 void *priv;
431d5cf0 587 int cpu;
852c2936
MD
588
589 channel_unregister_notifiers(chan);
590
591 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
852c2936 592 for_each_channel_cpu(cpu, chan) {
a6352fd4 593 struct lib_ring_buffer *buf = &shmp(chan->backend.buf)[cpu];
852c2936
MD
594
595 if (config->cb.buffer_finalize)
596 config->cb.buffer_finalize(buf,
597 chan->backend.priv,
598 cpu);
599 if (buf->backend.allocated)
600 lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
601 /*
602 * Perform flush before writing to finalized.
603 */
a6352fd4 604 cmm_smp_wmb();
14641deb 605 CMM_ACCESS_ONCE(buf->finalized) = 1;
a6352fd4 606 //wake_up_interruptible(&buf->read_wait);
852c2936
MD
607 }
608 } else {
a6352fd4 609 struct lib_ring_buffer *buf = shmp(chan->backend.buf);
852c2936
MD
610
611 if (config->cb.buffer_finalize)
612 config->cb.buffer_finalize(buf, chan->backend.priv, -1);
613 if (buf->backend.allocated)
614 lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
615 /*
616 * Perform flush before writing to finalized.
617 */
a6352fd4 618 cmm_smp_wmb();
14641deb 619 CMM_ACCESS_ONCE(buf->finalized) = 1;
a6352fd4 620 //wake_up_interruptible(&buf->read_wait);
852c2936 621 }
14641deb 622 CMM_ACCESS_ONCE(chan->finalized) = 1;
a6352fd4
MD
623 //wake_up_interruptible(&chan->hp_wait);
624 //wake_up_interruptible(&chan->read_wait);
431d5cf0
MD
625 /*
626 * sessiond/consumer are keeping a reference on the shm file
627 * descriptor directly. No need to refcount.
628 */
629 channel_release(handle);
852c2936
MD
630 priv = chan->backend.priv;
631 return priv;
632}
852c2936
MD
633
634struct lib_ring_buffer *channel_get_ring_buffer(
635 const struct lib_ring_buffer_config *config,
636 struct channel *chan, int cpu)
637{
638 if (config->alloc == RING_BUFFER_ALLOC_GLOBAL)
a6352fd4 639 return shmp(chan->backend.buf);
852c2936 640 else
a6352fd4 641 return &shmp(chan->backend.buf)[cpu];
852c2936 642}
852c2936
MD
643
644int lib_ring_buffer_open_read(struct lib_ring_buffer *buf)
645{
a6352fd4 646 struct channel *chan = shmp(buf->backend.chan);
852c2936 647
a6352fd4 648 if (uatomic_cmpxchg(&buf->active_readers, 0, 1) != 0)
852c2936 649 return -EBUSY;
a6352fd4 650 cmm_smp_mb();
852c2936
MD
651 return 0;
652}
852c2936
MD
653
654void lib_ring_buffer_release_read(struct lib_ring_buffer *buf)
655{
a6352fd4 656 struct channel *chan = shmp(buf->backend.chan);
852c2936 657
a6352fd4
MD
658 CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
659 cmm_smp_mb();
660 uatomic_dec(&buf->active_readers);
852c2936
MD
661}
662
663/**
664 * lib_ring_buffer_snapshot - save subbuffer position snapshot (for read)
665 * @buf: ring buffer
666 * @consumed: consumed count indicating the position where to read
667 * @produced: produced count, indicates position when to stop reading
668 *
669 * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no
670 * data to read at consumed position, or 0 if the get operation succeeds.
852c2936
MD
671 */
672
673int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf,
674 unsigned long *consumed, unsigned long *produced)
675{
a6352fd4 676 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
677 const struct lib_ring_buffer_config *config = chan->backend.config;
678 unsigned long consumed_cur, write_offset;
679 int finalized;
680
14641deb 681 finalized = CMM_ACCESS_ONCE(buf->finalized);
852c2936
MD
682 /*
683 * Read finalized before counters.
684 */
a6352fd4
MD
685 cmm_smp_rmb();
686 consumed_cur = uatomic_read(&buf->consumed);
852c2936
MD
687 /*
688 * No need to issue a memory barrier between consumed count read and
689 * write offset read, because consumed count can only change
690 * concurrently in overwrite mode, and we keep a sequence counter
691 * identifier derived from the write offset to check we are getting
692 * the same sub-buffer we are expecting (the sub-buffers are atomically
693 * "tagged" upon writes, tags are checked upon read).
694 */
695 write_offset = v_read(config, &buf->offset);
696
697 /*
698 * Check that we are not about to read the same subbuffer in
699 * which the writer head is.
700 */
701 if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan)
702 == 0)
703 goto nodata;
704
705 *consumed = consumed_cur;
706 *produced = subbuf_trunc(write_offset, chan);
707
708 return 0;
709
710nodata:
711 /*
712 * The memory barriers __wait_event()/wake_up_interruptible() take care
713 * of "raw_spin_is_locked" memory ordering.
714 */
715 if (finalized)
716 return -ENODATA;
852c2936
MD
717 else
718 return -EAGAIN;
719}
852c2936
MD
720
721/**
722 * lib_ring_buffer_put_snapshot - move consumed counter forward
723 * @buf: ring buffer
724 * @consumed_new: new consumed count value
725 */
726void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf,
727 unsigned long consumed_new)
728{
729 struct lib_ring_buffer_backend *bufb = &buf->backend;
a6352fd4 730 struct channel *chan = shmp(bufb->chan);
852c2936
MD
731 unsigned long consumed;
732
a6352fd4 733 CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
852c2936
MD
734
735 /*
736 * Only push the consumed value forward.
737 * If the consumed cmpxchg fails, this is because we have been pushed by
738 * the writer in flight recorder mode.
739 */
a6352fd4 740 consumed = uatomic_read(&buf->consumed);
852c2936 741 while ((long) consumed - (long) consumed_new < 0)
a6352fd4
MD
742 consumed = uatomic_cmpxchg(&buf->consumed, consumed,
743 consumed_new);
852c2936 744}
852c2936
MD
745
746/**
747 * lib_ring_buffer_get_subbuf - get exclusive access to subbuffer for reading
748 * @buf: ring buffer
749 * @consumed: consumed count indicating the position where to read
750 *
751 * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no
752 * data to read at consumed position, or 0 if the get operation succeeds.
852c2936
MD
753 */
754int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf,
755 unsigned long consumed)
756{
a6352fd4 757 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
758 const struct lib_ring_buffer_config *config = chan->backend.config;
759 unsigned long consumed_cur, consumed_idx, commit_count, write_offset;
760 int ret;
761 int finalized;
762
763retry:
14641deb 764 finalized = CMM_ACCESS_ONCE(buf->finalized);
852c2936
MD
765 /*
766 * Read finalized before counters.
767 */
a6352fd4
MD
768 cmm_smp_rmb();
769 consumed_cur = uatomic_read(&buf->consumed);
852c2936 770 consumed_idx = subbuf_index(consumed, chan);
a6352fd4 771 commit_count = v_read(config, &shmp(buf->commit_cold)[consumed_idx].cc_sb);
852c2936
MD
772 /*
773 * Make sure we read the commit count before reading the buffer
774 * data and the write offset. Correct consumed offset ordering
775 * wrt commit count is insured by the use of cmpxchg to update
776 * the consumed offset.
852c2936 777 */
a6352fd4
MD
778 /*
779 * Local rmb to match the remote wmb to read the commit count
780 * before the buffer data and the write offset.
781 */
782 cmm_smp_rmb();
852c2936
MD
783
784 write_offset = v_read(config, &buf->offset);
785
786 /*
787 * Check that the buffer we are getting is after or at consumed_cur
788 * position.
789 */
790 if ((long) subbuf_trunc(consumed, chan)
791 - (long) subbuf_trunc(consumed_cur, chan) < 0)
792 goto nodata;
793
794 /*
795 * Check that the subbuffer we are trying to consume has been
796 * already fully committed.
797 */
798 if (((commit_count - chan->backend.subbuf_size)
799 & chan->commit_count_mask)
800 - (buf_trunc(consumed_cur, chan)
801 >> chan->backend.num_subbuf_order)
802 != 0)
803 goto nodata;
804
805 /*
806 * Check that we are not about to read the same subbuffer in
807 * which the writer head is.
808 */
809 if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan)
810 == 0)
811 goto nodata;
812
813 /*
814 * Failure to get the subbuffer causes a busy-loop retry without going
815 * to a wait queue. These are caused by short-lived race windows where
816 * the writer is getting access to a subbuffer we were trying to get
817 * access to. Also checks that the "consumed" buffer count we are
818 * looking for matches the one contained in the subbuffer id.
819 */
820 ret = update_read_sb_index(config, &buf->backend, &chan->backend,
821 consumed_idx, buf_trunc_val(consumed, chan));
822 if (ret)
823 goto retry;
824 subbuffer_id_clear_noref(config, &buf->backend.buf_rsb.id);
825
826 buf->get_subbuf_consumed = consumed;
827 buf->get_subbuf = 1;
828
829 return 0;
830
831nodata:
832 /*
833 * The memory barriers __wait_event()/wake_up_interruptible() take care
834 * of "raw_spin_is_locked" memory ordering.
835 */
836 if (finalized)
837 return -ENODATA;
852c2936
MD
838 else
839 return -EAGAIN;
840}
852c2936
MD
841
842/**
843 * lib_ring_buffer_put_subbuf - release exclusive subbuffer access
844 * @buf: ring buffer
845 */
846void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf)
847{
848 struct lib_ring_buffer_backend *bufb = &buf->backend;
a6352fd4 849 struct channel *chan = shmp(bufb->chan);
852c2936
MD
850 const struct lib_ring_buffer_config *config = chan->backend.config;
851 unsigned long read_sb_bindex, consumed_idx, consumed;
852
a6352fd4 853 CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
852c2936
MD
854
855 if (!buf->get_subbuf) {
856 /*
857 * Reader puts a subbuffer it did not get.
858 */
859 CHAN_WARN_ON(chan, 1);
860 return;
861 }
862 consumed = buf->get_subbuf_consumed;
863 buf->get_subbuf = 0;
864
865 /*
866 * Clear the records_unread counter. (overruns counter)
867 * Can still be non-zero if a file reader simply grabbed the data
868 * without using iterators.
869 * Can be below zero if an iterator is used on a snapshot more than
870 * once.
871 */
872 read_sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
873 v_add(config, v_read(config,
a6352fd4 874 &shmp(bufb->array)[read_sb_bindex]->records_unread),
852c2936 875 &bufb->records_read);
a6352fd4 876 v_set(config, &shmp(bufb->array)[read_sb_bindex]->records_unread, 0);
852c2936
MD
877 CHAN_WARN_ON(chan, config->mode == RING_BUFFER_OVERWRITE
878 && subbuffer_id_is_noref(config, bufb->buf_rsb.id));
879 subbuffer_id_set_noref(config, &bufb->buf_rsb.id);
880
881 /*
882 * Exchange the reader subbuffer with the one we put in its place in the
883 * writer subbuffer table. Expect the original consumed count. If
884 * update_read_sb_index fails, this is because the writer updated the
885 * subbuffer concurrently. We should therefore keep the subbuffer we
886 * currently have: it has become invalid to try reading this sub-buffer
887 * consumed count value anyway.
888 */
889 consumed_idx = subbuf_index(consumed, chan);
890 update_read_sb_index(config, &buf->backend, &chan->backend,
891 consumed_idx, buf_trunc_val(consumed, chan));
892 /*
893 * update_read_sb_index return value ignored. Don't exchange sub-buffer
894 * if the writer concurrently updated it.
895 */
896}
852c2936
MD
897
898/*
899 * cons_offset is an iterator on all subbuffer offsets between the reader
900 * position and the writer position. (inclusive)
901 */
902static
903void lib_ring_buffer_print_subbuffer_errors(struct lib_ring_buffer *buf,
904 struct channel *chan,
905 unsigned long cons_offset,
906 int cpu)
907{
908 const struct lib_ring_buffer_config *config = chan->backend.config;
909 unsigned long cons_idx, commit_count, commit_count_sb;
910
911 cons_idx = subbuf_index(cons_offset, chan);
a6352fd4
MD
912 commit_count = v_read(config, &shmp(buf->commit_hot)[cons_idx].cc);
913 commit_count_sb = v_read(config, &shmp(buf->commit_cold)[cons_idx].cc_sb);
852c2936
MD
914
915 if (subbuf_offset(commit_count, chan) != 0)
a6352fd4 916 ERRMSG("ring buffer %s, cpu %d: "
852c2936
MD
917 "commit count in subbuffer %lu,\n"
918 "expecting multiples of %lu bytes\n"
919 " [ %lu bytes committed, %lu bytes reader-visible ]\n",
920 chan->backend.name, cpu, cons_idx,
921 chan->backend.subbuf_size,
922 commit_count, commit_count_sb);
923
a6352fd4 924 ERRMSG("ring buffer: %s, cpu %d: %lu bytes committed\n",
852c2936
MD
925 chan->backend.name, cpu, commit_count);
926}
927
928static
929void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf,
930 struct channel *chan,
931 void *priv, int cpu)
932{
933 const struct lib_ring_buffer_config *config = chan->backend.config;
934 unsigned long write_offset, cons_offset;
935
936 /*
937 * Can be called in the error path of allocation when
938 * trans_channel_data is not yet set.
939 */
940 if (!chan)
941 return;
942 /*
943 * No need to order commit_count, write_offset and cons_offset reads
944 * because we execute at teardown when no more writer nor reader
945 * references are left.
946 */
947 write_offset = v_read(config, &buf->offset);
a6352fd4 948 cons_offset = uatomic_read(&buf->consumed);
852c2936 949 if (write_offset != cons_offset)
a6352fd4 950 ERRMSG("ring buffer %s, cpu %d: "
852c2936
MD
951 "non-consumed data\n"
952 " [ %lu bytes written, %lu bytes read ]\n",
953 chan->backend.name, cpu, write_offset, cons_offset);
954
a6352fd4 955 for (cons_offset = uatomic_read(&buf->consumed);
852c2936
MD
956 (long) (subbuf_trunc((unsigned long) v_read(config, &buf->offset),
957 chan)
958 - cons_offset) > 0;
959 cons_offset = subbuf_align(cons_offset, chan))
960 lib_ring_buffer_print_subbuffer_errors(buf, chan, cons_offset,
961 cpu);
962}
963
964static
965void lib_ring_buffer_print_errors(struct channel *chan,
966 struct lib_ring_buffer *buf, int cpu)
967{
968 const struct lib_ring_buffer_config *config = chan->backend.config;
969 void *priv = chan->backend.priv;
970
a6352fd4 971 ERRMSG("ring buffer %s, cpu %d: %lu records written, "
852c2936
MD
972 "%lu records overrun\n",
973 chan->backend.name, cpu,
974 v_read(config, &buf->records_count),
975 v_read(config, &buf->records_overrun));
976
977 if (v_read(config, &buf->records_lost_full)
978 || v_read(config, &buf->records_lost_wrap)
979 || v_read(config, &buf->records_lost_big))
a6352fd4 980 ERRMSG("ring buffer %s, cpu %d: records were lost. Caused by:\n"
852c2936
MD
981 " [ %lu buffer full, %lu nest buffer wrap-around, "
982 "%lu event too big ]\n",
983 chan->backend.name, cpu,
984 v_read(config, &buf->records_lost_full),
985 v_read(config, &buf->records_lost_wrap),
986 v_read(config, &buf->records_lost_big));
987
988 lib_ring_buffer_print_buffer_errors(buf, chan, priv, cpu);
989}
990
991/*
992 * lib_ring_buffer_switch_old_start: Populate old subbuffer header.
993 *
994 * Only executed when the buffer is finalized, in SWITCH_FLUSH.
995 */
996static
997void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf,
998 struct channel *chan,
999 struct switch_offsets *offsets,
1000 u64 tsc)
1001{
1002 const struct lib_ring_buffer_config *config = chan->backend.config;
1003 unsigned long oldidx = subbuf_index(offsets->old, chan);
1004 unsigned long commit_count;
1005
1006 config->cb.buffer_begin(buf, tsc, oldidx);
1007
1008 /*
1009 * Order all writes to buffer before the commit count update that will
1010 * determine that the subbuffer is full.
1011 */
a6352fd4 1012 cmm_smp_wmb();
852c2936 1013 v_add(config, config->cb.subbuffer_header_size(),
a6352fd4
MD
1014 &shmp(buf->commit_hot)[oldidx].cc);
1015 commit_count = v_read(config, &shmp(buf->commit_hot)[oldidx].cc);
852c2936
MD
1016 /* Check if the written buffer has to be delivered */
1017 lib_ring_buffer_check_deliver(config, buf, chan, offsets->old,
1018 commit_count, oldidx);
1019 lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
1020 offsets->old, commit_count,
1021 config->cb.subbuffer_header_size());
1022}
1023
1024/*
1025 * lib_ring_buffer_switch_old_end: switch old subbuffer
1026 *
1027 * Note : offset_old should never be 0 here. It is ok, because we never perform
1028 * buffer switch on an empty subbuffer in SWITCH_ACTIVE mode. The caller
1029 * increments the offset_old value when doing a SWITCH_FLUSH on an empty
1030 * subbuffer.
1031 */
1032static
1033void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf,
1034 struct channel *chan,
1035 struct switch_offsets *offsets,
1036 u64 tsc)
1037{
1038 const struct lib_ring_buffer_config *config = chan->backend.config;
1039 unsigned long oldidx = subbuf_index(offsets->old - 1, chan);
1040 unsigned long commit_count, padding_size, data_size;
1041
1042 data_size = subbuf_offset(offsets->old - 1, chan) + 1;
1043 padding_size = chan->backend.subbuf_size - data_size;
1044 subbuffer_set_data_size(config, &buf->backend, oldidx, data_size);
1045
1046 /*
1047 * Order all writes to buffer before the commit count update that will
1048 * determine that the subbuffer is full.
1049 */
a6352fd4
MD
1050 cmm_smp_wmb();
1051 v_add(config, padding_size, &shmp(buf->commit_hot)[oldidx].cc);
1052 commit_count = v_read(config, &shmp(buf->commit_hot)[oldidx].cc);
852c2936
MD
1053 lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1,
1054 commit_count, oldidx);
1055 lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
1056 offsets->old, commit_count,
1057 padding_size);
1058}
1059
1060/*
1061 * lib_ring_buffer_switch_new_start: Populate new subbuffer.
1062 *
1063 * This code can be executed unordered : writers may already have written to the
1064 * sub-buffer before this code gets executed, caution. The commit makes sure
1065 * that this code is executed before the deliver of this sub-buffer.
1066 */
1067static
1068void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf,
1069 struct channel *chan,
1070 struct switch_offsets *offsets,
1071 u64 tsc)
1072{
1073 const struct lib_ring_buffer_config *config = chan->backend.config;
1074 unsigned long beginidx = subbuf_index(offsets->begin, chan);
1075 unsigned long commit_count;
1076
1077 config->cb.buffer_begin(buf, tsc, beginidx);
1078
1079 /*
1080 * Order all writes to buffer before the commit count update that will
1081 * determine that the subbuffer is full.
1082 */
a6352fd4 1083 cmm_smp_wmb();
852c2936 1084 v_add(config, config->cb.subbuffer_header_size(),
a6352fd4
MD
1085 &shmp(buf->commit_hot)[beginidx].cc);
1086 commit_count = v_read(config, &shmp(buf->commit_hot)[beginidx].cc);
852c2936
MD
1087 /* Check if the written buffer has to be delivered */
1088 lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin,
1089 commit_count, beginidx);
1090 lib_ring_buffer_write_commit_counter(config, buf, chan, beginidx,
1091 offsets->begin, commit_count,
1092 config->cb.subbuffer_header_size());
1093}
1094
1095/*
1096 * lib_ring_buffer_switch_new_end: finish switching current subbuffer
1097 *
1098 * The only remaining threads could be the ones with pending commits. They will
1099 * have to do the deliver themselves.
1100 */
1101static
1102void lib_ring_buffer_switch_new_end(struct lib_ring_buffer *buf,
1103 struct channel *chan,
1104 struct switch_offsets *offsets,
1105 u64 tsc)
1106{
1107 const struct lib_ring_buffer_config *config = chan->backend.config;
1108 unsigned long endidx = subbuf_index(offsets->end - 1, chan);
1109 unsigned long commit_count, padding_size, data_size;
1110
1111 data_size = subbuf_offset(offsets->end - 1, chan) + 1;
1112 padding_size = chan->backend.subbuf_size - data_size;
1113 subbuffer_set_data_size(config, &buf->backend, endidx, data_size);
1114
1115 /*
1116 * Order all writes to buffer before the commit count update that will
1117 * determine that the subbuffer is full.
1118 */
a6352fd4
MD
1119 cmm_smp_wmb();
1120 v_add(config, padding_size, &shmp(buf->commit_hot)[endidx].cc);
1121 commit_count = v_read(config, &shmp(buf->commit_hot)[endidx].cc);
852c2936
MD
1122 lib_ring_buffer_check_deliver(config, buf, chan, offsets->end - 1,
1123 commit_count, endidx);
1124 lib_ring_buffer_write_commit_counter(config, buf, chan, endidx,
1125 offsets->end, commit_count,
1126 padding_size);
1127}
1128
1129/*
1130 * Returns :
1131 * 0 if ok
1132 * !0 if execution must be aborted.
1133 */
1134static
1135int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
1136 struct lib_ring_buffer *buf,
1137 struct channel *chan,
1138 struct switch_offsets *offsets,
1139 u64 *tsc)
1140{
1141 const struct lib_ring_buffer_config *config = chan->backend.config;
1142 unsigned long off;
1143
1144 offsets->begin = v_read(config, &buf->offset);
1145 offsets->old = offsets->begin;
1146 offsets->switch_old_start = 0;
1147 off = subbuf_offset(offsets->begin, chan);
1148
1149 *tsc = config->cb.ring_buffer_clock_read(chan);
1150
1151 /*
1152 * Ensure we flush the header of an empty subbuffer when doing the
1153 * finalize (SWITCH_FLUSH). This ensures that we end up knowing the
1154 * total data gathering duration even if there were no records saved
1155 * after the last buffer switch.
1156 * In SWITCH_ACTIVE mode, switch the buffer when it contains events.
1157 * SWITCH_ACTIVE only flushes the current subbuffer, dealing with end of
1158 * subbuffer header as appropriate.
1159 * The next record that reserves space will be responsible for
1160 * populating the following subbuffer header. We choose not to populate
1161 * the next subbuffer header here because we want to be able to use
a6352fd4
MD
1162 * SWITCH_ACTIVE for periodical buffer flush, which must
1163 * guarantee that all the buffer content (records and header
1164 * timestamps) are visible to the reader. This is required for
1165 * quiescence guarantees for the fusion merge.
852c2936
MD
1166 */
1167 if (mode == SWITCH_FLUSH || off > 0) {
1168 if (unlikely(off == 0)) {
1169 /*
1170 * The client does not save any header information.
1171 * Don't switch empty subbuffer on finalize, because it
1172 * is invalid to deliver a completely empty subbuffer.
1173 */
1174 if (!config->cb.subbuffer_header_size())
1175 return -1;
1176 /*
1177 * Need to write the subbuffer start header on finalize.
1178 */
1179 offsets->switch_old_start = 1;
1180 }
1181 offsets->begin = subbuf_align(offsets->begin, chan);
1182 } else
1183 return -1; /* we do not have to switch : buffer is empty */
1184 /* Note: old points to the next subbuf at offset 0 */
1185 offsets->end = offsets->begin;
1186 return 0;
1187}
1188
1189/*
1190 * Force a sub-buffer switch. This operation is completely reentrant : can be
1191 * called while tracing is active with absolutely no lock held.
1192 *
1193 * Note, however, that as a v_cmpxchg is used for some atomic
1194 * operations, this function must be called from the CPU which owns the buffer
1195 * for a ACTIVE flush.
1196 */
1197void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode mode)
1198{
a6352fd4 1199 struct channel *chan = shmp(buf->backend.chan);
852c2936
MD
1200 const struct lib_ring_buffer_config *config = chan->backend.config;
1201 struct switch_offsets offsets;
1202 unsigned long oldidx;
1203 u64 tsc;
1204
1205 offsets.size = 0;
1206
1207 /*
1208 * Perform retryable operations.
1209 */
1210 do {
1211 if (lib_ring_buffer_try_switch_slow(mode, buf, chan, &offsets,
1212 &tsc))
1213 return; /* Switch not needed */
1214 } while (v_cmpxchg(config, &buf->offset, offsets.old, offsets.end)
1215 != offsets.old);
1216
1217 /*
1218 * Atomically update last_tsc. This update races against concurrent
1219 * atomic updates, but the race will always cause supplementary full TSC
1220 * records, never the opposite (missing a full TSC record when it would
1221 * be needed).
1222 */
1223 save_last_tsc(config, buf, tsc);
1224
1225 /*
1226 * Push the reader if necessary
1227 */
1228 lib_ring_buffer_reserve_push_reader(buf, chan, offsets.old);
1229
1230 oldidx = subbuf_index(offsets.old, chan);
1231 lib_ring_buffer_clear_noref(config, &buf->backend, oldidx);
1232
1233 /*
1234 * May need to populate header start on SWITCH_FLUSH.
1235 */
1236 if (offsets.switch_old_start) {
1237 lib_ring_buffer_switch_old_start(buf, chan, &offsets, tsc);
1238 offsets.old += config->cb.subbuffer_header_size();
1239 }
1240
1241 /*
1242 * Switch old subbuffer.
1243 */
1244 lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc);
1245}
852c2936
MD
1246
1247/*
1248 * Returns :
1249 * 0 if ok
1250 * -ENOSPC if event size is too large for packet.
1251 * -ENOBUFS if there is currently not enough space in buffer for the event.
1252 * -EIO if data cannot be written into the buffer for any other reason.
1253 */
1254static
1255int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf,
1256 struct channel *chan,
1257 struct switch_offsets *offsets,
1258 struct lib_ring_buffer_ctx *ctx)
1259{
1260 const struct lib_ring_buffer_config *config = chan->backend.config;
1261 unsigned long reserve_commit_diff;
1262
1263 offsets->begin = v_read(config, &buf->offset);
1264 offsets->old = offsets->begin;
1265 offsets->switch_new_start = 0;
1266 offsets->switch_new_end = 0;
1267 offsets->switch_old_end = 0;
1268 offsets->pre_header_padding = 0;
1269
1270 ctx->tsc = config->cb.ring_buffer_clock_read(chan);
1271 if ((int64_t) ctx->tsc == -EIO)
1272 return -EIO;
1273
1274 if (last_tsc_overflow(config, buf, ctx->tsc))
1275 ctx->rflags |= RING_BUFFER_RFLAG_FULL_TSC;
1276
1277 if (unlikely(subbuf_offset(offsets->begin, ctx->chan) == 0)) {
1278 offsets->switch_new_start = 1; /* For offsets->begin */
1279 } else {
1280 offsets->size = config->cb.record_header_size(config, chan,
1281 offsets->begin,
1282 &offsets->pre_header_padding,
1283 ctx);
1284 offsets->size +=
1285 lib_ring_buffer_align(offsets->begin + offsets->size,
1286 ctx->largest_align)
1287 + ctx->data_size;
1288 if (unlikely(subbuf_offset(offsets->begin, chan) +
1289 offsets->size > chan->backend.subbuf_size)) {
1290 offsets->switch_old_end = 1; /* For offsets->old */
1291 offsets->switch_new_start = 1; /* For offsets->begin */
1292 }
1293 }
1294 if (unlikely(offsets->switch_new_start)) {
1295 unsigned long sb_index;
1296
1297 /*
1298 * We are typically not filling the previous buffer completely.
1299 */
1300 if (likely(offsets->switch_old_end))
1301 offsets->begin = subbuf_align(offsets->begin, chan);
1302 offsets->begin = offsets->begin
1303 + config->cb.subbuffer_header_size();
1304 /* Test new buffer integrity */
1305 sb_index = subbuf_index(offsets->begin, chan);
1306 reserve_commit_diff =
1307 (buf_trunc(offsets->begin, chan)
1308 >> chan->backend.num_subbuf_order)
1309 - ((unsigned long) v_read(config,
a6352fd4 1310 &shmp(buf->commit_cold)[sb_index].cc_sb)
852c2936
MD
1311 & chan->commit_count_mask);
1312 if (likely(reserve_commit_diff == 0)) {
1313 /* Next subbuffer not being written to. */
1314 if (unlikely(config->mode != RING_BUFFER_OVERWRITE &&
1315 subbuf_trunc(offsets->begin, chan)
1316 - subbuf_trunc((unsigned long)
a6352fd4 1317 uatomic_read(&buf->consumed), chan)
852c2936
MD
1318 >= chan->backend.buf_size)) {
1319 /*
1320 * We do not overwrite non consumed buffers
1321 * and we are full : record is lost.
1322 */
1323 v_inc(config, &buf->records_lost_full);
1324 return -ENOBUFS;
1325 } else {
1326 /*
1327 * Next subbuffer not being written to, and we
1328 * are either in overwrite mode or the buffer is
1329 * not full. It's safe to write in this new
1330 * subbuffer.
1331 */
1332 }
1333 } else {
1334 /*
1335 * Next subbuffer reserve offset does not match the
1336 * commit offset. Drop record in producer-consumer and
1337 * overwrite mode. Caused by either a writer OOPS or too
1338 * many nested writes over a reserve/commit pair.
1339 */
1340 v_inc(config, &buf->records_lost_wrap);
1341 return -EIO;
1342 }
1343 offsets->size =
1344 config->cb.record_header_size(config, chan,
1345 offsets->begin,
1346 &offsets->pre_header_padding,
1347 ctx);
1348 offsets->size +=
1349 lib_ring_buffer_align(offsets->begin + offsets->size,
1350 ctx->largest_align)
1351 + ctx->data_size;
1352 if (unlikely(subbuf_offset(offsets->begin, chan)
1353 + offsets->size > chan->backend.subbuf_size)) {
1354 /*
1355 * Record too big for subbuffers, report error, don't
1356 * complete the sub-buffer switch.
1357 */
1358 v_inc(config, &buf->records_lost_big);
1359 return -ENOSPC;
1360 } else {
1361 /*
1362 * We just made a successful buffer switch and the
1363 * record fits in the new subbuffer. Let's write.
1364 */
1365 }
1366 } else {
1367 /*
1368 * Record fits in the current buffer and we are not on a switch
1369 * boundary. It's safe to write.
1370 */
1371 }
1372 offsets->end = offsets->begin + offsets->size;
1373
1374 if (unlikely(subbuf_offset(offsets->end, chan) == 0)) {
1375 /*
1376 * The offset_end will fall at the very beginning of the next
1377 * subbuffer.
1378 */
1379 offsets->switch_new_end = 1; /* For offsets->begin */
1380 }
1381 return 0;
1382}
1383
1384/**
1385 * lib_ring_buffer_reserve_slow - Atomic slot reservation in a buffer.
1386 * @ctx: ring buffer context.
1387 *
1388 * Return : -NOBUFS if not enough space, -ENOSPC if event size too large,
1389 * -EIO for other errors, else returns 0.
1390 * It will take care of sub-buffer switching.
1391 */
1392int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx)
1393{
1394 struct channel *chan = ctx->chan;
1395 const struct lib_ring_buffer_config *config = chan->backend.config;
1396 struct lib_ring_buffer *buf;
1397 struct switch_offsets offsets;
1398 int ret;
1399
1400 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
a6352fd4 1401 buf = &shmp(chan->backend.buf)[ctx->cpu];
852c2936 1402 else
a6352fd4 1403 buf = shmp(chan->backend.buf);
852c2936
MD
1404 ctx->buf = buf;
1405
1406 offsets.size = 0;
1407
1408 do {
1409 ret = lib_ring_buffer_try_reserve_slow(buf, chan, &offsets,
1410 ctx);
1411 if (unlikely(ret))
1412 return ret;
1413 } while (unlikely(v_cmpxchg(config, &buf->offset, offsets.old,
1414 offsets.end)
1415 != offsets.old));
1416
1417 /*
1418 * Atomically update last_tsc. This update races against concurrent
1419 * atomic updates, but the race will always cause supplementary full TSC
1420 * records, never the opposite (missing a full TSC record when it would
1421 * be needed).
1422 */
1423 save_last_tsc(config, buf, ctx->tsc);
1424
1425 /*
1426 * Push the reader if necessary
1427 */
1428 lib_ring_buffer_reserve_push_reader(buf, chan, offsets.end - 1);
1429
1430 /*
1431 * Clear noref flag for this subbuffer.
1432 */
1433 lib_ring_buffer_clear_noref(config, &buf->backend,
1434 subbuf_index(offsets.end - 1, chan));
1435
1436 /*
1437 * Switch old subbuffer if needed.
1438 */
1439 if (unlikely(offsets.switch_old_end)) {
1440 lib_ring_buffer_clear_noref(config, &buf->backend,
1441 subbuf_index(offsets.old - 1, chan));
1442 lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->tsc);
1443 }
1444
1445 /*
1446 * Populate new subbuffer.
1447 */
1448 if (unlikely(offsets.switch_new_start))
1449 lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->tsc);
1450
1451 if (unlikely(offsets.switch_new_end))
1452 lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->tsc);
1453
1454 ctx->slot_size = offsets.size;
1455 ctx->pre_offset = offsets.begin;
1456 ctx->buf_offset = offsets.begin + offsets.pre_header_padding;
1457 return 0;
1458}
This page took 0.084202 seconds and 4 git commands to generate.