Move LTTng-UST buffer ownership from application to consumer
[lttng-ust.git] / libringbuffer / ring_buffer_frontend.c
1 /*
2 * ring_buffer_frontend.c
3 *
4 * Copyright (C) 2005-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; only
9 * version 2.1 of the License.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 *
20 *
21 * Ring buffer wait-free buffer synchronization. Producer-consumer and flight
22 * recorder (overwrite) modes. See thesis:
23 *
24 * Desnoyers, Mathieu (2009), "Low-Impact Operating System Tracing", Ph.D.
25 * dissertation, Ecole Polytechnique de Montreal.
26 * http://www.lttng.org/pub/thesis/desnoyers-dissertation-2009-12.pdf
27 *
28 * - Algorithm presentation in Chapter 5:
29 * "Lockless Multi-Core High-Throughput Buffering".
30 * - Algorithm formal verification in Section 8.6:
31 * "Formal verification of LTTng"
32 *
33 * Author:
34 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
35 *
36 * Inspired from LTT and RelayFS:
37 * Karim Yaghmour <karim@opersys.com>
38 * Tom Zanussi <zanussi@us.ibm.com>
39 * Bob Wisniewski <bob@watson.ibm.com>
40 * And from K42 :
41 * Bob Wisniewski <bob@watson.ibm.com>
42 *
43 * Buffer reader semantics:
44 *
45 * - get_subbuf_size
46 * while buffer is not finalized and empty
47 * - get_subbuf
48 * - if return value != 0, continue
49 * - splice one subbuffer worth of data to a pipe
50 * - splice the data from pipe to disk/network
51 * - put_subbuf
52 */
53
54 #define _GNU_SOURCE
55 #include <sys/types.h>
56 #include <sys/mman.h>
57 #include <sys/stat.h>
58 #include <fcntl.h>
59 #include <urcu/compiler.h>
60 #include <urcu/ref.h>
61 #include <helper.h>
62
63 #include "smp.h"
64 #include <lttng/ringbuffer-config.h>
65 #include "vatomic.h"
66 #include "backend.h"
67 #include "frontend.h"
68 #include "shm.h"
69 #include "tlsfixup.h"
70 #include "../liblttng-ust/compat.h" /* For ENODATA */
71
72 #ifndef max
73 #define max(a, b) ((a) > (b) ? (a) : (b))
74 #endif
75
76 /* Print DBG() messages about events lost only every 1048576 hits */
77 #define DBG_PRINT_NR_LOST (1UL << 20)
78
79 /*
80 * Use POSIX SHM: shm_open(3) and shm_unlink(3).
81 * close(2) to close the fd returned by shm_open.
82 * shm_unlink releases the shared memory object name.
83 * ftruncate(2) sets the size of the memory object.
84 * mmap/munmap maps the shared memory obj to a virtual address in the
85 * calling process (should be done both in libust and consumer).
86 * See shm_overview(7) for details.
87 * Pass file descriptor returned by shm_open(3) to ltt-sessiond through
88 * a UNIX socket.
89 *
90 * Since we don't need to access the object using its name, we can
91 * immediately shm_unlink(3) it, and only keep the handle with its file
92 * descriptor.
93 */
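/*
 * Illustrative sketch (not compiled) of the shm_open()/shm_unlink() pattern
 * described above. The object name and error handling are placeholders; the
 * real allocation lives in shm.c, and the fd is later passed to the consumer
 * over a UNIX socket.
 */
#if 0
static int example_create_shm(size_t memory_map_size, void **memory_map)
{
	int shmfd;

	/* Create the POSIX shared memory object... */
	shmfd = shm_open("/example-ust-shm", O_CREAT | O_EXCL | O_RDWR, 0700);
	if (shmfd < 0)
		return -1;
	/* ...and immediately release its name: only the fd is kept. */
	if (shm_unlink("/example-ust-shm"))
		goto error;
	/* Set the size of the memory object. */
	if (ftruncate(shmfd, (off_t) memory_map_size))
		goto error;
	/* Map it in the calling process; the consumer maps the same fd. */
	*memory_map = mmap(NULL, memory_map_size, PROT_READ | PROT_WRITE,
			MAP_SHARED, shmfd, 0);
	if (*memory_map == MAP_FAILED)
		goto error;
	return shmfd;

error:
	close(shmfd);
	return -1;
}
#endif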
94
95 /*
96 * Internal structure representing offsets to use at a sub-buffer switch.
97 */
98 struct switch_offsets {
99 unsigned long begin, end, old;
100 size_t pre_header_padding, size;
101 unsigned int switch_new_start:1, switch_new_end:1, switch_old_start:1,
102 switch_old_end:1;
103 };
104
105 __thread unsigned int lib_ring_buffer_nesting;
106
107 /*
108 * TODO: this is unused. Errors are saved within the ring buffer.
109 * Eventually, allow consumerd to print these errors.
110 */
111 static
112 void lib_ring_buffer_print_errors(struct channel *chan,
113 struct lttng_ust_lib_ring_buffer *buf, int cpu,
114 struct lttng_ust_shm_handle *handle)
115 __attribute__((unused));
116
117 /**
118 * lib_ring_buffer_reset - Reset ring buffer to initial values.
119 * @buf: Ring buffer.
120 *
121 * Effectively empty the ring buffer. Should be called when the buffer is not
122 * used for writing. The ring buffer can be opened for reading, but the reader
123 * should not be using the iterator concurrently with reset. The previous
124 * current iterator record is reset.
125 */
126 void lib_ring_buffer_reset(struct lttng_ust_lib_ring_buffer *buf,
127 struct lttng_ust_shm_handle *handle)
128 {
129 struct channel *chan = shmp(handle, buf->backend.chan);
130 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
131 unsigned int i;
132
133 /*
134 * Reset iterator first. It will put the subbuffer if it currently holds
135 * it.
136 */
137 v_set(config, &buf->offset, 0);
138 for (i = 0; i < chan->backend.num_subbuf; i++) {
139 v_set(config, &shmp_index(handle, buf->commit_hot, i)->cc, 0);
140 v_set(config, &shmp_index(handle, buf->commit_hot, i)->seq, 0);
141 v_set(config, &shmp_index(handle, buf->commit_cold, i)->cc_sb, 0);
142 }
143 uatomic_set(&buf->consumed, 0);
144 uatomic_set(&buf->record_disabled, 0);
145 v_set(config, &buf->last_tsc, 0);
146 lib_ring_buffer_backend_reset(&buf->backend, handle);
147 /* Don't reset number of active readers */
148 v_set(config, &buf->records_lost_full, 0);
149 v_set(config, &buf->records_lost_wrap, 0);
150 v_set(config, &buf->records_lost_big, 0);
151 v_set(config, &buf->records_count, 0);
152 v_set(config, &buf->records_overrun, 0);
153 buf->finalized = 0;
154 }
155
156 /**
157 * channel_reset - Reset channel to initial values.
158 * @chan: Channel.
159 *
160 * Effectively empty the channel. Should be called when the channel is not used
161 * for writing. The channel can be opened for reading, but the reader should not
162 * be using the iterator concurrently with reset. The previous current iterator
163 * record is reset.
164 */
165 void channel_reset(struct channel *chan)
166 {
167 /*
168 * Reset iterators first. Will put the subbuffer if held for reading.
169 */
170 uatomic_set(&chan->record_disabled, 0);
171 /* Don't reset commit_count_mask, still valid */
172 channel_backend_reset(&chan->backend);
173 /* Don't reset switch/read timer interval */
174 /* Don't reset notifiers and notifier enable bits */
175 /* Don't reset reader reference count */
176 }
177
178 /*
179 * Must be called under cpu hotplug protection.
180 */
181 int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf,
182 struct channel_backend *chanb, int cpu,
183 struct lttng_ust_shm_handle *handle,
184 struct shm_object *shmobj)
185 {
186 const struct lttng_ust_lib_ring_buffer_config *config = &chanb->config;
187 struct channel *chan = caa_container_of(chanb, struct channel, backend);
188 void *priv = channel_get_private(chan);
189 size_t subbuf_header_size;
190 uint64_t tsc;
191 int ret;
192
193 /* Test for cpu hotplug */
194 if (buf->backend.allocated)
195 return 0;
196
197 ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend,
198 cpu, handle, shmobj);
199 if (ret)
200 return ret;
201
202 align_shm(shmobj, __alignof__(struct commit_counters_hot));
203 set_shmp(buf->commit_hot,
204 zalloc_shm(shmobj,
205 sizeof(struct commit_counters_hot) * chan->backend.num_subbuf));
206 if (!shmp(handle, buf->commit_hot)) {
207 ret = -ENOMEM;
208 goto free_chanbuf;
209 }
210
211 align_shm(shmobj, __alignof__(struct commit_counters_cold));
212 set_shmp(buf->commit_cold,
213 zalloc_shm(shmobj,
214 sizeof(struct commit_counters_cold) * chan->backend.num_subbuf));
215 if (!shmp(handle, buf->commit_cold)) {
216 ret = -ENOMEM;
217 goto free_commit;
218 }
219
220 /*
221 * Write the subbuffer header for first subbuffer so we know the total
222 * duration of data gathering.
223 */
224 subbuf_header_size = config->cb.subbuffer_header_size();
225 v_set(config, &buf->offset, subbuf_header_size);
226 subbuffer_id_clear_noref(config, &shmp_index(handle, buf->backend.buf_wsb, 0)->id);
227 tsc = config->cb.ring_buffer_clock_read(shmp(handle, buf->backend.chan));
228 config->cb.buffer_begin(buf, tsc, 0, handle);
229 v_add(config, subbuf_header_size, &shmp_index(handle, buf->commit_hot, 0)->cc);
230
231 if (config->cb.buffer_create) {
232 ret = config->cb.buffer_create(buf, priv, cpu, chanb->name, handle);
233 if (ret)
234 goto free_init;
235 }
236 buf->backend.allocated = 1;
237 return 0;
238
239 /* Error handling */
240 free_init:
241 /* commit_cold will be freed by shm teardown */
242 free_commit:
243 /* commit_hot will be freed by shm teardown */
244 free_chanbuf:
245 return ret;
246 }
247
248 #if 0
249 static void switch_buffer_timer(unsigned long data)
250 {
251 struct lttng_ust_lib_ring_buffer *buf = (struct lttng_ust_lib_ring_buffer *)data;
252 struct channel *chan = shmp(handle, buf->backend.chan);
253 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
254
255 /*
256 * Only flush buffers periodically if readers are active.
257 */
258 if (uatomic_read(&buf->active_readers))
259 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, handle);
260
261 //TODO timers
262 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
263 // mod_timer_pinned(&buf->switch_timer,
264 // jiffies + chan->switch_timer_interval);
265 //else
266 // mod_timer(&buf->switch_timer,
267 // jiffies + chan->switch_timer_interval);
268 }
269 #endif //0
270
271 static void lib_ring_buffer_start_switch_timer(struct lttng_ust_lib_ring_buffer *buf,
272 struct lttng_ust_shm_handle *handle)
273 {
274 struct channel *chan = shmp(handle, buf->backend.chan);
275 //const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
276
277 if (!chan->switch_timer_interval || buf->switch_timer_enabled)
278 return;
279 //TODO
280 //init_timer(&buf->switch_timer);
281 //buf->switch_timer.function = switch_buffer_timer;
282 //buf->switch_timer.expires = jiffies + chan->switch_timer_interval;
283 //buf->switch_timer.data = (unsigned long)buf;
284 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
285 // add_timer_on(&buf->switch_timer, buf->backend.cpu);
286 //else
287 // add_timer(&buf->switch_timer);
288 buf->switch_timer_enabled = 1;
289 }
290
291 static void lib_ring_buffer_stop_switch_timer(struct lttng_ust_lib_ring_buffer *buf,
292 struct lttng_ust_shm_handle *handle)
293 {
294 struct channel *chan = shmp(handle, buf->backend.chan);
295
296 if (!chan->switch_timer_interval || !buf->switch_timer_enabled)
297 return;
298
299 //TODO
300 //del_timer_sync(&buf->switch_timer);
301 buf->switch_timer_enabled = 0;
302 }
303
304 #if 0
305 /*
306 * Polling timer to check the channels for data.
307 */
308 static void read_buffer_timer(unsigned long data)
309 {
310 struct lttng_ust_lib_ring_buffer *buf = (struct lttng_ust_lib_ring_buffer *)data;
311 struct channel *chan = shmp(handle, buf->backend.chan);
312 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
313
314 CHAN_WARN_ON(chan, !buf->backend.allocated);
315
316 	if (uatomic_read(&buf->active_readers)
317 	    && lib_ring_buffer_poll_deliver(config, buf, chan, handle)) {
318 //TODO
319 //wake_up_interruptible(&buf->read_wait);
320 //wake_up_interruptible(&chan->read_wait);
321 }
322
323 //TODO
324 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
325 // mod_timer_pinned(&buf->read_timer,
326 // jiffies + chan->read_timer_interval);
327 //else
328 // mod_timer(&buf->read_timer,
329 // jiffies + chan->read_timer_interval);
330 }
331 #endif //0
332
333 static void lib_ring_buffer_start_read_timer(struct lttng_ust_lib_ring_buffer *buf,
334 struct lttng_ust_shm_handle *handle)
335 {
336 struct channel *chan = shmp(handle, buf->backend.chan);
337 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
338
339 if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
340 || !chan->read_timer_interval
341 || buf->read_timer_enabled)
342 return;
343
344 //TODO
345 //init_timer(&buf->read_timer);
346 //buf->read_timer.function = read_buffer_timer;
347 //buf->read_timer.expires = jiffies + chan->read_timer_interval;
348 //buf->read_timer.data = (unsigned long)buf;
349
350 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
351 // add_timer_on(&buf->read_timer, buf->backend.cpu);
352 //else
353 // add_timer(&buf->read_timer);
354 buf->read_timer_enabled = 1;
355 }
356
357 static void lib_ring_buffer_stop_read_timer(struct lttng_ust_lib_ring_buffer *buf,
358 struct lttng_ust_shm_handle *handle)
359 {
360 struct channel *chan = shmp(handle, buf->backend.chan);
361 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
362
363 if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
364 || !chan->read_timer_interval
365 || !buf->read_timer_enabled)
366 return;
367
368 //TODO
369 //del_timer_sync(&buf->read_timer);
370 /*
371 * do one more check to catch data that has been written in the last
372 * timer period.
373 */
374 if (lib_ring_buffer_poll_deliver(config, buf, chan, handle)) {
375 //TODO
376 //wake_up_interruptible(&buf->read_wait);
377 //wake_up_interruptible(&chan->read_wait);
378 }
379 buf->read_timer_enabled = 0;
380 }
381
382 static void channel_unregister_notifiers(struct channel *chan,
383 struct lttng_ust_shm_handle *handle)
384 {
385 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
386 int cpu;
387
388 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
389 for_each_possible_cpu(cpu) {
390 struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
391
392 lib_ring_buffer_stop_switch_timer(buf, handle);
393 lib_ring_buffer_stop_read_timer(buf, handle);
394 }
395 } else {
396 struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
397
398 lib_ring_buffer_stop_switch_timer(buf, handle);
399 lib_ring_buffer_stop_read_timer(buf, handle);
400 }
401 //channel_backend_unregister_notifiers(&chan->backend);
402 }
403
404 static void channel_free(struct channel *chan, struct lttng_ust_shm_handle *handle)
405 {
406 channel_backend_free(&chan->backend, handle);
407 /* chan is freed by shm teardown */
408 shm_object_table_destroy(handle->table);
409 free(handle);
410 }
411
412 /**
413 * channel_create - Create channel.
414 * @config: ring buffer instance configuration
415 * @name: name of the channel
416 * @priv_data: ring buffer client private data area pointer (output)
417 * @priv_data_size: length, in bytes, of the private data area.
418 * @priv_data_init: initialization data for private data.
419 * @buf_addr: pointer to the beginning of the preallocated buffer contiguous
420 * address mapping. It is used only by RING_BUFFER_STATIC
421 * configuration. It can be set to NULL for other backends.
422 * @subbuf_size: subbuffer size
423 * @num_subbuf: number of subbuffers
424 * @switch_timer_interval: Time interval (in us) to fill sub-buffers with
425 * padding to let readers get those sub-buffers.
426 * Used for live streaming.
427 * @read_timer_interval: Time interval (in us) to wake up pending readers.
428 *
429 * Holds cpu hotplug.
430 * Returns NULL on failure.
431 */
432 struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buffer_config *config,
433 const char *name,
434 void **priv_data,
435 size_t priv_data_align,
436 size_t priv_data_size,
437 void *priv_data_init,
438 void *buf_addr, size_t subbuf_size,
439 size_t num_subbuf, unsigned int switch_timer_interval,
440 unsigned int read_timer_interval)
441 {
442 int ret, cpu;
443 size_t shmsize, chansize;
444 struct channel *chan;
445 struct lttng_ust_shm_handle *handle;
446 struct shm_object *shmobj;
447 unsigned int nr_streams;
448
449 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
450 nr_streams = num_possible_cpus();
451 else
452 nr_streams = 1;
453
454 if (lib_ring_buffer_check_config(config, switch_timer_interval,
455 read_timer_interval))
456 return NULL;
457
458 handle = zmalloc(sizeof(struct lttng_ust_shm_handle));
459 if (!handle)
460 return NULL;
461
462 /* Allocate table for channel + per-cpu buffers */
463 handle->table = shm_object_table_create(1 + num_possible_cpus());
464 if (!handle->table)
465 goto error_table_alloc;
466
467 /* Calculate the shm allocation layout */
468 shmsize = sizeof(struct channel);
469 shmsize += offset_align(shmsize, __alignof__(struct lttng_ust_lib_ring_buffer_shmp));
470 shmsize += sizeof(struct lttng_ust_lib_ring_buffer_shmp) * nr_streams;
471 chansize = shmsize;
472 if (priv_data_align)
473 shmsize += offset_align(shmsize, priv_data_align);
474 shmsize += priv_data_size;
475
476 /* Allocate normal memory for channel (not shared) */
477 shmobj = shm_object_table_alloc(handle->table, shmsize, SHM_OBJECT_MEM);
478 if (!shmobj)
479 goto error_append;
480 /* struct channel is at object 0, offset 0 (hardcoded) */
481 set_shmp(handle->chan, zalloc_shm(shmobj, chansize));
482 assert(handle->chan._ref.index == 0);
483 assert(handle->chan._ref.offset == 0);
484 chan = shmp(handle, handle->chan);
485 if (!chan)
486 goto error_append;
487 chan->nr_streams = nr_streams;
488
489 /* space for private data */
490 if (priv_data_size) {
491 DECLARE_SHMP(void, priv_data_alloc);
492
493 align_shm(shmobj, priv_data_align);
494 chan->priv_data_offset = shmobj->allocated_len;
495 set_shmp(priv_data_alloc, zalloc_shm(shmobj, priv_data_size));
496 if (!shmp(handle, priv_data_alloc))
497 goto error_append;
498 *priv_data = channel_get_private(chan);
499 memcpy(*priv_data, priv_data_init, priv_data_size);
500 } else {
501 chan->priv_data_offset = -1;
502 if (priv_data)
503 *priv_data = NULL;
504 }
505
506 ret = channel_backend_init(&chan->backend, name, config,
507 subbuf_size, num_subbuf, handle);
508 if (ret)
509 goto error_backend_init;
510
511 chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order);
512 //TODO
513 //chan->switch_timer_interval = usecs_to_jiffies(switch_timer_interval);
514 //chan->read_timer_interval = usecs_to_jiffies(read_timer_interval);
515 //TODO
516 //init_waitqueue_head(&chan->read_wait);
517 //init_waitqueue_head(&chan->hp_wait);
518
519 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
520 /*
521 * In case of non-hotplug cpu, if the ring-buffer is allocated
522 * in early initcall, it will not be notified of secondary cpus.
523 * In that unlikely case, we need to allocate for all possible cpus.
524 */
525 for_each_possible_cpu(cpu) {
526 struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
527 lib_ring_buffer_start_switch_timer(buf, handle);
528 lib_ring_buffer_start_read_timer(buf, handle);
529 }
530 } else {
531 struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
532
533 lib_ring_buffer_start_switch_timer(buf, handle);
534 lib_ring_buffer_start_read_timer(buf, handle);
535 }
536 return handle;
537
538 error_backend_init:
539 error_append:
540 shm_object_table_destroy(handle->table);
541 error_table_alloc:
542 free(handle);
543 return NULL;
544 }
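/*
 * Illustrative sketch (not compiled) of a channel_create() call as a ring
 * buffer client would issue it. "client_config" and "struct client_priv" are
 * hypothetical; the real clients live in liblttng-ust.
 */
#if 0
static struct lttng_ust_shm_handle *example_create_channel(void)
{
	struct client_priv init = { 0 };	/* hypothetical private data */
	struct lttng_ust_shm_handle *handle;
	void *priv;

	/* 4 sub-buffers of 4 kB each, no switch timer, no read timer. */
	handle = channel_create(&client_config, "example-chan",
			&priv, __alignof__(struct client_priv),
			sizeof(struct client_priv), &init,
			NULL, 4096, 4, 0, 0);
	if (!handle)
		return NULL;
	/* "priv" now points at the private area copied from "init". */
	return handle;
}
#endif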
545
546 struct lttng_ust_shm_handle *channel_handle_create(void *data,
547 uint64_t memory_map_size)
548 {
549 struct lttng_ust_shm_handle *handle;
550 struct shm_object *object;
551
552 handle = zmalloc(sizeof(struct lttng_ust_shm_handle));
553 if (!handle)
554 return NULL;
555
556 /* Allocate table for channel + per-cpu buffers */
557 handle->table = shm_object_table_create(1 + num_possible_cpus());
558 if (!handle->table)
559 goto error_table_alloc;
560 /* Add channel object */
561 object = shm_object_table_append_mem(handle->table, data,
562 memory_map_size);
563 if (!object)
564 goto error_table_object;
565 /* struct channel is at object 0, offset 0 (hardcoded) */
566 handle->chan._ref.index = 0;
567 handle->chan._ref.offset = 0;
568 return handle;
569
570 error_table_object:
571 shm_object_table_destroy(handle->table);
572 error_table_alloc:
573 free(handle);
574 return NULL;
575 }
576
577 int channel_handle_add_stream(struct lttng_ust_shm_handle *handle,
578 int shm_fd, int wakeup_fd, uint32_t stream_nr,
579 uint64_t memory_map_size)
580 {
581 struct shm_object *object;
582
583 /* Add stream object */
584 object = shm_object_table_append_shm(handle->table,
585 shm_fd, wakeup_fd, stream_nr,
586 memory_map_size);
587 if (!object)
588 return -EINVAL;
589 return 0;
590 }
591
592 unsigned int channel_handle_get_nr_streams(struct lttng_ust_shm_handle *handle)
593 {
594 assert(handle->table);
595 return handle->table->allocated_len - 1;
596 }
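/*
 * Illustrative sketch (not compiled) of how a consumer rebuilds a shm handle
 * from the channel memory and the per-stream file descriptors it received
 * from the application over a UNIX socket. All parameters of
 * example_map_channel() are placeholders for what the consumer actually
 * receives.
 */
#if 0
static struct lttng_ust_shm_handle *example_map_channel(void *chan_data,
		uint64_t chan_size, unsigned int nr_streams,
		int *shm_fds, int *wakeup_fds, uint64_t *stream_sizes)
{
	struct lttng_ust_shm_handle *handle;
	unsigned int i;

	handle = channel_handle_create(chan_data, chan_size);
	if (!handle)
		return NULL;
	for (i = 0; i < nr_streams; i++) {
		if (channel_handle_add_stream(handle, shm_fds[i],
				wakeup_fds[i], i, stream_sizes[i]))
			goto error;
	}
	assert(channel_handle_get_nr_streams(handle) == nr_streams);
	return handle;

error:
	/* Real code would tear the partially built handle down here. */
	return NULL;
}
#endif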
597
598 static
599 void channel_release(struct channel *chan, struct lttng_ust_shm_handle *handle)
600 {
601 channel_free(chan, handle);
602 }
603
604 /**
605 * channel_destroy - Finalize, wait for q.s. and destroy channel.
606 * @chan: channel to destroy
607 *
608 * Holds cpu hotplug.
609 * Call "destroy" callback, finalize channels, decrement the channel
610 * reference count. Note that when readers have completed data
611 * consumption of finalized channels, get_subbuf() will return -ENODATA.
612 * They should release their handle at that point.
613 */
614 void channel_destroy(struct channel *chan, struct lttng_ust_shm_handle *handle,
615 int consumer)
616 {
617 if (consumer) {
618 /*
619 * Note: the consumer takes care of finalizing and
620 * switching the buffers.
621 */
622 channel_unregister_notifiers(chan, handle);
623 }
624
625 /*
626 * sessiond/consumer are keeping a reference on the shm file
627 * descriptor directly. No need to refcount.
628 */
629 channel_release(chan, handle);
630 return;
631 }
632
633 struct lttng_ust_lib_ring_buffer *channel_get_ring_buffer(
634 const struct lttng_ust_lib_ring_buffer_config *config,
635 struct channel *chan, int cpu,
636 struct lttng_ust_shm_handle *handle,
637 int *shm_fd, int *wait_fd,
638 int *wakeup_fd,
639 uint64_t *memory_map_size)
640 {
641 struct shm_ref *ref;
642
643 if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
644 cpu = 0;
645 } else {
646 if (cpu >= num_possible_cpus())
647 return NULL;
648 }
649 ref = &chan->backend.buf[cpu].shmp._ref;
650 *shm_fd = shm_get_shm_fd(handle, ref);
651 *wait_fd = shm_get_wait_fd(handle, ref);
652 *wakeup_fd = shm_get_wakeup_fd(handle, ref);
653 if (shm_get_shm_size(handle, ref, memory_map_size))
654 return NULL;
655 return shmp(handle, chan->backend.buf[cpu].shmp);
656 }
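/*
 * Illustrative sketch (not compiled) of the application/sessiond side using
 * channel_get_ring_buffer() to collect, for each per-cpu stream, the file
 * descriptors and mapping size to hand over to the consumer.
 * send_stream_to_consumer() is a placeholder, not an LTTng-UST function.
 */
#if 0
static int example_send_streams(const struct lttng_ust_lib_ring_buffer_config *config,
		struct channel *chan, struct lttng_ust_shm_handle *handle)
{
	int cpu;

	/* Assumes a RING_BUFFER_ALLOC_PER_CPU channel. */
	for_each_possible_cpu(cpu) {
		struct lttng_ust_lib_ring_buffer *buf;
		int shm_fd, wait_fd, wakeup_fd;
		uint64_t memory_map_size;

		buf = channel_get_ring_buffer(config, chan, cpu, handle,
				&shm_fd, &wait_fd, &wakeup_fd,
				&memory_map_size);
		if (!buf)
			return -1;
		if (send_stream_to_consumer(cpu, shm_fd, wakeup_fd,
				memory_map_size))
			return -1;
	}
	return 0;
}
#endif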
657
658 int ring_buffer_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
659 struct channel *chan,
660 struct lttng_ust_shm_handle *handle,
661 int cpu)
662 {
663 struct shm_ref *ref;
664
665 if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
666 cpu = 0;
667 } else {
668 if (cpu >= num_possible_cpus())
669 return -EINVAL;
670 }
671 ref = &chan->backend.buf[cpu].shmp._ref;
672 return shm_close_wait_fd(handle, ref);
673 }
674
675 int ring_buffer_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
676 struct channel *chan,
677 struct lttng_ust_shm_handle *handle,
678 int cpu)
679 {
680 struct shm_ref *ref;
681
682 if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
683 cpu = 0;
684 } else {
685 if (cpu >= num_possible_cpus())
686 return -EINVAL;
687 }
688 ref = &chan->backend.buf[cpu].shmp._ref;
689 return shm_close_wakeup_fd(handle, ref);
690 }
691
692 int lib_ring_buffer_open_read(struct lttng_ust_lib_ring_buffer *buf,
693 struct lttng_ust_shm_handle *handle)
694 {
695 if (uatomic_cmpxchg(&buf->active_readers, 0, 1) != 0)
696 return -EBUSY;
697 cmm_smp_mb();
698 return 0;
699 }
700
701 void lib_ring_buffer_release_read(struct lttng_ust_lib_ring_buffer *buf,
702 struct lttng_ust_shm_handle *handle)
703 {
704 struct channel *chan = shmp(handle, buf->backend.chan);
705
706 CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
707 cmm_smp_mb();
708 uatomic_dec(&buf->active_readers);
709 }
710
711 /**
712 * lib_ring_buffer_snapshot - save subbuffer position snapshot (for read)
713 * @buf: ring buffer
714 * @consumed: consumed count indicating the position where to read
715 * @produced: produced count, indicates position when to stop reading
716 *
717 * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no
718 * data to read at consumed position, or 0 if the get operation succeeds.
719 */
720
721 int lib_ring_buffer_snapshot(struct lttng_ust_lib_ring_buffer *buf,
722 unsigned long *consumed, unsigned long *produced,
723 struct lttng_ust_shm_handle *handle)
724 {
725 struct channel *chan = shmp(handle, buf->backend.chan);
726 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
727 unsigned long consumed_cur, write_offset;
728 int finalized;
729
730 finalized = CMM_ACCESS_ONCE(buf->finalized);
731 /*
732 * Read finalized before counters.
733 */
734 cmm_smp_rmb();
735 consumed_cur = uatomic_read(&buf->consumed);
736 /*
737 * No need to issue a memory barrier between consumed count read and
738 * write offset read, because consumed count can only change
739 * concurrently in overwrite mode, and we keep a sequence counter
740 * identifier derived from the write offset to check we are getting
741 * the same sub-buffer we are expecting (the sub-buffers are atomically
742 * "tagged" upon writes, tags are checked upon read).
743 */
744 write_offset = v_read(config, &buf->offset);
745
746 /*
747 * Check that we are not about to read the same subbuffer in
748 * which the writer head is.
749 */
750 if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan)
751 == 0)
752 goto nodata;
753
754 *consumed = consumed_cur;
755 *produced = subbuf_trunc(write_offset, chan);
756
757 return 0;
758
759 nodata:
760 /*
761 * The memory barriers __wait_event()/wake_up_interruptible() take care
762 * of "raw_spin_is_locked" memory ordering.
763 */
764 if (finalized)
765 return -ENODATA;
766 else
767 return -EAGAIN;
768 }
769
770 /**
771 * lib_ring_buffer_put_snapshot - move consumed counter forward
772 * @buf: ring buffer
773 * @consumed_new: new consumed count value
774 */
775 void lib_ring_buffer_move_consumer(struct lttng_ust_lib_ring_buffer *buf,
776 unsigned long consumed_new,
777 struct lttng_ust_shm_handle *handle)
778 {
779 struct lttng_ust_lib_ring_buffer_backend *bufb = &buf->backend;
780 struct channel *chan = shmp(handle, bufb->chan);
781 unsigned long consumed;
782
783 CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
784
785 /*
786 * Only push the consumed value forward.
787 * If the consumed cmpxchg fails, this is because we have been pushed by
788 * the writer in flight recorder mode.
789 */
790 consumed = uatomic_read(&buf->consumed);
791 while ((long) consumed - (long) consumed_new < 0)
792 consumed = uatomic_cmpxchg(&buf->consumed, consumed,
793 consumed_new);
794 }
795
796 /**
797 * lib_ring_buffer_get_subbuf - get exclusive access to subbuffer for reading
798 * @buf: ring buffer
799 * @consumed: consumed count indicating the position where to read
800 *
801 * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no
802 * data to read at consumed position, or 0 if the get operation succeeds.
803 */
804 int lib_ring_buffer_get_subbuf(struct lttng_ust_lib_ring_buffer *buf,
805 unsigned long consumed,
806 struct lttng_ust_shm_handle *handle)
807 {
808 struct channel *chan = shmp(handle, buf->backend.chan);
809 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
810 unsigned long consumed_cur, consumed_idx, commit_count, write_offset;
811 int ret;
812 int finalized;
813
814 retry:
815 finalized = CMM_ACCESS_ONCE(buf->finalized);
816 /*
817 * Read finalized before counters.
818 */
819 cmm_smp_rmb();
820 consumed_cur = uatomic_read(&buf->consumed);
821 consumed_idx = subbuf_index(consumed, chan);
822 commit_count = v_read(config, &shmp_index(handle, buf->commit_cold, consumed_idx)->cc_sb);
823 /*
824 * Make sure we read the commit count before reading the buffer
825 * data and the write offset. Correct consumed offset ordering
826 * wrt commit count is ensured by the use of cmpxchg to update
827 * the consumed offset.
828 */
829 /*
830 * Local rmb to match the remote wmb to read the commit count
831 * before the buffer data and the write offset.
832 */
833 cmm_smp_rmb();
834
835 write_offset = v_read(config, &buf->offset);
836
837 /*
838 * Check that the buffer we are getting is after or at consumed_cur
839 * position.
840 */
841 if ((long) subbuf_trunc(consumed, chan)
842 - (long) subbuf_trunc(consumed_cur, chan) < 0)
843 goto nodata;
844
845 /*
846 * Check that the subbuffer we are trying to consume has been
847 * already fully committed.
848 */
849 if (((commit_count - chan->backend.subbuf_size)
850 & chan->commit_count_mask)
851 - (buf_trunc(consumed_cur, chan)
852 >> chan->backend.num_subbuf_order)
853 != 0)
854 goto nodata;
855
856 /*
857 * Check that we are not about to read the same subbuffer in
858 * which the writer head is.
859 */
860 if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan)
861 == 0)
862 goto nodata;
863
864 /*
865 * Failure to get the subbuffer causes a busy-loop retry without going
866 * to a wait queue. These are caused by short-lived race windows where
867 * the writer is getting access to a subbuffer we were trying to get
868 * access to. Also checks that the "consumed" buffer count we are
869 * looking for matches the one contained in the subbuffer id.
870 */
871 ret = update_read_sb_index(config, &buf->backend, &chan->backend,
872 consumed_idx, buf_trunc_val(consumed, chan),
873 handle);
874 if (ret)
875 goto retry;
876 subbuffer_id_clear_noref(config, &buf->backend.buf_rsb.id);
877
878 buf->get_subbuf_consumed = consumed;
879 buf->get_subbuf = 1;
880
881 return 0;
882
883 nodata:
884 /*
885 * The memory barriers __wait_event()/wake_up_interruptible() take care
886 * of "raw_spin_is_locked" memory ordering.
887 */
888 if (finalized)
889 return -ENODATA;
890 else
891 return -EAGAIN;
892 }
893
894 /**
895 * lib_ring_buffer_put_subbuf - release exclusive subbuffer access
896 * @buf: ring buffer
897 */
898 void lib_ring_buffer_put_subbuf(struct lttng_ust_lib_ring_buffer *buf,
899 struct lttng_ust_shm_handle *handle)
900 {
901 struct lttng_ust_lib_ring_buffer_backend *bufb = &buf->backend;
902 struct channel *chan = shmp(handle, bufb->chan);
903 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
904 unsigned long read_sb_bindex, consumed_idx, consumed;
905
906 CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
907
908 if (!buf->get_subbuf) {
909 /*
910 * Reader puts a subbuffer it did not get.
911 */
912 CHAN_WARN_ON(chan, 1);
913 return;
914 }
915 consumed = buf->get_subbuf_consumed;
916 buf->get_subbuf = 0;
917
918 /*
919 * Clear the records_unread counter. (overruns counter)
920 * Can still be non-zero if a file reader simply grabbed the data
921 * without using iterators.
922 * Can be below zero if an iterator is used on a snapshot more than
923 * once.
924 */
925 read_sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
926 v_add(config, v_read(config,
927 &shmp(handle, shmp_index(handle, bufb->array, read_sb_bindex)->shmp)->records_unread),
928 &bufb->records_read);
929 v_set(config, &shmp(handle, shmp_index(handle, bufb->array, read_sb_bindex)->shmp)->records_unread, 0);
930 CHAN_WARN_ON(chan, config->mode == RING_BUFFER_OVERWRITE
931 && subbuffer_id_is_noref(config, bufb->buf_rsb.id));
932 subbuffer_id_set_noref(config, &bufb->buf_rsb.id);
933
934 /*
935 * Exchange the reader subbuffer with the one we put in its place in the
936 * writer subbuffer table. Expect the original consumed count. If
937 * update_read_sb_index fails, this is because the writer updated the
938 * subbuffer concurrently. We should therefore keep the subbuffer we
939 * currently have: it has become invalid to try reading this sub-buffer
940 * consumed count value anyway.
941 */
942 consumed_idx = subbuf_index(consumed, chan);
943 update_read_sb_index(config, &buf->backend, &chan->backend,
944 consumed_idx, buf_trunc_val(consumed, chan),
945 handle);
946 /*
947 * update_read_sb_index return value ignored. Don't exchange sub-buffer
948 * if the writer concurrently updated it.
949 */
950 }
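/*
 * Illustrative sketch (not compiled) of a complete consumer read cycle built
 * on the functions above: open the reader, snapshot the positions, get/put
 * each available sub-buffer, push the consumed count forward, then release
 * the reader. The actual data copy and the wait/poll on -EAGAIN are elided.
 */
#if 0
static int example_consume_buffer(struct lttng_ust_lib_ring_buffer *buf,
		struct lttng_ust_shm_handle *handle)
{
	struct channel *chan = shmp(handle, buf->backend.chan);
	unsigned long consumed, produced;
	int ret;

	ret = lib_ring_buffer_open_read(buf, handle);
	if (ret)
		return ret;	/* -EBUSY: another reader is active. */

	for (;;) {
		ret = lib_ring_buffer_snapshot(buf, &consumed, &produced,
				handle);
		if (ret == -ENODATA)
			break;		/* Finalized and fully consumed. */
		if (ret == -EAGAIN)
			continue;	/* Nothing to read yet: wait, retry. */

		while ((long) (produced - consumed) > 0) {
			if (lib_ring_buffer_get_subbuf(buf, consumed, handle))
				break;	/* Pushed by the writer: re-snapshot. */
			/* ... copy one sub-buffer worth of data here ... */
			lib_ring_buffer_put_subbuf(buf, handle);
			consumed = subbuf_align(consumed, chan);
			lib_ring_buffer_move_consumer(buf, consumed, handle);
		}
	}
	lib_ring_buffer_release_read(buf, handle);
	return 0;
}
#endif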
951
952 /*
953 * cons_offset is an iterator on all subbuffer offsets between the reader
954 * position and the writer position. (inclusive)
955 */
956 static
957 void lib_ring_buffer_print_subbuffer_errors(struct lttng_ust_lib_ring_buffer *buf,
958 struct channel *chan,
959 unsigned long cons_offset,
960 int cpu,
961 struct lttng_ust_shm_handle *handle)
962 {
963 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
964 unsigned long cons_idx, commit_count, commit_count_sb;
965
966 cons_idx = subbuf_index(cons_offset, chan);
967 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, cons_idx)->cc);
968 commit_count_sb = v_read(config, &shmp_index(handle, buf->commit_cold, cons_idx)->cc_sb);
969
970 if (subbuf_offset(commit_count, chan) != 0)
971 DBG("ring buffer %s, cpu %d: "
972 "commit count in subbuffer %lu,\n"
973 "expecting multiples of %lu bytes\n"
974 " [ %lu bytes committed, %lu bytes reader-visible ]\n",
975 chan->backend.name, cpu, cons_idx,
976 chan->backend.subbuf_size,
977 commit_count, commit_count_sb);
978
979 DBG("ring buffer: %s, cpu %d: %lu bytes committed\n",
980 chan->backend.name, cpu, commit_count);
981 }
982
983 static
984 void lib_ring_buffer_print_buffer_errors(struct lttng_ust_lib_ring_buffer *buf,
985 struct channel *chan,
986 void *priv, int cpu,
987 struct lttng_ust_shm_handle *handle)
988 {
989 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
990 unsigned long write_offset, cons_offset;
991
992 /*
993 * No need to order commit_count, write_offset and cons_offset reads
994 * because we execute at teardown when no more writer nor reader
995 * references are left.
996 */
997 write_offset = v_read(config, &buf->offset);
998 cons_offset = uatomic_read(&buf->consumed);
999 if (write_offset != cons_offset)
1000 DBG("ring buffer %s, cpu %d: "
1001 "non-consumed data\n"
1002 " [ %lu bytes written, %lu bytes read ]\n",
1003 chan->backend.name, cpu, write_offset, cons_offset);
1004
1005 for (cons_offset = uatomic_read(&buf->consumed);
1006 (long) (subbuf_trunc((unsigned long) v_read(config, &buf->offset),
1007 chan)
1008 - cons_offset) > 0;
1009 cons_offset = subbuf_align(cons_offset, chan))
1010 lib_ring_buffer_print_subbuffer_errors(buf, chan, cons_offset,
1011 cpu, handle);
1012 }
1013
1014 static
1015 void lib_ring_buffer_print_errors(struct channel *chan,
1016 struct lttng_ust_lib_ring_buffer *buf, int cpu,
1017 struct lttng_ust_shm_handle *handle)
1018 {
1019 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1020 void *priv = channel_get_private(chan);
1021
1022 if (!strcmp(chan->backend.name, "relay-metadata-mmap")) {
1023 DBG("ring buffer %s: %lu records written, "
1024 "%lu records overrun\n",
1025 chan->backend.name,
1026 v_read(config, &buf->records_count),
1027 v_read(config, &buf->records_overrun));
1028 } else {
1029 DBG("ring buffer %s, cpu %d: %lu records written, "
1030 "%lu records overrun\n",
1031 chan->backend.name, cpu,
1032 v_read(config, &buf->records_count),
1033 v_read(config, &buf->records_overrun));
1034
1035 if (v_read(config, &buf->records_lost_full)
1036 || v_read(config, &buf->records_lost_wrap)
1037 || v_read(config, &buf->records_lost_big))
1038 DBG("ring buffer %s, cpu %d: records were lost. Caused by:\n"
1039 " [ %lu buffer full, %lu nest buffer wrap-around, "
1040 "%lu event too big ]\n",
1041 chan->backend.name, cpu,
1042 v_read(config, &buf->records_lost_full),
1043 v_read(config, &buf->records_lost_wrap),
1044 v_read(config, &buf->records_lost_big));
1045 }
1046 lib_ring_buffer_print_buffer_errors(buf, chan, priv, cpu, handle);
1047 }
1048
1049 /*
1050 * lib_ring_buffer_switch_old_start: Populate old subbuffer header.
1051 *
1052 * Only executed when the buffer is finalized, in SWITCH_FLUSH.
1053 */
1054 static
1055 void lib_ring_buffer_switch_old_start(struct lttng_ust_lib_ring_buffer *buf,
1056 struct channel *chan,
1057 struct switch_offsets *offsets,
1058 uint64_t tsc,
1059 struct lttng_ust_shm_handle *handle)
1060 {
1061 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1062 unsigned long oldidx = subbuf_index(offsets->old, chan);
1063 unsigned long commit_count;
1064
1065 config->cb.buffer_begin(buf, tsc, oldidx, handle);
1066
1067 /*
1068 * Order all writes to buffer before the commit count update that will
1069 * determine that the subbuffer is full.
1070 */
1071 cmm_smp_wmb();
1072 v_add(config, config->cb.subbuffer_header_size(),
1073 &shmp_index(handle, buf->commit_hot, oldidx)->cc);
1074 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, oldidx)->cc);
1075 /* Check if the written buffer has to be delivered */
1076 lib_ring_buffer_check_deliver(config, buf, chan, offsets->old,
1077 commit_count, oldidx, handle);
1078 lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
1079 offsets->old, commit_count,
1080 config->cb.subbuffer_header_size(),
1081 handle);
1082 }
1083
1084 /*
1085 * lib_ring_buffer_switch_old_end: switch old subbuffer
1086 *
1087 * Note: offset_old should never be 0 here. This is OK, because we never perform
1088 * buffer switch on an empty subbuffer in SWITCH_ACTIVE mode. The caller
1089 * increments the offset_old value when doing a SWITCH_FLUSH on an empty
1090 * subbuffer.
1091 */
1092 static
1093 void lib_ring_buffer_switch_old_end(struct lttng_ust_lib_ring_buffer *buf,
1094 struct channel *chan,
1095 struct switch_offsets *offsets,
1096 uint64_t tsc,
1097 struct lttng_ust_shm_handle *handle)
1098 {
1099 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1100 unsigned long oldidx = subbuf_index(offsets->old - 1, chan);
1101 unsigned long commit_count, padding_size, data_size;
1102
1103 data_size = subbuf_offset(offsets->old - 1, chan) + 1;
1104 padding_size = chan->backend.subbuf_size - data_size;
1105 subbuffer_set_data_size(config, &buf->backend, oldidx, data_size,
1106 handle);
1107
1108 /*
1109 * Order all writes to buffer before the commit count update that will
1110 * determine that the subbuffer is full.
1111 */
1112 cmm_smp_wmb();
1113 v_add(config, padding_size, &shmp_index(handle, buf->commit_hot, oldidx)->cc);
1114 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, oldidx)->cc);
1115 lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1,
1116 commit_count, oldidx, handle);
1117 lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
1118 offsets->old, commit_count,
1119 padding_size, handle);
1120 }
1121
1122 /*
1123 * lib_ring_buffer_switch_new_start: Populate new subbuffer.
1124 *
1125 * This code can be executed unordered: writers may already have written to the
1126 * sub-buffer before this code gets executed, caution. The commit makes sure
1127 * that this code is executed before the deliver of this sub-buffer.
1128 */
1129 static
1130 void lib_ring_buffer_switch_new_start(struct lttng_ust_lib_ring_buffer *buf,
1131 struct channel *chan,
1132 struct switch_offsets *offsets,
1133 uint64_t tsc,
1134 struct lttng_ust_shm_handle *handle)
1135 {
1136 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1137 unsigned long beginidx = subbuf_index(offsets->begin, chan);
1138 unsigned long commit_count;
1139
1140 config->cb.buffer_begin(buf, tsc, beginidx, handle);
1141
1142 /*
1143 * Order all writes to buffer before the commit count update that will
1144 * determine that the subbuffer is full.
1145 */
1146 cmm_smp_wmb();
1147 v_add(config, config->cb.subbuffer_header_size(),
1148 &shmp_index(handle, buf->commit_hot, beginidx)->cc);
1149 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, beginidx)->cc);
1150 /* Check if the written buffer has to be delivered */
1151 lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin,
1152 commit_count, beginidx, handle);
1153 lib_ring_buffer_write_commit_counter(config, buf, chan, beginidx,
1154 offsets->begin, commit_count,
1155 config->cb.subbuffer_header_size(),
1156 handle);
1157 }
1158
1159 /*
1160 * lib_ring_buffer_switch_new_end: finish switching current subbuffer
1161 *
1162 * The only remaining threads could be the ones with pending commits. They will
1163 * have to do the deliver themselves.
1164 */
1165 static
1166 void lib_ring_buffer_switch_new_end(struct lttng_ust_lib_ring_buffer *buf,
1167 struct channel *chan,
1168 struct switch_offsets *offsets,
1169 uint64_t tsc,
1170 struct lttng_ust_shm_handle *handle)
1171 {
1172 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1173 unsigned long endidx = subbuf_index(offsets->end - 1, chan);
1174 unsigned long commit_count, padding_size, data_size;
1175
1176 data_size = subbuf_offset(offsets->end - 1, chan) + 1;
1177 padding_size = chan->backend.subbuf_size - data_size;
1178 subbuffer_set_data_size(config, &buf->backend, endidx, data_size,
1179 handle);
1180
1181 /*
1182 * Order all writes to buffer before the commit count update that will
1183 * determine that the subbuffer is full.
1184 */
1185 cmm_smp_wmb();
1186 v_add(config, padding_size, &shmp_index(handle, buf->commit_hot, endidx)->cc);
1187 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, endidx)->cc);
1188 lib_ring_buffer_check_deliver(config, buf, chan, offsets->end - 1,
1189 commit_count, endidx, handle);
1190 lib_ring_buffer_write_commit_counter(config, buf, chan, endidx,
1191 offsets->end, commit_count,
1192 padding_size, handle);
1193 }
1194
1195 /*
1196 * Returns :
1197 * 0 if ok
1198 * !0 if execution must be aborted.
1199 */
1200 static
1201 int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
1202 struct lttng_ust_lib_ring_buffer *buf,
1203 struct channel *chan,
1204 struct switch_offsets *offsets,
1205 uint64_t *tsc)
1206 {
1207 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1208 unsigned long off;
1209
1210 offsets->begin = v_read(config, &buf->offset);
1211 offsets->old = offsets->begin;
1212 offsets->switch_old_start = 0;
1213 off = subbuf_offset(offsets->begin, chan);
1214
1215 *tsc = config->cb.ring_buffer_clock_read(chan);
1216
1217 /*
1218 * Ensure we flush the header of an empty subbuffer when doing the
1219 * finalize (SWITCH_FLUSH). This ensures that we end up knowing the
1220 * total data gathering duration even if there were no records saved
1221 * after the last buffer switch.
1222 * In SWITCH_ACTIVE mode, switch the buffer when it contains events.
1223 * SWITCH_ACTIVE only flushes the current subbuffer, dealing with end of
1224 * subbuffer header as appropriate.
1225 * The next record that reserves space will be responsible for
1226 * populating the following subbuffer header. We choose not to populate
1227 * the next subbuffer header here because we want to be able to use
1228 * SWITCH_ACTIVE for periodical buffer flush, which must
1229 * guarantee that all the buffer content (records and header
1230 * timestamps) are visible to the reader. This is required for
1231 * quiescence guarantees for the fusion merge.
1232 */
1233 if (mode == SWITCH_FLUSH || off > 0) {
1234 if (caa_unlikely(off == 0)) {
1235 /*
1236 * The client does not save any header information.
1237 * Don't switch empty subbuffer on finalize, because it
1238 * is invalid to deliver a completely empty subbuffer.
1239 */
1240 if (!config->cb.subbuffer_header_size())
1241 return -1;
1242 /*
1243 * Need to write the subbuffer start header on finalize.
1244 */
1245 offsets->switch_old_start = 1;
1246 }
1247 offsets->begin = subbuf_align(offsets->begin, chan);
1248 } else
1249 return -1; /* we do not have to switch : buffer is empty */
1250 /* Note: old points to the next subbuf at offset 0 */
1251 offsets->end = offsets->begin;
1252 return 0;
1253 }
1254
1255 /*
1256 * Force a sub-buffer switch. This operation is completely reentrant : can be
1257 * called while tracing is active with absolutely no lock held.
1258 *
1259 * Note, however, that as a v_cmpxchg is used for some atomic
1260 * operations, this function must be called from the CPU which owns the buffer
1261 * for an ACTIVE flush.
1262 */
1263 void lib_ring_buffer_switch_slow(struct lttng_ust_lib_ring_buffer *buf, enum switch_mode mode,
1264 struct lttng_ust_shm_handle *handle)
1265 {
1266 struct channel *chan = shmp(handle, buf->backend.chan);
1267 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1268 struct switch_offsets offsets;
1269 unsigned long oldidx;
1270 uint64_t tsc;
1271
1272 offsets.size = 0;
1273
1274 /*
1275 * Perform retryable operations.
1276 */
1277 do {
1278 if (lib_ring_buffer_try_switch_slow(mode, buf, chan, &offsets,
1279 &tsc))
1280 return; /* Switch not needed */
1281 } while (v_cmpxchg(config, &buf->offset, offsets.old, offsets.end)
1282 != offsets.old);
1283
1284 /*
1285 * Atomically update last_tsc. This update races against concurrent
1286 * atomic updates, but the race will always cause supplementary full TSC
1287 * records, never the opposite (missing a full TSC record when it would
1288 * be needed).
1289 */
1290 save_last_tsc(config, buf, tsc);
1291
1292 /*
1293 * Push the reader if necessary
1294 */
1295 lib_ring_buffer_reserve_push_reader(buf, chan, offsets.old);
1296
1297 oldidx = subbuf_index(offsets.old, chan);
1298 lib_ring_buffer_clear_noref(config, &buf->backend, oldidx, handle);
1299
1300 /*
1301 * May need to populate header start on SWITCH_FLUSH.
1302 */
1303 if (offsets.switch_old_start) {
1304 lib_ring_buffer_switch_old_start(buf, chan, &offsets, tsc, handle);
1305 offsets.old += config->cb.subbuffer_header_size();
1306 }
1307
1308 /*
1309 * Switch old subbuffer.
1310 */
1311 lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc, handle);
1312 }
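/*
 * Illustrative sketch (not compiled) of a periodic flush, mirroring the
 * disabled switch_buffer_timer() above: a per-cpu task running on the CPU
 * that owns "buf" issues a SWITCH_ACTIVE flush so a live reader can see the
 * data accumulated so far, even if the current sub-buffer is not full.
 */
#if 0
static void example_periodic_flush(struct lttng_ust_lib_ring_buffer *buf,
		struct lttng_ust_shm_handle *handle)
{
	/* Only worth flushing if a reader is currently attached. */
	if (uatomic_read(&buf->active_readers))
		lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, handle);
}
#endif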
1313
1314 /*
1315 * Returns :
1316 * 0 if ok
1317 * -ENOSPC if event size is too large for packet.
1318 * -ENOBUFS if there is currently not enough space in buffer for the event.
1319 * -EIO if data cannot be written into the buffer for any other reason.
1320 */
1321 static
1322 int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf,
1323 struct channel *chan,
1324 struct switch_offsets *offsets,
1325 struct lttng_ust_lib_ring_buffer_ctx *ctx)
1326 {
1327 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1328 struct lttng_ust_shm_handle *handle = ctx->handle;
1329 unsigned long reserve_commit_diff;
1330
1331 offsets->begin = v_read(config, &buf->offset);
1332 offsets->old = offsets->begin;
1333 offsets->switch_new_start = 0;
1334 offsets->switch_new_end = 0;
1335 offsets->switch_old_end = 0;
1336 offsets->pre_header_padding = 0;
1337
1338 ctx->tsc = config->cb.ring_buffer_clock_read(chan);
1339 if ((int64_t) ctx->tsc == -EIO)
1340 return -EIO;
1341
1342 if (last_tsc_overflow(config, buf, ctx->tsc))
1343 ctx->rflags |= RING_BUFFER_RFLAG_FULL_TSC;
1344
1345 if (caa_unlikely(subbuf_offset(offsets->begin, ctx->chan) == 0)) {
1346 offsets->switch_new_start = 1; /* For offsets->begin */
1347 } else {
1348 offsets->size = config->cb.record_header_size(config, chan,
1349 offsets->begin,
1350 &offsets->pre_header_padding,
1351 ctx);
1352 offsets->size +=
1353 lib_ring_buffer_align(offsets->begin + offsets->size,
1354 ctx->largest_align)
1355 + ctx->data_size;
1356 if (caa_unlikely(subbuf_offset(offsets->begin, chan) +
1357 offsets->size > chan->backend.subbuf_size)) {
1358 offsets->switch_old_end = 1; /* For offsets->old */
1359 offsets->switch_new_start = 1; /* For offsets->begin */
1360 }
1361 }
1362 if (caa_unlikely(offsets->switch_new_start)) {
1363 unsigned long sb_index;
1364
1365 /*
1366 * We are typically not filling the previous buffer completely.
1367 */
1368 if (caa_likely(offsets->switch_old_end))
1369 offsets->begin = subbuf_align(offsets->begin, chan);
1370 offsets->begin = offsets->begin
1371 + config->cb.subbuffer_header_size();
1372 /* Test new buffer integrity */
1373 sb_index = subbuf_index(offsets->begin, chan);
1374 reserve_commit_diff =
1375 (buf_trunc(offsets->begin, chan)
1376 >> chan->backend.num_subbuf_order)
1377 - ((unsigned long) v_read(config,
1378 &shmp_index(handle, buf->commit_cold, sb_index)->cc_sb)
1379 & chan->commit_count_mask);
1380 if (caa_likely(reserve_commit_diff == 0)) {
1381 /* Next subbuffer not being written to. */
1382 if (caa_unlikely(config->mode != RING_BUFFER_OVERWRITE &&
1383 subbuf_trunc(offsets->begin, chan)
1384 - subbuf_trunc((unsigned long)
1385 uatomic_read(&buf->consumed), chan)
1386 >= chan->backend.buf_size)) {
1387 unsigned long nr_lost;
1388
1389 /*
1390 * We do not overwrite non consumed buffers
1391 * and we are full : record is lost.
1392 */
1393 nr_lost = v_read(config, &buf->records_lost_full);
1394 v_inc(config, &buf->records_lost_full);
1395 if ((nr_lost & (DBG_PRINT_NR_LOST - 1)) == 0) {
1396 DBG("%lu or more records lost in (%s:%d) (buffer full)\n",
1397 nr_lost + 1, chan->backend.name,
1398 buf->backend.cpu);
1399 }
1400 return -ENOBUFS;
1401 } else {
1402 /*
1403 * Next subbuffer not being written to, and we
1404 * are either in overwrite mode or the buffer is
1405 * not full. It's safe to write in this new
1406 * subbuffer.
1407 */
1408 }
1409 } else {
1410 unsigned long nr_lost;
1411
1412 /*
1413 * Next subbuffer reserve offset does not match the
1414 * commit offset. Drop record in producer-consumer and
1415 * overwrite mode. Caused by either a writer OOPS or too
1416 * many nested writes over a reserve/commit pair.
1417 */
1418 nr_lost = v_read(config, &buf->records_lost_wrap);
1419 v_inc(config, &buf->records_lost_wrap);
1420 if ((nr_lost & (DBG_PRINT_NR_LOST - 1)) == 0) {
1421 DBG("%lu or more records lost in (%s:%d) (wrap-around)\n",
1422 nr_lost + 1, chan->backend.name,
1423 buf->backend.cpu);
1424 }
1425 return -EIO;
1426 }
1427 offsets->size =
1428 config->cb.record_header_size(config, chan,
1429 offsets->begin,
1430 &offsets->pre_header_padding,
1431 ctx);
1432 offsets->size +=
1433 lib_ring_buffer_align(offsets->begin + offsets->size,
1434 ctx->largest_align)
1435 + ctx->data_size;
1436 if (caa_unlikely(subbuf_offset(offsets->begin, chan)
1437 + offsets->size > chan->backend.subbuf_size)) {
1438 unsigned long nr_lost;
1439
1440 /*
1441 * Record too big for subbuffers, report error, don't
1442 * complete the sub-buffer switch.
1443 */
1444 nr_lost = v_read(config, &buf->records_lost_big);
1445 v_inc(config, &buf->records_lost_big);
1446 if ((nr_lost & (DBG_PRINT_NR_LOST - 1)) == 0) {
1447 DBG("%lu or more records lost in (%s:%d) record size "
1448 " of %zu bytes is too large for buffer\n",
1449 nr_lost + 1, chan->backend.name,
1450 buf->backend.cpu, offsets->size);
1451 }
1452 return -ENOSPC;
1453 } else {
1454 /*
1455 * We just made a successful buffer switch and the
1456 * record fits in the new subbuffer. Let's write.
1457 */
1458 }
1459 } else {
1460 /*
1461 * Record fits in the current buffer and we are not on a switch
1462 * boundary. It's safe to write.
1463 */
1464 }
1465 offsets->end = offsets->begin + offsets->size;
1466
1467 if (caa_unlikely(subbuf_offset(offsets->end, chan) == 0)) {
1468 /*
1469 * The offset_end will fall at the very beginning of the next
1470 * subbuffer.
1471 */
1472 offsets->switch_new_end = 1; /* For offsets->begin */
1473 }
1474 return 0;
1475 }
1476
1477 /**
1478 * lib_ring_buffer_reserve_slow - Atomic slot reservation in a buffer.
1479 * @ctx: ring buffer context.
1480 *
1481 * Returns: -ENOBUFS if not enough space, -ENOSPC if event size too large,
1482 * -EIO for other errors, else returns 0.
1483 * It will take care of sub-buffer switching.
1484 */
1485 int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx)
1486 {
1487 struct channel *chan = ctx->chan;
1488 struct lttng_ust_shm_handle *handle = ctx->handle;
1489 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1490 struct lttng_ust_lib_ring_buffer *buf;
1491 struct switch_offsets offsets;
1492 int ret;
1493
1494 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
1495 buf = shmp(handle, chan->backend.buf[ctx->cpu].shmp);
1496 else
1497 buf = shmp(handle, chan->backend.buf[0].shmp);
1498 ctx->buf = buf;
1499
1500 offsets.size = 0;
1501
1502 do {
1503 ret = lib_ring_buffer_try_reserve_slow(buf, chan, &offsets,
1504 ctx);
1505 if (caa_unlikely(ret))
1506 return ret;
1507 } while (caa_unlikely(v_cmpxchg(config, &buf->offset, offsets.old,
1508 offsets.end)
1509 != offsets.old));
1510
1511 /*
1512 * Atomically update last_tsc. This update races against concurrent
1513 * atomic updates, but the race will always cause supplementary full TSC
1514 * records, never the opposite (missing a full TSC record when it would
1515 * be needed).
1516 */
1517 save_last_tsc(config, buf, ctx->tsc);
1518
1519 /*
1520 * Push the reader if necessary
1521 */
1522 lib_ring_buffer_reserve_push_reader(buf, chan, offsets.end - 1);
1523
1524 /*
1525 * Clear noref flag for this subbuffer.
1526 */
1527 lib_ring_buffer_clear_noref(config, &buf->backend,
1528 subbuf_index(offsets.end - 1, chan),
1529 handle);
1530
1531 /*
1532 * Switch old subbuffer if needed.
1533 */
1534 if (caa_unlikely(offsets.switch_old_end)) {
1535 lib_ring_buffer_clear_noref(config, &buf->backend,
1536 subbuf_index(offsets.old - 1, chan),
1537 handle);
1538 lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->tsc, handle);
1539 }
1540
1541 /*
1542 * Populate new subbuffer.
1543 */
1544 if (caa_unlikely(offsets.switch_new_start))
1545 lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->tsc, handle);
1546
1547 if (caa_unlikely(offsets.switch_new_end))
1548 lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->tsc, handle);
1549
1550 ctx->slot_size = offsets.size;
1551 ctx->pre_offset = offsets.begin;
1552 ctx->buf_offset = offsets.begin + offsets.pre_header_padding;
1553 return 0;
1554 }
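/*
 * Illustrative sketch (not compiled) of the writer side that falls back on
 * lib_ring_buffer_reserve_slow() when the fast path cannot reserve space. It
 * assumes the fast-path helpers declared in frontend_api.h and backend.h
 * (lib_ring_buffer_ctx_init, lib_ring_buffer_reserve, lib_ring_buffer_write,
 * lib_ring_buffer_commit); exact signatures may differ between versions.
 */
#if 0
static int example_write_record(const struct lttng_ust_lib_ring_buffer_config *config,
		struct channel *chan, struct lttng_ust_shm_handle *handle,
		const void *payload, size_t len, int cpu)
{
	struct lttng_ust_lib_ring_buffer_ctx ctx;
	int ret;

	lib_ring_buffer_ctx_init(&ctx, chan, NULL, len,
			__alignof__(char), cpu, handle);
	ret = lib_ring_buffer_reserve(config, &ctx);	/* may call reserve_slow */
	if (ret)
		return ret;	/* -ENOBUFS, -ENOSPC or -EIO, see above. */
	lib_ring_buffer_write(config, &ctx, payload, len);
	lib_ring_buffer_commit(config, &ctx);
	return 0;
}
#endif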
1555
1556 /*
1557 * Force a read (imply TLS fixup for dlopen) of TLS variables.
1558 */
1559 void lttng_fixup_ringbuffer_tls(void)
1560 {
1561 asm volatile ("" : : "m" (lib_ring_buffer_nesting));
1562 }