Revert "Use ENOMSG as fallback for ENODATA on freebsd"
[lttng-ust.git] / libringbuffer / ring_buffer_frontend.c
1 /*
2 * ring_buffer_frontend.c
3 *
4 * (C) Copyright 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 *
6 * Ring buffer wait-free buffer synchronization. Producer-consumer and flight
7 * recorder (overwrite) modes. See thesis:
8 *
9 * Desnoyers, Mathieu (2009), "Low-Impact Operating System Tracing", Ph.D.
10 * dissertation, Ecole Polytechnique de Montreal.
11 * http://www.lttng.org/pub/thesis/desnoyers-dissertation-2009-12.pdf
12 *
13 * - Algorithm presentation in Chapter 5:
14 * "Lockless Multi-Core High-Throughput Buffering".
15 * - Algorithm formal verification in Section 8.6:
16 * "Formal verification of LTTng"
17 *
18 * Author:
19 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
20 *
21 * Inspired from LTT and RelayFS:
22 * Karim Yaghmour <karim@opersys.com>
23 * Tom Zanussi <zanussi@us.ibm.com>
24 * Bob Wisniewski <bob@watson.ibm.com>
25 * And from K42 :
26 * Bob Wisniewski <bob@watson.ibm.com>
27 *
28 * Buffer reader semantic :
29 *
30 * - get_subbuf_size
31 * while buffer is not finalized and empty
32 * - get_subbuf
33 * - if return value != 0, continue
34 * - splice one subbuffer worth of data to a pipe
35 * - splice the data from pipe to disk/network
36 * - put_subbuf
37 *
38 * Dual LGPL v2.1/GPL v2 license.
39 */
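/*
 * Illustrative sketch (not compiled in) of the reader protocol described
 * above, expressed with the lib_ring_buffer_snapshot(),
 * lib_ring_buffer_get_subbuf(), lib_ring_buffer_put_subbuf() and
 * lib_ring_buffer_move_consumer() operations defined below. The function
 * name is hypothetical; waiting on the wait_fd and the actual splice of
 * the sub-buffer data are assumed and left out.
 */
#if 0
static
void example_consume_buffer(struct lttng_ust_lib_ring_buffer *buf,
		struct lttng_ust_shm_handle *handle)
{
	struct channel *chan = shmp(handle, buf->backend.chan);
	unsigned long consumed, produced;
	int ret;

	for (;;) {
		ret = lib_ring_buffer_snapshot(buf, &consumed, &produced, handle);
		if (ret == -ENODATA)
			break;		/* Buffer finalized and fully consumed. */
		if (ret == -EAGAIN)
			continue;	/* No data yet: retry, or wait on the wait_fd. */
		if (lib_ring_buffer_get_subbuf(buf, consumed, handle) != 0)
			continue;	/* Pushed by the writer: take a new snapshot. */
		/* ... splice one sub-buffer worth of data to a pipe, then to disk ... */
		lib_ring_buffer_put_subbuf(buf, handle);
		lib_ring_buffer_move_consumer(buf,
				consumed + chan->backend.subbuf_size, handle);
	}
}
#endif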
40
41 #define _GNU_SOURCE
42 #include <sys/types.h>
43 #include <sys/mman.h>
44 #include <sys/stat.h>
45 #include <fcntl.h>
46 #include <urcu/compiler.h>
47 #include <urcu/ref.h>
48 #include <helper.h>
49
50 #include "smp.h"
51 #include <lttng/ringbuffer-config.h>
52 #include "vatomic.h"
53 #include "backend.h"
54 #include "frontend.h"
55 #include "shm.h"
56
57 #ifndef max
58 #define max(a, b) ((a) > (b) ? (a) : (b))
59 #endif
60
61 /*
62 * Use POSIX SHM: shm_open(3) and shm_unlink(3).
63 * close(2) to close the fd returned by shm_open.
64 * shm_unlink releases the shared memory object name.
65 * ftruncate(2) sets the size of the memory object.
66 * mmap/munmap maps the shared memory obj to a virtual address in the
67 * calling process (should be done both in libust and consumer).
68 * See shm_overview(7) for details.
69 * Pass file descriptor returned by shm_open(3) to ltt-sessiond through
70 * a UNIX socket.
71 *
72 * Since we don't need to access the object using its name, we can
73 * immediately shm_unlink(3) it, and only keep the handle with its file
74 * descriptor.
75 */
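/*
 * Minimal sketch (not compiled in) of the shm_open/ftruncate/mmap
 * sequence described above. The object name and helper name are
 * examples only; error paths are simplified and passing the file
 * descriptor over a UNIX socket is left out.
 */
#if 0
static
void *example_shm_map(size_t len, int *shm_fd)
{
	void *ptr;
	int fd;

	fd = shm_open("/ust-shm-example", O_CREAT | O_EXCL | O_RDWR, 0600);
	if (fd < 0)
		return NULL;
	/* Name not needed anymore: keep only the file descriptor. */
	shm_unlink("/ust-shm-example");
	if (ftruncate(fd, len) < 0)
		goto error;
	ptr = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
	if (ptr == MAP_FAILED)
		goto error;
	*shm_fd = fd;	/* To be sent to the consumer over a UNIX socket. */
	return ptr;

error:
	close(fd);
	return NULL;
}
#endif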
76
77 /*
78 * Internal structure representing offsets to use at a sub-buffer switch.
79 */
80 struct switch_offsets {
81 unsigned long begin, end, old;
82 size_t pre_header_padding, size;
83 unsigned int switch_new_start:1, switch_new_end:1, switch_old_start:1,
84 switch_old_end:1;
85 };
86
87 __thread unsigned int lib_ring_buffer_nesting;
88
89 /*
90 * TODO: this is unused. Errors are saved within the ring buffer.
91 * Eventually, allow consumerd to print these errors.
92 */
93 static
94 void lib_ring_buffer_print_errors(struct channel *chan,
95 struct lttng_ust_lib_ring_buffer *buf, int cpu,
96 struct lttng_ust_shm_handle *handle)
97 __attribute__((unused));
98
99 /**
100 * lib_ring_buffer_reset - Reset ring buffer to initial values.
101 * @buf: Ring buffer.
102 *
103 * Effectively empty the ring buffer. Should be called when the buffer is not
104 * used for writing. The ring buffer can be opened for reading, but the reader
105 * should not be using the iterator concurrently with reset. The previous
106 * current iterator record is reset.
107 */
108 void lib_ring_buffer_reset(struct lttng_ust_lib_ring_buffer *buf,
109 struct lttng_ust_shm_handle *handle)
110 {
111 struct channel *chan = shmp(handle, buf->backend.chan);
112 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
113 unsigned int i;
114
115 /*
116 * Reset iterator first. It will put the subbuffer if it currently holds
117 * it.
118 */
119 v_set(config, &buf->offset, 0);
120 for (i = 0; i < chan->backend.num_subbuf; i++) {
121 v_set(config, &shmp_index(handle, buf->commit_hot, i)->cc, 0);
122 v_set(config, &shmp_index(handle, buf->commit_hot, i)->seq, 0);
123 v_set(config, &shmp_index(handle, buf->commit_cold, i)->cc_sb, 0);
124 }
125 uatomic_set(&buf->consumed, 0);
126 uatomic_set(&buf->record_disabled, 0);
127 v_set(config, &buf->last_tsc, 0);
128 lib_ring_buffer_backend_reset(&buf->backend, handle);
129 /* Don't reset number of active readers */
130 v_set(config, &buf->records_lost_full, 0);
131 v_set(config, &buf->records_lost_wrap, 0);
132 v_set(config, &buf->records_lost_big, 0);
133 v_set(config, &buf->records_count, 0);
134 v_set(config, &buf->records_overrun, 0);
135 buf->finalized = 0;
136 }
137
138 /**
139 * channel_reset - Reset channel to initial values.
140 * @chan: Channel.
141 *
142 * Effectively empty the channel. Should be called when the channel is not used
143 * for writing. The channel can be opened for reading, but the reader should not
144 * be using the iterator concurrently with reset. The previous current iterator
145 * record is reset.
146 */
147 void channel_reset(struct channel *chan)
148 {
149 /*
150 * Reset iterators first. Will put the subbuffer if held for reading.
151 */
152 uatomic_set(&chan->record_disabled, 0);
153 /* Don't reset commit_count_mask, still valid */
154 channel_backend_reset(&chan->backend);
155 /* Don't reset switch/read timer interval */
156 /* Don't reset notifiers and notifier enable bits */
157 /* Don't reset reader reference count */
158 }
159
160 /*
161 * Must be called under cpu hotplug protection.
162 */
163 int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf,
164 struct channel_backend *chanb, int cpu,
165 struct lttng_ust_shm_handle *handle,
166 struct shm_object *shmobj)
167 {
168 const struct lttng_ust_lib_ring_buffer_config *config = &chanb->config;
169 struct channel *chan = caa_container_of(chanb, struct channel, backend);
170 void *priv = channel_get_private(chan);
171 size_t subbuf_header_size;
172 uint64_t tsc;
173 int ret;
174
175 /* Test for cpu hotplug */
176 if (buf->backend.allocated)
177 return 0;
178
179 ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend,
180 cpu, handle, shmobj);
181 if (ret)
182 return ret;
183
184 align_shm(shmobj, __alignof__(struct commit_counters_hot));
185 set_shmp(buf->commit_hot,
186 zalloc_shm(shmobj,
187 sizeof(struct commit_counters_hot) * chan->backend.num_subbuf));
188 if (!shmp(handle, buf->commit_hot)) {
189 ret = -ENOMEM;
190 goto free_chanbuf;
191 }
192
193 align_shm(shmobj, __alignof__(struct commit_counters_cold));
194 set_shmp(buf->commit_cold,
195 zalloc_shm(shmobj,
196 sizeof(struct commit_counters_cold) * chan->backend.num_subbuf));
197 if (!shmp(handle, buf->commit_cold)) {
198 ret = -ENOMEM;
199 goto free_commit;
200 }
201
202 /*
203 * Write the subbuffer header for first subbuffer so we know the total
204 * duration of data gathering.
205 */
206 subbuf_header_size = config->cb.subbuffer_header_size();
207 v_set(config, &buf->offset, subbuf_header_size);
208 subbuffer_id_clear_noref(config, &shmp_index(handle, buf->backend.buf_wsb, 0)->id);
209 tsc = config->cb.ring_buffer_clock_read(shmp(handle, buf->backend.chan));
210 config->cb.buffer_begin(buf, tsc, 0, handle);
211 v_add(config, subbuf_header_size, &shmp_index(handle, buf->commit_hot, 0)->cc);
212
213 if (config->cb.buffer_create) {
214 ret = config->cb.buffer_create(buf, priv, cpu, chanb->name, handle);
215 if (ret)
216 goto free_init;
217 }
218 buf->backend.allocated = 1;
219 return 0;
220
221 /* Error handling */
222 free_init:
223 /* commit_cold will be freed by shm teardown */
224 free_commit:
225 /* commit_hot will be freed by shm teardown */
226 free_chanbuf:
227 return ret;
228 }
229
230 #if 0
231 static void switch_buffer_timer(unsigned long data)
232 {
233 struct lttng_ust_lib_ring_buffer *buf = (struct lttng_ust_lib_ring_buffer *)data;
234 struct channel *chan = shmp(handle, buf->backend.chan);
235 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
236
237 /*
238 * Only flush buffers periodically if readers are active.
239 */
240 if (uatomic_read(&buf->active_readers) || uatomic_read(&buf->active_shadow_readers))
241 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, handle);
242
243 //TODO timers
244 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
245 // mod_timer_pinned(&buf->switch_timer,
246 // jiffies + chan->switch_timer_interval);
247 //else
248 // mod_timer(&buf->switch_timer,
249 // jiffies + chan->switch_timer_interval);
250 }
251 #endif //0
252
253 static void lib_ring_buffer_start_switch_timer(struct lttng_ust_lib_ring_buffer *buf,
254 struct lttng_ust_shm_handle *handle)
255 {
256 struct channel *chan = shmp(handle, buf->backend.chan);
257 //const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
258
259 if (!chan->switch_timer_interval || buf->switch_timer_enabled)
260 return;
261 //TODO
262 //init_timer(&buf->switch_timer);
263 //buf->switch_timer.function = switch_buffer_timer;
264 //buf->switch_timer.expires = jiffies + chan->switch_timer_interval;
265 //buf->switch_timer.data = (unsigned long)buf;
266 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
267 // add_timer_on(&buf->switch_timer, buf->backend.cpu);
268 //else
269 // add_timer(&buf->switch_timer);
270 buf->switch_timer_enabled = 1;
271 }
272
273 static void lib_ring_buffer_stop_switch_timer(struct lttng_ust_lib_ring_buffer *buf,
274 struct lttng_ust_shm_handle *handle)
275 {
276 struct channel *chan = shmp(handle, buf->backend.chan);
277
278 if (!chan->switch_timer_interval || !buf->switch_timer_enabled)
279 return;
280
281 //TODO
282 //del_timer_sync(&buf->switch_timer);
283 buf->switch_timer_enabled = 0;
284 }
285
286 #if 0
287 /*
288 * Polling timer to check the channels for data.
289 */
290 static void read_buffer_timer(unsigned long data)
291 {
292 struct lttng_ust_lib_ring_buffer *buf = (struct lttng_ust_lib_ring_buffer *)data;
293 struct channel *chan = shmp(handle, buf->backend.chan);
294 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
295
296 CHAN_WARN_ON(chan, !buf->backend.allocated);
297
298 if ((uatomic_read(&buf->active_readers) || uatomic_read(&buf->active_shadow_readers))
299 && lib_ring_buffer_poll_deliver(config, buf, chan)) {
300 //TODO
301 //wake_up_interruptible(&buf->read_wait);
302 //wake_up_interruptible(&chan->read_wait);
303 }
304
305 //TODO
306 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
307 // mod_timer_pinned(&buf->read_timer,
308 // jiffies + chan->read_timer_interval);
309 //else
310 // mod_timer(&buf->read_timer,
311 // jiffies + chan->read_timer_interval);
312 }
313 #endif //0
314
315 static void lib_ring_buffer_start_read_timer(struct lttng_ust_lib_ring_buffer *buf,
316 struct lttng_ust_shm_handle *handle)
317 {
318 struct channel *chan = shmp(handle, buf->backend.chan);
319 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
320
321 if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
322 || !chan->read_timer_interval
323 || buf->read_timer_enabled)
324 return;
325
326 //TODO
327 //init_timer(&buf->read_timer);
328 //buf->read_timer.function = read_buffer_timer;
329 //buf->read_timer.expires = jiffies + chan->read_timer_interval;
330 //buf->read_timer.data = (unsigned long)buf;
331
332 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
333 // add_timer_on(&buf->read_timer, buf->backend.cpu);
334 //else
335 // add_timer(&buf->read_timer);
336 buf->read_timer_enabled = 1;
337 }
338
339 static void lib_ring_buffer_stop_read_timer(struct lttng_ust_lib_ring_buffer *buf,
340 struct lttng_ust_shm_handle *handle)
341 {
342 struct channel *chan = shmp(handle, buf->backend.chan);
343 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
344
345 if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
346 || !chan->read_timer_interval
347 || !buf->read_timer_enabled)
348 return;
349
350 //TODO
351 //del_timer_sync(&buf->read_timer);
352 /*
353 * do one more check to catch data that has been written in the last
354 * timer period.
355 */
356 if (lib_ring_buffer_poll_deliver(config, buf, chan, handle)) {
357 //TODO
358 //wake_up_interruptible(&buf->read_wait);
359 //wake_up_interruptible(&chan->read_wait);
360 }
361 buf->read_timer_enabled = 0;
362 }
363
364 static void channel_unregister_notifiers(struct channel *chan,
365 struct lttng_ust_shm_handle *handle)
366 {
367 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
368 int cpu;
369
370 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
371 for_each_possible_cpu(cpu) {
372 struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
373
374 lib_ring_buffer_stop_switch_timer(buf, handle);
375 lib_ring_buffer_stop_read_timer(buf, handle);
376 }
377 } else {
378 struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
379
380 lib_ring_buffer_stop_switch_timer(buf, handle);
381 lib_ring_buffer_stop_read_timer(buf, handle);
382 }
383 //channel_backend_unregister_notifiers(&chan->backend);
384 }
385
386 static void channel_free(struct channel *chan, struct lttng_ust_shm_handle *handle,
387 int shadow)
388 {
389 if (!shadow)
390 channel_backend_free(&chan->backend, handle);
391 /* chan is freed by shm teardown */
392 shm_object_table_destroy(handle->table);
393 free(handle);
394 }
395
396 /**
397 * channel_create - Create channel.
398 * @config: ring buffer instance configuration
399 * @name: name of the channel
400 * @priv_data: ring buffer client private data area pointer (output)
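* @priv_data_align: alignment, in bytes, of the private data area.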
401 * @priv_data_size: length, in bytes, of the private data area.
402 * @priv_data_init: initialization data for private data.
403 * @buf_addr: pointer to the beginning of the preallocated buffer contiguous
404 * address mapping. It is used only by RING_BUFFER_STATIC
405 * configuration. It can be set to NULL for other backends.
406 * @subbuf_size: subbuffer size
407 * @num_subbuf: number of subbuffers
408 * @switch_timer_interval: Time interval (in us) to fill sub-buffers with
409 * padding to let readers get those sub-buffers.
410 * Used for live streaming.
411 * @read_timer_interval: Time interval (in us) to wake up pending readers.
412 *
413 * Holds cpu hotplug.
414 * Returns NULL on failure.
415 */
416 struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buffer_config *config,
417 const char *name,
418 void **priv_data,
419 size_t priv_data_align,
420 size_t priv_data_size,
421 void *priv_data_init,
422 void *buf_addr, size_t subbuf_size,
423 size_t num_subbuf, unsigned int switch_timer_interval,
424 unsigned int read_timer_interval,
425 int **shm_fd, int **wait_fd, uint64_t **memory_map_size)
426 {
427 int ret, cpu;
428 size_t shmsize, chansize;
429 struct channel *chan;
430 struct lttng_ust_shm_handle *handle;
431 struct shm_object *shmobj;
432 struct shm_ref *ref;
433
434 if (lib_ring_buffer_check_config(config, switch_timer_interval,
435 read_timer_interval))
436 return NULL;
437
438 handle = zmalloc(sizeof(struct lttng_ust_shm_handle));
439 if (!handle)
440 return NULL;
441
442 /* Allocate table for channel + per-cpu buffers */
443 handle->table = shm_object_table_create(1 + num_possible_cpus());
444 if (!handle->table)
445 goto error_table_alloc;
446
447 /* Calculate the shm allocation layout */
448 shmsize = sizeof(struct channel);
449 shmsize += offset_align(shmsize, __alignof__(struct lttng_ust_lib_ring_buffer_shmp));
450 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
451 shmsize += sizeof(struct lttng_ust_lib_ring_buffer_shmp) * num_possible_cpus();
452 else
453 shmsize += sizeof(struct lttng_ust_lib_ring_buffer_shmp);
454 chansize = shmsize;
455 shmsize += offset_align(shmsize, priv_data_align);
456 shmsize += priv_data_size;
457
458 shmobj = shm_object_table_append(handle->table, shmsize);
459 if (!shmobj)
460 goto error_append;
461 /* struct channel is at object 0, offset 0 (hardcoded) */
462 set_shmp(handle->chan, zalloc_shm(shmobj, chansize));
463 assert(handle->chan._ref.index == 0);
464 assert(handle->chan._ref.offset == 0);
465 chan = shmp(handle, handle->chan);
466 if (!chan)
467 goto error_append;
468
469 /* space for private data */
470 if (priv_data_size) {
471 DECLARE_SHMP(void, priv_data_alloc);
472
473 align_shm(shmobj, priv_data_align);
474 chan->priv_data_offset = shmobj->allocated_len;
475 set_shmp(priv_data_alloc, zalloc_shm(shmobj, priv_data_size));
476 if (!shmp(handle, priv_data_alloc))
477 goto error_append;
478 *priv_data = channel_get_private(chan);
479 memcpy(*priv_data, priv_data_init, priv_data_size);
480 } else {
481 chan->priv_data_offset = -1;
482 *priv_data = NULL;
483 }
484
485 ret = channel_backend_init(&chan->backend, name, config,
486 subbuf_size, num_subbuf, handle);
487 if (ret)
488 goto error_backend_init;
489
490 chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order);
491 //TODO
492 //chan->switch_timer_interval = usecs_to_jiffies(switch_timer_interval);
493 //chan->read_timer_interval = usecs_to_jiffies(read_timer_interval);
494 //TODO
495 //init_waitqueue_head(&chan->read_wait);
496 //init_waitqueue_head(&chan->hp_wait);
497
498 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
499 /*
500 * In case of non-hotplug cpu, if the ring-buffer is allocated
501 * in early initcall, it will not be notified of secondary cpus.
502 * In that case, we need to allocate for all possible cpus.
503 */
504 for_each_possible_cpu(cpu) {
505 struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
506 lib_ring_buffer_start_switch_timer(buf, handle);
507 lib_ring_buffer_start_read_timer(buf, handle);
508 }
509 } else {
510 struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
511
512 lib_ring_buffer_start_switch_timer(buf, handle);
513 lib_ring_buffer_start_read_timer(buf, handle);
514 }
515 ref = &handle->chan._ref;
516 shm_get_object_data(handle, ref, shm_fd, wait_fd, memory_map_size);
517 return handle;
518
519 error_backend_init:
520 error_append:
521 shm_object_table_destroy(handle->table);
522 error_table_alloc:
523 free(handle);
524 return NULL;
525 }
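/*
 * Hypothetical usage sketch (not compiled in) for channel_create(). The
 * client configuration "client_config", the private data type
 * "struct my_priv" and the buffer geometry below are assumptions; a real
 * client derives them from its own ring buffer configuration.
 */
#if 0
static
void example_channel_setup(void)
{
	struct lttng_ust_shm_handle *handle;
	struct my_priv init_priv = { 0 };	/* hypothetical client private data */
	void *priv;
	int *shm_fd, *wait_fd;
	uint64_t *memory_map_size;

	handle = channel_create(&client_config, "example_chan",
			&priv, __alignof__(struct my_priv),
			sizeof(struct my_priv), &init_priv,
			NULL,		/* buf_addr: only used by static backends */
			4096,		/* subbuf_size, in bytes */
			8,		/* num_subbuf */
			0, 0,		/* switch/read timer intervals, in us */
			&shm_fd, &wait_fd, &memory_map_size);
	if (!handle)
		return;		/* creation failed */
	/* ... use the channel, then tear it down: */
	channel_destroy(shmp(handle, handle->chan), handle, 0);
}
#endif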
526
527 struct lttng_ust_shm_handle *channel_handle_create(int shm_fd, int wait_fd,
528 uint64_t memory_map_size)
529 {
530 struct lttng_ust_shm_handle *handle;
531 struct shm_object *object;
532
533 handle = zmalloc(sizeof(struct lttng_ust_shm_handle));
534 if (!handle)
535 return NULL;
536
537 /* Allocate table for channel + per-cpu buffers */
538 handle->table = shm_object_table_create(1 + num_possible_cpus());
539 if (!handle->table)
540 goto error_table_alloc;
541 /* Add channel object */
542 object = shm_object_table_append_shadow(handle->table,
543 shm_fd, wait_fd, memory_map_size);
544 if (!object)
545 goto error_table_object;
546 /* struct channel is at object 0, offset 0 (hardcoded) */
547 handle->chan._ref.index = 0;
548 handle->chan._ref.offset = 0;
549 return handle;
550
551 error_table_object:
552 shm_object_table_destroy(handle->table);
553 error_table_alloc:
554 free(handle);
555 return NULL;
556 }
557
558 int channel_handle_add_stream(struct lttng_ust_shm_handle *handle,
559 int shm_fd, int wait_fd, uint64_t memory_map_size)
560 {
561 struct shm_object *object;
562
563 /* Add stream object */
564 object = shm_object_table_append_shadow(handle->table,
565 shm_fd, wait_fd, memory_map_size);
566 if (!object)
567 return -1;
568 return 0;
569 }
570
571 static
572 void channel_release(struct channel *chan, struct lttng_ust_shm_handle *handle,
573 int shadow)
574 {
575 channel_free(chan, handle, shadow);
576 }
577
578 /**
579 * channel_destroy - Finalize, wait for q.s. and destroy channel.
580 * @chan: channel to destroy
581 *
582 * Holds cpu hotplug.
583 * Call "destroy" callback, finalize channels, decrement the channel
584 * reference count. Note that when readers have completed data
585 * consumption of finalized channels, get_subbuf() will return -ENODATA.
586 * They should release their handle at that point.
587 */
588 void channel_destroy(struct channel *chan, struct lttng_ust_shm_handle *handle,
589 int shadow)
590 {
591 if (shadow) {
592 channel_release(chan, handle, shadow);
593 return;
594 }
595
596 channel_unregister_notifiers(chan, handle);
597
598 /*
599 * Note: the consumer takes care of finalizing and switching the
600 * buffers.
601 */
602
603 /*
604 * sessiond/consumer are keeping a reference on the shm file
605 * descriptor directly. No need to refcount.
606 */
607 channel_release(chan, handle, shadow);
608 return;
609 }
610
611 struct lttng_ust_lib_ring_buffer *channel_get_ring_buffer(
612 const struct lttng_ust_lib_ring_buffer_config *config,
613 struct channel *chan, int cpu,
614 struct lttng_ust_shm_handle *handle,
615 int **shm_fd, int **wait_fd,
616 uint64_t **memory_map_size)
617 {
618 struct shm_ref *ref;
619
620 if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
621 ref = &chan->backend.buf[0].shmp._ref;
622 shm_get_object_data(handle, ref, shm_fd, wait_fd,
623 memory_map_size);
624 return shmp(handle, chan->backend.buf[0].shmp);
625 } else {
626 if (cpu >= num_possible_cpus())
627 return NULL;
628 ref = &chan->backend.buf[cpu].shmp._ref;
629 shm_get_object_data(handle, ref, shm_fd, wait_fd,
630 memory_map_size);
631 return shmp(handle, chan->backend.buf[cpu].shmp);
632 }
633 }
634
635 int lib_ring_buffer_open_read(struct lttng_ust_lib_ring_buffer *buf,
636 struct lttng_ust_shm_handle *handle,
637 int shadow)
638 {
639 if (shadow) {
640 if (uatomic_cmpxchg(&buf->active_shadow_readers, 0, 1) != 0)
641 return -EBUSY;
642 cmm_smp_mb();
643 return 0;
644 }
645 if (uatomic_cmpxchg(&buf->active_readers, 0, 1) != 0)
646 return -EBUSY;
647 cmm_smp_mb();
648 return 0;
649 }
650
651 void lib_ring_buffer_release_read(struct lttng_ust_lib_ring_buffer *buf,
652 struct lttng_ust_shm_handle *handle,
653 int shadow)
654 {
655 struct channel *chan = shmp(handle, buf->backend.chan);
656
657 if (shadow) {
658 CHAN_WARN_ON(chan, uatomic_read(&buf->active_shadow_readers) != 1);
659 cmm_smp_mb();
660 uatomic_dec(&buf->active_shadow_readers);
661 return;
662 }
663 CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
664 cmm_smp_mb();
665 uatomic_dec(&buf->active_readers);
666 }
667
668 /**
669 * lib_ring_buffer_snapshot - save subbuffer position snapshot (for read)
670 * @buf: ring buffer
671 * @consumed: consumed count indicating the position where to read
672 * @produced: produced count, indicates position when to stop reading
673 *
674 * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no
675 * data to read at consumed position, or 0 if the get operation succeeds.
676 */
677
678 int lib_ring_buffer_snapshot(struct lttng_ust_lib_ring_buffer *buf,
679 unsigned long *consumed, unsigned long *produced,
680 struct lttng_ust_shm_handle *handle)
681 {
682 struct channel *chan = shmp(handle, buf->backend.chan);
683 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
684 unsigned long consumed_cur, write_offset;
685 int finalized;
686
687 finalized = CMM_ACCESS_ONCE(buf->finalized);
688 /*
689 * Read finalized before counters.
690 */
691 cmm_smp_rmb();
692 consumed_cur = uatomic_read(&buf->consumed);
693 /*
694 * No need to issue a memory barrier between consumed count read and
695 * write offset read, because consumed count can only change
696 * concurrently in overwrite mode, and we keep a sequence counter
697 * identifier derived from the write offset to check we are getting
698 * the same sub-buffer we are expecting (the sub-buffers are atomically
699 * "tagged" upon writes, tags are checked upon read).
700 */
701 write_offset = v_read(config, &buf->offset);
702
703 /*
704 * Check that we are not about to read the same subbuffer in
705 * which the writer head is.
706 */
707 if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan)
708 == 0)
709 goto nodata;
710
711 *consumed = consumed_cur;
712 *produced = subbuf_trunc(write_offset, chan);
713
714 return 0;
715
716 nodata:
717 /*
718 * The memory barriers __wait_event()/wake_up_interruptible() take care
719 * of "raw_spin_is_locked" memory ordering.
720 */
721 if (finalized)
722 return -ENODATA;
723 else
724 return -EAGAIN;
725 }
726
727 /**
728 * lib_ring_buffer_move_consumer - move consumed counter forward
729 * @buf: ring buffer
730 * @consumed_new: new consumed count value
731 */
732 void lib_ring_buffer_move_consumer(struct lttng_ust_lib_ring_buffer *buf,
733 unsigned long consumed_new,
734 struct lttng_ust_shm_handle *handle)
735 {
736 struct lttng_ust_lib_ring_buffer_backend *bufb = &buf->backend;
737 struct channel *chan = shmp(handle, bufb->chan);
738 unsigned long consumed;
739
740 CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1
741 && uatomic_read(&buf->active_shadow_readers) != 1);
742
743 /*
744 * Only push the consumed value forward.
745 * If the consumed cmpxchg fails, this is because we have been pushed by
746 * the writer in flight recorder mode.
747 */
748 consumed = uatomic_read(&buf->consumed);
749 while ((long) consumed - (long) consumed_new < 0)
750 consumed = uatomic_cmpxchg(&buf->consumed, consumed,
751 consumed_new);
752 }
753
754 /**
755 * lib_ring_buffer_get_subbuf - get exclusive access to subbuffer for reading
756 * @buf: ring buffer
757 * @consumed: consumed count indicating the position where to read
758 *
759 * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no
760 * data to read at consumed position, or 0 if the get operation succeeds.
761 */
762 int lib_ring_buffer_get_subbuf(struct lttng_ust_lib_ring_buffer *buf,
763 unsigned long consumed,
764 struct lttng_ust_shm_handle *handle)
765 {
766 struct channel *chan = shmp(handle, buf->backend.chan);
767 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
768 unsigned long consumed_cur, consumed_idx, commit_count, write_offset;
769 int ret;
770 int finalized;
771
772 retry:
773 finalized = CMM_ACCESS_ONCE(buf->finalized);
774 /*
775 * Read finalized before counters.
776 */
777 cmm_smp_rmb();
778 consumed_cur = uatomic_read(&buf->consumed);
779 consumed_idx = subbuf_index(consumed, chan);
780 commit_count = v_read(config, &shmp_index(handle, buf->commit_cold, consumed_idx)->cc_sb);
781 /*
782 * Make sure we read the commit count before reading the buffer
783 * data and the write offset. Correct consumed offset ordering
784 * wrt commit count is ensured by the use of cmpxchg to update
785 * the consumed offset.
786 */
787 /*
788 * Local rmb to match the remote wmb to read the commit count
789 * before the buffer data and the write offset.
790 */
791 cmm_smp_rmb();
792
793 write_offset = v_read(config, &buf->offset);
794
795 /*
796 * Check that the buffer we are getting is after or at consumed_cur
797 * position.
798 */
799 if ((long) subbuf_trunc(consumed, chan)
800 - (long) subbuf_trunc(consumed_cur, chan) < 0)
801 goto nodata;
802
803 /*
804 * Check that the subbuffer we are trying to consume has been
805 * already fully committed.
806 */
807 if (((commit_count - chan->backend.subbuf_size)
808 & chan->commit_count_mask)
809 - (buf_trunc(consumed_cur, chan)
810 >> chan->backend.num_subbuf_order)
811 != 0)
812 goto nodata;
813
814 /*
815 * Check that we are not about to read the same subbuffer in
816 * which the writer head is.
817 */
818 if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan)
819 == 0)
820 goto nodata;
821
822 /*
823 * Failure to get the subbuffer causes a busy-loop retry without going
824 * to a wait queue. These are caused by short-lived race windows where
825 * the writer is getting access to a subbuffer we were trying to get
826 * access to. Also checks that the "consumed" buffer count we are
827 * looking for matches the one contained in the subbuffer id.
828 */
829 ret = update_read_sb_index(config, &buf->backend, &chan->backend,
830 consumed_idx, buf_trunc_val(consumed, chan),
831 handle);
832 if (ret)
833 goto retry;
834 subbuffer_id_clear_noref(config, &buf->backend.buf_rsb.id);
835
836 buf->get_subbuf_consumed = consumed;
837 buf->get_subbuf = 1;
838
839 return 0;
840
841 nodata:
842 /*
843 * The memory barriers __wait_event()/wake_up_interruptible() take care
844 * of "raw_spin_is_locked" memory ordering.
845 */
846 if (finalized)
847 return -ENODATA;
848 else
849 return -EAGAIN;
850 }
851
852 /**
853 * lib_ring_buffer_put_subbuf - release exclusive subbuffer access
854 * @buf: ring buffer
855 */
856 void lib_ring_buffer_put_subbuf(struct lttng_ust_lib_ring_buffer *buf,
857 struct lttng_ust_shm_handle *handle)
858 {
859 struct lttng_ust_lib_ring_buffer_backend *bufb = &buf->backend;
860 struct channel *chan = shmp(handle, bufb->chan);
861 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
862 unsigned long read_sb_bindex, consumed_idx, consumed;
863
864 CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1
865 && uatomic_read(&buf->active_shadow_readers) != 1);
866
867 if (!buf->get_subbuf) {
868 /*
869 * Reader puts a subbuffer it did not get.
870 */
871 CHAN_WARN_ON(chan, 1);
872 return;
873 }
874 consumed = buf->get_subbuf_consumed;
875 buf->get_subbuf = 0;
876
877 /*
878 * Clear the records_unread counter. (overruns counter)
879 * Can still be non-zero if a file reader simply grabbed the data
880 * without using iterators.
881 * Can be below zero if an iterator is used on a snapshot more than
882 * once.
883 */
884 read_sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
885 v_add(config, v_read(config,
886 &shmp(handle, shmp_index(handle, bufb->array, read_sb_bindex)->shmp)->records_unread),
887 &bufb->records_read);
888 v_set(config, &shmp(handle, shmp_index(handle, bufb->array, read_sb_bindex)->shmp)->records_unread, 0);
889 CHAN_WARN_ON(chan, config->mode == RING_BUFFER_OVERWRITE
890 && subbuffer_id_is_noref(config, bufb->buf_rsb.id));
891 subbuffer_id_set_noref(config, &bufb->buf_rsb.id);
892
893 /*
894 * Exchange the reader subbuffer with the one we put in its place in the
895 * writer subbuffer table. Expect the original consumed count. If
896 * update_read_sb_index fails, this is because the writer updated the
897 * subbuffer concurrently. We should therefore keep the subbuffer we
898 * currently have: it has become invalid to try reading this sub-buffer
899 * consumed count value anyway.
900 */
901 consumed_idx = subbuf_index(consumed, chan);
902 update_read_sb_index(config, &buf->backend, &chan->backend,
903 consumed_idx, buf_trunc_val(consumed, chan),
904 handle);
905 /*
906 * update_read_sb_index return value ignored. Don't exchange sub-buffer
907 * if the writer concurrently updated it.
908 */
909 }
910
911 /*
912 * cons_offset is an iterator on all subbuffer offsets between the reader
913 * position and the writer position. (inclusive)
914 */
915 static
916 void lib_ring_buffer_print_subbuffer_errors(struct lttng_ust_lib_ring_buffer *buf,
917 struct channel *chan,
918 unsigned long cons_offset,
919 int cpu,
920 struct lttng_ust_shm_handle *handle)
921 {
922 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
923 unsigned long cons_idx, commit_count, commit_count_sb;
924
925 cons_idx = subbuf_index(cons_offset, chan);
926 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, cons_idx)->cc);
927 commit_count_sb = v_read(config, &shmp_index(handle, buf->commit_cold, cons_idx)->cc_sb);
928
929 if (subbuf_offset(commit_count, chan) != 0)
930 DBG("ring buffer %s, cpu %d: "
931 "commit count in subbuffer %lu,\n"
932 "expecting multiples of %lu bytes\n"
933 " [ %lu bytes committed, %lu bytes reader-visible ]\n",
934 chan->backend.name, cpu, cons_idx,
935 chan->backend.subbuf_size,
936 commit_count, commit_count_sb);
937
938 DBG("ring buffer: %s, cpu %d: %lu bytes committed\n",
939 chan->backend.name, cpu, commit_count);
940 }
941
942 static
943 void lib_ring_buffer_print_buffer_errors(struct lttng_ust_lib_ring_buffer *buf,
944 struct channel *chan,
945 void *priv, int cpu,
946 struct lttng_ust_shm_handle *handle)
947 {
948 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
949 unsigned long write_offset, cons_offset;
950
951 /*
952 * No need to order commit_count, write_offset and cons_offset reads
953 * because we execute at teardown when no more writer nor reader
954 * references are left.
955 */
956 write_offset = v_read(config, &buf->offset);
957 cons_offset = uatomic_read(&buf->consumed);
958 if (write_offset != cons_offset)
959 DBG("ring buffer %s, cpu %d: "
960 "non-consumed data\n"
961 " [ %lu bytes written, %lu bytes read ]\n",
962 chan->backend.name, cpu, write_offset, cons_offset);
963
964 for (cons_offset = uatomic_read(&buf->consumed);
965 (long) (subbuf_trunc((unsigned long) v_read(config, &buf->offset),
966 chan)
967 - cons_offset) > 0;
968 cons_offset = subbuf_align(cons_offset, chan))
969 lib_ring_buffer_print_subbuffer_errors(buf, chan, cons_offset,
970 cpu, handle);
971 }
972
973 static
974 void lib_ring_buffer_print_errors(struct channel *chan,
975 struct lttng_ust_lib_ring_buffer *buf, int cpu,
976 struct lttng_ust_shm_handle *handle)
977 {
978 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
979 void *priv = channel_get_private(chan);
980
981 DBG("ring buffer %s, cpu %d: %lu records written, "
982 "%lu records overrun\n",
983 chan->backend.name, cpu,
984 v_read(config, &buf->records_count),
985 v_read(config, &buf->records_overrun));
986
987 if (v_read(config, &buf->records_lost_full)
988 || v_read(config, &buf->records_lost_wrap)
989 || v_read(config, &buf->records_lost_big))
990 DBG("ring buffer %s, cpu %d: records were lost. Caused by:\n"
991 " [ %lu buffer full, %lu nest buffer wrap-around, "
992 "%lu event too big ]\n",
993 chan->backend.name, cpu,
994 v_read(config, &buf->records_lost_full),
995 v_read(config, &buf->records_lost_wrap),
996 v_read(config, &buf->records_lost_big));
997
998 lib_ring_buffer_print_buffer_errors(buf, chan, priv, cpu, handle);
999 }
1000
1001 /*
1002 * lib_ring_buffer_switch_old_start: Populate old subbuffer header.
1003 *
1004 * Only executed when the buffer is finalized, in SWITCH_FLUSH.
1005 */
1006 static
1007 void lib_ring_buffer_switch_old_start(struct lttng_ust_lib_ring_buffer *buf,
1008 struct channel *chan,
1009 struct switch_offsets *offsets,
1010 uint64_t tsc,
1011 struct lttng_ust_shm_handle *handle)
1012 {
1013 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1014 unsigned long oldidx = subbuf_index(offsets->old, chan);
1015 unsigned long commit_count;
1016
1017 config->cb.buffer_begin(buf, tsc, oldidx, handle);
1018
1019 /*
1020 * Order all writes to buffer before the commit count update that will
1021 * determine that the subbuffer is full.
1022 */
1023 cmm_smp_wmb();
1024 v_add(config, config->cb.subbuffer_header_size(),
1025 &shmp_index(handle, buf->commit_hot, oldidx)->cc);
1026 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, oldidx)->cc);
1027 /* Check if the written buffer has to be delivered */
1028 lib_ring_buffer_check_deliver(config, buf, chan, offsets->old,
1029 commit_count, oldidx, handle);
1030 lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
1031 offsets->old, commit_count,
1032 config->cb.subbuffer_header_size(),
1033 handle);
1034 }
1035
1036 /*
1037 * lib_ring_buffer_switch_old_end: switch old subbuffer
1038 *
1039 * Note : offset_old should never be 0 here. It is ok, because we never perform
1040 * buffer switch on an empty subbuffer in SWITCH_ACTIVE mode. The caller
1041 * increments the offset_old value when doing a SWITCH_FLUSH on an empty
1042 * subbuffer.
1043 */
1044 static
1045 void lib_ring_buffer_switch_old_end(struct lttng_ust_lib_ring_buffer *buf,
1046 struct channel *chan,
1047 struct switch_offsets *offsets,
1048 uint64_t tsc,
1049 struct lttng_ust_shm_handle *handle)
1050 {
1051 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1052 unsigned long oldidx = subbuf_index(offsets->old - 1, chan);
1053 unsigned long commit_count, padding_size, data_size;
1054
1055 data_size = subbuf_offset(offsets->old - 1, chan) + 1;
1056 padding_size = chan->backend.subbuf_size - data_size;
1057 subbuffer_set_data_size(config, &buf->backend, oldidx, data_size,
1058 handle);
1059
1060 /*
1061 * Order all writes to buffer before the commit count update that will
1062 * determine that the subbuffer is full.
1063 */
1064 cmm_smp_wmb();
1065 v_add(config, padding_size, &shmp_index(handle, buf->commit_hot, oldidx)->cc);
1066 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, oldidx)->cc);
1067 lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1,
1068 commit_count, oldidx, handle);
1069 lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
1070 offsets->old, commit_count,
1071 padding_size, handle);
1072 }
1073
1074 /*
1075 * lib_ring_buffer_switch_new_start: Populate new subbuffer.
1076 *
1077 * This code can be executed unordered : writers may already have written to the
1078 * sub-buffer before this code gets executed, caution. The commit makes sure
1079 * that this code is executed before the deliver of this sub-buffer.
1080 */
1081 static
1082 void lib_ring_buffer_switch_new_start(struct lttng_ust_lib_ring_buffer *buf,
1083 struct channel *chan,
1084 struct switch_offsets *offsets,
1085 uint64_t tsc,
1086 struct lttng_ust_shm_handle *handle)
1087 {
1088 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1089 unsigned long beginidx = subbuf_index(offsets->begin, chan);
1090 unsigned long commit_count;
1091
1092 config->cb.buffer_begin(buf, tsc, beginidx, handle);
1093
1094 /*
1095 * Order all writes to buffer before the commit count update that will
1096 * determine that the subbuffer is full.
1097 */
1098 cmm_smp_wmb();
1099 v_add(config, config->cb.subbuffer_header_size(),
1100 &shmp_index(handle, buf->commit_hot, beginidx)->cc);
1101 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, beginidx)->cc);
1102 /* Check if the written buffer has to be delivered */
1103 lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin,
1104 commit_count, beginidx, handle);
1105 lib_ring_buffer_write_commit_counter(config, buf, chan, beginidx,
1106 offsets->begin, commit_count,
1107 config->cb.subbuffer_header_size(),
1108 handle);
1109 }
1110
1111 /*
1112 * lib_ring_buffer_switch_new_end: finish switching current subbuffer
1113 *
1114 * The only remaining threads could be the ones with pending commits. They will
1115 * have to do the deliver themselves.
1116 */
1117 static
1118 void lib_ring_buffer_switch_new_end(struct lttng_ust_lib_ring_buffer *buf,
1119 struct channel *chan,
1120 struct switch_offsets *offsets,
1121 uint64_t tsc,
1122 struct lttng_ust_shm_handle *handle)
1123 {
1124 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1125 unsigned long endidx = subbuf_index(offsets->end - 1, chan);
1126 unsigned long commit_count, padding_size, data_size;
1127
1128 data_size = subbuf_offset(offsets->end - 1, chan) + 1;
1129 padding_size = chan->backend.subbuf_size - data_size;
1130 subbuffer_set_data_size(config, &buf->backend, endidx, data_size,
1131 handle);
1132
1133 /*
1134 * Order all writes to buffer before the commit count update that will
1135 * determine that the subbuffer is full.
1136 */
1137 cmm_smp_wmb();
1138 v_add(config, padding_size, &shmp_index(handle, buf->commit_hot, endidx)->cc);
1139 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, endidx)->cc);
1140 lib_ring_buffer_check_deliver(config, buf, chan, offsets->end - 1,
1141 commit_count, endidx, handle);
1142 lib_ring_buffer_write_commit_counter(config, buf, chan, endidx,
1143 offsets->end, commit_count,
1144 padding_size, handle);
1145 }
1146
1147 /*
1148 * Returns :
1149 * 0 if ok
1150 * !0 if execution must be aborted.
1151 */
1152 static
1153 int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
1154 struct lttng_ust_lib_ring_buffer *buf,
1155 struct channel *chan,
1156 struct switch_offsets *offsets,
1157 uint64_t *tsc)
1158 {
1159 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1160 unsigned long off;
1161
1162 offsets->begin = v_read(config, &buf->offset);
1163 offsets->old = offsets->begin;
1164 offsets->switch_old_start = 0;
1165 off = subbuf_offset(offsets->begin, chan);
1166
1167 *tsc = config->cb.ring_buffer_clock_read(chan);
1168
1169 /*
1170 * Ensure we flush the header of an empty subbuffer when doing the
1171 * finalize (SWITCH_FLUSH). This ensures that we end up knowing the
1172 * total data gathering duration even if there were no records saved
1173 * after the last buffer switch.
1174 * In SWITCH_ACTIVE mode, switch the buffer when it contains events.
1175 * SWITCH_ACTIVE only flushes the current subbuffer, dealing with end of
1176 * subbuffer header as appropriate.
1177 * The next record that reserves space will be responsible for
1178 * populating the following subbuffer header. We choose not to populate
1179 * the next subbuffer header here because we want to be able to use
1180 * SWITCH_ACTIVE for periodical buffer flush, which must
1181 * guarantee that all the buffer content (records and header
1182 * timestamps) are visible to the reader. This is required for
1183 * quiescence guarantees for the fusion merge.
1184 */
1185 if (mode == SWITCH_FLUSH || off > 0) {
1186 if (caa_unlikely(off == 0)) {
1187 /*
1188 * The client does not save any header information.
1189 * Don't switch empty subbuffer on finalize, because it
1190 * is invalid to deliver a completely empty subbuffer.
1191 */
1192 if (!config->cb.subbuffer_header_size())
1193 return -1;
1194 /*
1195 * Need to write the subbuffer start header on finalize.
1196 */
1197 offsets->switch_old_start = 1;
1198 }
1199 offsets->begin = subbuf_align(offsets->begin, chan);
1200 } else
1201 return -1; /* we do not have to switch : buffer is empty */
1202 /* Note: old points to the next subbuf at offset 0 */
1203 offsets->end = offsets->begin;
1204 return 0;
1205 }
1206
1207 /*
1208 * Force a sub-buffer switch. This operation is completely reentrant : can be
1209 * called while tracing is active with absolutely no lock held.
1210 *
1211 * Note, however, that as a v_cmpxchg is used for some atomic
1212 * operations, this function must be called from the CPU which owns the buffer
1213 * for an ACTIVE flush.
1214 */
1215 void lib_ring_buffer_switch_slow(struct lttng_ust_lib_ring_buffer *buf, enum switch_mode mode,
1216 struct lttng_ust_shm_handle *handle)
1217 {
1218 struct channel *chan = shmp(handle, buf->backend.chan);
1219 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1220 struct switch_offsets offsets;
1221 unsigned long oldidx;
1222 uint64_t tsc;
1223
1224 offsets.size = 0;
1225
1226 /*
1227 * Perform retryable operations.
1228 */
1229 do {
1230 if (lib_ring_buffer_try_switch_slow(mode, buf, chan, &offsets,
1231 &tsc))
1232 return; /* Switch not needed */
1233 } while (v_cmpxchg(config, &buf->offset, offsets.old, offsets.end)
1234 != offsets.old);
1235
1236 /*
1237 * Atomically update last_tsc. This update races against concurrent
1238 * atomic updates, but the race will always cause supplementary full TSC
1239 * records, never the opposite (missing a full TSC record when it would
1240 * be needed).
1241 */
1242 save_last_tsc(config, buf, tsc);
1243
1244 /*
1245 * Push the reader if necessary
1246 */
1247 lib_ring_buffer_reserve_push_reader(buf, chan, offsets.old);
1248
1249 oldidx = subbuf_index(offsets.old, chan);
1250 lib_ring_buffer_clear_noref(config, &buf->backend, oldidx, handle);
1251
1252 /*
1253 * May need to populate header start on SWITCH_FLUSH.
1254 */
1255 if (offsets.switch_old_start) {
1256 lib_ring_buffer_switch_old_start(buf, chan, &offsets, tsc, handle);
1257 offsets.old += config->cb.subbuffer_header_size();
1258 }
1259
1260 /*
1261 * Switch old subbuffer.
1262 */
1263 lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc, handle);
1264 }
1265
1266 /*
1267 * Returns :
1268 * 0 if ok
1269 * -ENOSPC if event size is too large for packet.
1270 * -ENOBUFS if there is currently not enough space in buffer for the event.
1271 * -EIO if data cannot be written into the buffer for any other reason.
1272 */
1273 static
1274 int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf,
1275 struct channel *chan,
1276 struct switch_offsets *offsets,
1277 struct lttng_ust_lib_ring_buffer_ctx *ctx)
1278 {
1279 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1280 struct lttng_ust_shm_handle *handle = ctx->handle;
1281 unsigned long reserve_commit_diff;
1282
1283 offsets->begin = v_read(config, &buf->offset);
1284 offsets->old = offsets->begin;
1285 offsets->switch_new_start = 0;
1286 offsets->switch_new_end = 0;
1287 offsets->switch_old_end = 0;
1288 offsets->pre_header_padding = 0;
1289
1290 ctx->tsc = config->cb.ring_buffer_clock_read(chan);
1291 if ((int64_t) ctx->tsc == -EIO)
1292 return -EIO;
1293
1294 if (last_tsc_overflow(config, buf, ctx->tsc))
1295 ctx->rflags |= RING_BUFFER_RFLAG_FULL_TSC;
1296
1297 if (caa_unlikely(subbuf_offset(offsets->begin, ctx->chan) == 0)) {
1298 offsets->switch_new_start = 1; /* For offsets->begin */
1299 } else {
1300 offsets->size = config->cb.record_header_size(config, chan,
1301 offsets->begin,
1302 &offsets->pre_header_padding,
1303 ctx);
1304 offsets->size +=
1305 lib_ring_buffer_align(offsets->begin + offsets->size,
1306 ctx->largest_align)
1307 + ctx->data_size;
1308 if (caa_unlikely(subbuf_offset(offsets->begin, chan) +
1309 offsets->size > chan->backend.subbuf_size)) {
1310 offsets->switch_old_end = 1; /* For offsets->old */
1311 offsets->switch_new_start = 1; /* For offsets->begin */
1312 }
1313 }
1314 if (caa_unlikely(offsets->switch_new_start)) {
1315 unsigned long sb_index;
1316
1317 /*
1318 * We are typically not filling the previous buffer completely.
1319 */
1320 if (caa_likely(offsets->switch_old_end))
1321 offsets->begin = subbuf_align(offsets->begin, chan);
1322 offsets->begin = offsets->begin
1323 + config->cb.subbuffer_header_size();
1324 /* Test new buffer integrity */
1325 sb_index = subbuf_index(offsets->begin, chan);
1326 reserve_commit_diff =
1327 (buf_trunc(offsets->begin, chan)
1328 >> chan->backend.num_subbuf_order)
1329 - ((unsigned long) v_read(config,
1330 &shmp_index(handle, buf->commit_cold, sb_index)->cc_sb)
1331 & chan->commit_count_mask);
1332 if (caa_likely(reserve_commit_diff == 0)) {
1333 /* Next subbuffer not being written to. */
1334 if (caa_unlikely(config->mode != RING_BUFFER_OVERWRITE &&
1335 subbuf_trunc(offsets->begin, chan)
1336 - subbuf_trunc((unsigned long)
1337 uatomic_read(&buf->consumed), chan)
1338 >= chan->backend.buf_size)) {
1339 /*
1340 * We do not overwrite non consumed buffers
1341 * and we are full : record is lost.
1342 */
1343 v_inc(config, &buf->records_lost_full);
1344 return -ENOBUFS;
1345 } else {
1346 /*
1347 * Next subbuffer not being written to, and we
1348 * are either in overwrite mode or the buffer is
1349 * not full. It's safe to write in this new
1350 * subbuffer.
1351 */
1352 }
1353 } else {
1354 /*
1355 * Next subbuffer reserve offset does not match the
1356 * commit offset. Drop record in producer-consumer and
1357 * overwrite mode. Caused by either a writer OOPS or too
1358 * many nested writes over a reserve/commit pair.
1359 */
1360 v_inc(config, &buf->records_lost_wrap);
1361 return -EIO;
1362 }
1363 offsets->size =
1364 config->cb.record_header_size(config, chan,
1365 offsets->begin,
1366 &offsets->pre_header_padding,
1367 ctx);
1368 offsets->size +=
1369 lib_ring_buffer_align(offsets->begin + offsets->size,
1370 ctx->largest_align)
1371 + ctx->data_size;
1372 if (caa_unlikely(subbuf_offset(offsets->begin, chan)
1373 + offsets->size > chan->backend.subbuf_size)) {
1374 /*
1375 * Record too big for subbuffers, report error, don't
1376 * complete the sub-buffer switch.
1377 */
1378 v_inc(config, &buf->records_lost_big);
1379 return -ENOSPC;
1380 } else {
1381 /*
1382 * We just made a successful buffer switch and the
1383 * record fits in the new subbuffer. Let's write.
1384 */
1385 }
1386 } else {
1387 /*
1388 * Record fits in the current buffer and we are not on a switch
1389 * boundary. It's safe to write.
1390 */
1391 }
1392 offsets->end = offsets->begin + offsets->size;
1393
1394 if (caa_unlikely(subbuf_offset(offsets->end, chan) == 0)) {
1395 /*
1396 * The offset_end will fall at the very beginning of the next
1397 * subbuffer.
1398 */
1399 offsets->switch_new_end = 1; /* For offsets->begin */
1400 }
1401 return 0;
1402 }
1403
1404 /**
1405 * lib_ring_buffer_reserve_slow - Atomic slot reservation in a buffer.
1406 * @ctx: ring buffer context.
1407 *
1408 * Return : -ENOBUFS if not enough space, -ENOSPC if event size too large,
1409 * -EIO for other errors, else returns 0.
1410 * It will take care of sub-buffer switching.
1411 */
1412 int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx)
1413 {
1414 struct channel *chan = ctx->chan;
1415 struct lttng_ust_shm_handle *handle = ctx->handle;
1416 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1417 struct lttng_ust_lib_ring_buffer *buf;
1418 struct switch_offsets offsets;
1419 int ret;
1420
1421 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
1422 buf = shmp(handle, chan->backend.buf[ctx->cpu].shmp);
1423 else
1424 buf = shmp(handle, chan->backend.buf[0].shmp);
1425 ctx->buf = buf;
1426
1427 offsets.size = 0;
1428
1429 do {
1430 ret = lib_ring_buffer_try_reserve_slow(buf, chan, &offsets,
1431 ctx);
1432 if (caa_unlikely(ret))
1433 return ret;
1434 } while (caa_unlikely(v_cmpxchg(config, &buf->offset, offsets.old,
1435 offsets.end)
1436 != offsets.old));
1437
1438 /*
1439 * Atomically update last_tsc. This update races against concurrent
1440 * atomic updates, but the race will always cause supplementary full TSC
1441 * records, never the opposite (missing a full TSC record when it would
1442 * be needed).
1443 */
1444 save_last_tsc(config, buf, ctx->tsc);
1445
1446 /*
1447 * Push the reader if necessary
1448 */
1449 lib_ring_buffer_reserve_push_reader(buf, chan, offsets.end - 1);
1450
1451 /*
1452 * Clear noref flag for this subbuffer.
1453 */
1454 lib_ring_buffer_clear_noref(config, &buf->backend,
1455 subbuf_index(offsets.end - 1, chan),
1456 handle);
1457
1458 /*
1459 * Switch old subbuffer if needed.
1460 */
1461 if (caa_unlikely(offsets.switch_old_end)) {
1462 lib_ring_buffer_clear_noref(config, &buf->backend,
1463 subbuf_index(offsets.old - 1, chan),
1464 handle);
1465 lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->tsc, handle);
1466 }
1467
1468 /*
1469 * Populate new subbuffer.
1470 */
1471 if (caa_unlikely(offsets.switch_new_start))
1472 lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->tsc, handle);
1473
1474 if (caa_unlikely(offsets.switch_new_end))
1475 lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->tsc, handle);
1476
1477 ctx->slot_size = offsets.size;
1478 ctx->pre_offset = offsets.begin;
1479 ctx->buf_offset = offsets.begin + offsets.pre_header_padding;
1480 return 0;
1481 }