Cleanup: ring buffer: remove lib_ring_buffer_switch_new_end()
libringbuffer/ring_buffer_frontend.c
1 /*
2 * ring_buffer_frontend.c
3 *
4 * Copyright (C) 2005-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; only
9 * version 2.1 of the License.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 *
20 *
21 * Ring buffer wait-free buffer synchronization. Producer-consumer and flight
22 * recorder (overwrite) modes. See thesis:
23 *
24 * Desnoyers, Mathieu (2009), "Low-Impact Operating System Tracing", Ph.D.
25 * dissertation, Ecole Polytechnique de Montreal.
26 * http://www.lttng.org/pub/thesis/desnoyers-dissertation-2009-12.pdf
27 *
28 * - Algorithm presentation in Chapter 5:
29 * "Lockless Multi-Core High-Throughput Buffering".
30 * - Algorithm formal verification in Section 8.6:
31 * "Formal verification of LTTng"
32 *
33 * Author:
34 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
35 *
36 * Inspired by LTT and RelayFS:
37 * Karim Yaghmour <karim@opersys.com>
38 * Tom Zanussi <zanussi@us.ibm.com>
39 * Bob Wisniewski <bob@watson.ibm.com>
40 * And from K42:
41 * Bob Wisniewski <bob@watson.ibm.com>
42 *
43 * Buffer reader semantics:
44 *
45 * - get_subbuf_size
46 * while buffer is not finalized and empty
47 * - get_subbuf
48 * - if return value != 0, continue
49 * - splice one subbuffer worth of data to a pipe
50 * - splice the data from pipe to disk/network
51 * - put_subbuf
52 */
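
/*
 * Illustrative sketch (added as documentation, not compiled): a minimal
 * consumer loop for the reader semantics above, using the reader-side
 * entry points defined later in this file. "buf", "chan" and "handle"
 * are assumed to come from channel_get_ring_buffer(); a real consumer
 * would wait on the stream's wait_fd rather than busy-loop on -EAGAIN,
 * and the sub-buffer data extraction itself goes through the backend
 * (splice/mmap), which is out of scope here.
 *
 *	ret = lib_ring_buffer_open_read(buf, handle);
 *	if (ret)
 *		return ret;	(another reader is already active)
 *	for (;;) {
 *		consumed = uatomic_read(&buf->consumed);
 *		ret = lib_ring_buffer_get_subbuf(buf, consumed, handle);
 *		if (ret == -ENODATA)
 *			break;	(buffer finalized, nothing left to read)
 *		if (ret)
 *			continue;	(no data yet, or raced with the writer)
 *		(splice one sub-buffer worth of data to a pipe, then from
 *		 the pipe to disk/network)
 *		lib_ring_buffer_put_subbuf(buf, handle);
 *		lib_ring_buffer_move_consumer(buf,
 *				subbuf_align(consumed, chan), handle);
 *	}
 *	lib_ring_buffer_release_read(buf, handle);
 */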
53
54 #define _GNU_SOURCE
55 #include <sys/types.h>
56 #include <sys/mman.h>
57 #include <sys/stat.h>
58 #include <unistd.h>
59 #include <fcntl.h>
60 #include <signal.h>
61 #include <time.h>
62 #include <urcu/compiler.h>
63 #include <urcu/ref.h>
64 #include <urcu/tls-compat.h>
65 #include <helper.h>
66
67 #include "smp.h"
68 #include <lttng/ringbuffer-config.h>
69 #include "vatomic.h"
70 #include "backend.h"
71 #include "frontend.h"
72 #include "shm.h"
73 #include "tlsfixup.h"
74 #include "../liblttng-ust/compat.h" /* For ENODATA */
75
76 #ifndef max
77 #define max(a, b) ((a) > (b) ? (a) : (b))
78 #endif
79
80 /* Print DBG() messages about events lost only every 1048576 hits */
81 #define DBG_PRINT_NR_LOST (1UL << 20)
82
83 #define LTTNG_UST_RB_SIG_FLUSH SIGRTMIN
84 #define LTTNG_UST_RB_SIG_READ (SIGRTMIN + 1)
85 #define LTTNG_UST_RB_SIG_TEARDOWN (SIGRTMIN + 2)
86 #define CLOCKID CLOCK_MONOTONIC
87
88 /*
89 * Use POSIX SHM: shm_open(3) and shm_unlink(3).
90 * close(2) to close the fd returned by shm_open.
91 * shm_unlink releases the shared memory object name.
92 * ftruncate(2) sets the size of the memory object.
93 * mmap/munmap maps the shared memory object to a virtual address in the
94 * calling process (should be done both in libust and the consumer).
95 * See shm_overview(7) for details.
96 * Pass file descriptor returned by shm_open(3) to ltt-sessiond through
97 * a UNIX socket.
98 *
99 * Since we don't need to access the object using its name, we can
100 * immediately shm_unlink(3) it, and only keep the handle with its file
101 * descriptor.
102 */
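
/*
 * Illustrative sketch (added as documentation, not compiled) of the
 * POSIX SHM sequence described above, with a hypothetical object name
 * and size. The actual allocation is implemented in shm.c, and the
 * resulting file descriptor is passed to the session daemon/consumer
 * over a UNIX socket.
 *
 *	int shmfd;
 *	void *ptr;
 *
 *	shmfd = shm_open("/example-ust-shm", O_CREAT | O_EXCL | O_RDWR, 0600);
 *	if (shmfd < 0)
 *		return -errno;
 *	(void) shm_unlink("/example-ust-shm");	(name no longer needed)
 *	if (ftruncate(shmfd, (off_t) memory_map_size) < 0)
 *		goto error_close;
 *	ptr = mmap(NULL, memory_map_size, PROT_READ | PROT_WRITE,
 *			MAP_SHARED, shmfd, 0);
 *	if (ptr == MAP_FAILED)
 *		goto error_close;
 *	(... pass shmfd through a UNIX socket; munmap/close on teardown ...)
 */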
103
104 /*
105 * Internal structure representing offsets to use at a sub-buffer switch.
106 */
107 struct switch_offsets {
108 unsigned long begin, end, old;
109 size_t pre_header_padding, size;
110 unsigned int switch_new_start:1, switch_old_start:1, switch_old_end:1;
111 };
112
113 DEFINE_URCU_TLS(unsigned int, lib_ring_buffer_nesting);
114
115 /*
116 * wakeup_fd_mutex protects wakeup fd use by timer from concurrent
117 * close.
118 */
119 static pthread_mutex_t wakeup_fd_mutex = PTHREAD_MUTEX_INITIALIZER;
120
121 static
122 void lib_ring_buffer_print_errors(struct channel *chan,
123 struct lttng_ust_lib_ring_buffer *buf, int cpu,
124 struct lttng_ust_shm_handle *handle);
125
126 /*
127 * Handle the timer teardown race wrt the memory free of private data:
128 * ring buffer signals are handled by a single thread, which provides
129 * a synchronization point between the handling of each signal.
130 * Protected by the lock within the structure.
131 */
132 struct timer_signal_data {
133 pthread_t tid; /* thread id managing signals */
134 int setup_done;
135 int qs_done;
136 pthread_mutex_t lock;
137 };
138
139 static struct timer_signal_data timer_signal = {
140 .tid = 0,
141 .setup_done = 0,
142 .qs_done = 0,
143 .lock = PTHREAD_MUTEX_INITIALIZER,
144 };
145
146 /**
147 * lib_ring_buffer_reset - Reset ring buffer to initial values.
148 * @buf: Ring buffer.
149 *
150 * Effectively empty the ring buffer. Should be called when the buffer is not
151 * used for writing. The ring buffer can be opened for reading, but the reader
152 * should not be using the iterator concurrently with reset. The previous
153 * current iterator record is reset.
154 */
155 void lib_ring_buffer_reset(struct lttng_ust_lib_ring_buffer *buf,
156 struct lttng_ust_shm_handle *handle)
157 {
158 struct channel *chan = shmp(handle, buf->backend.chan);
159 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
160 unsigned int i;
161
162 /*
163 * Reset iterator first. It will put the subbuffer if it currently holds
164 * it.
165 */
166 v_set(config, &buf->offset, 0);
167 for (i = 0; i < chan->backend.num_subbuf; i++) {
168 v_set(config, &shmp_index(handle, buf->commit_hot, i)->cc, 0);
169 v_set(config, &shmp_index(handle, buf->commit_hot, i)->seq, 0);
170 v_set(config, &shmp_index(handle, buf->commit_cold, i)->cc_sb, 0);
171 }
172 uatomic_set(&buf->consumed, 0);
173 uatomic_set(&buf->record_disabled, 0);
174 v_set(config, &buf->last_tsc, 0);
175 lib_ring_buffer_backend_reset(&buf->backend, handle);
176 /* Don't reset number of active readers */
177 v_set(config, &buf->records_lost_full, 0);
178 v_set(config, &buf->records_lost_wrap, 0);
179 v_set(config, &buf->records_lost_big, 0);
180 v_set(config, &buf->records_count, 0);
181 v_set(config, &buf->records_overrun, 0);
182 buf->finalized = 0;
183 }
184
185 /**
186 * channel_reset - Reset channel to initial values.
187 * @chan: Channel.
188 *
189 * Effectively empty the channel. Should be called when the channel is not used
190 * for writing. The channel can be opened for reading, but the reader should not
191 * be using the iterator concurrently with reset. The previous current iterator
192 * record is reset.
193 */
194 void channel_reset(struct channel *chan)
195 {
196 /*
197 * Reset iterators first. Will put the subbuffer if held for reading.
198 */
199 uatomic_set(&chan->record_disabled, 0);
200 /* Don't reset commit_count_mask, still valid */
201 channel_backend_reset(&chan->backend);
202 /* Don't reset switch/read timer interval */
203 /* Don't reset notifiers and notifier enable bits */
204 /* Don't reset reader reference count */
205 }
206
207 /*
208 * Must be called under cpu hotplug protection.
209 */
210 int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf,
211 struct channel_backend *chanb, int cpu,
212 struct lttng_ust_shm_handle *handle,
213 struct shm_object *shmobj)
214 {
215 const struct lttng_ust_lib_ring_buffer_config *config = &chanb->config;
216 struct channel *chan = caa_container_of(chanb, struct channel, backend);
217 void *priv = channel_get_private(chan);
218 size_t subbuf_header_size;
219 uint64_t tsc;
220 int ret;
221
222 /* Test for cpu hotplug */
223 if (buf->backend.allocated)
224 return 0;
225
226 ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend,
227 cpu, handle, shmobj);
228 if (ret)
229 return ret;
230
231 align_shm(shmobj, __alignof__(struct commit_counters_hot));
232 set_shmp(buf->commit_hot,
233 zalloc_shm(shmobj,
234 sizeof(struct commit_counters_hot) * chan->backend.num_subbuf));
235 if (!shmp(handle, buf->commit_hot)) {
236 ret = -ENOMEM;
237 goto free_chanbuf;
238 }
239
240 align_shm(shmobj, __alignof__(struct commit_counters_cold));
241 set_shmp(buf->commit_cold,
242 zalloc_shm(shmobj,
243 sizeof(struct commit_counters_cold) * chan->backend.num_subbuf));
244 if (!shmp(handle, buf->commit_cold)) {
245 ret = -ENOMEM;
246 goto free_commit;
247 }
248
249 /*
250 * Write the subbuffer header for first subbuffer so we know the total
251 * duration of data gathering.
252 */
253 subbuf_header_size = config->cb.subbuffer_header_size();
254 v_set(config, &buf->offset, subbuf_header_size);
255 subbuffer_id_clear_noref(config, &shmp_index(handle, buf->backend.buf_wsb, 0)->id);
256 tsc = config->cb.ring_buffer_clock_read(shmp(handle, buf->backend.chan));
257 config->cb.buffer_begin(buf, tsc, 0, handle);
258 v_add(config, subbuf_header_size, &shmp_index(handle, buf->commit_hot, 0)->cc);
259
260 if (config->cb.buffer_create) {
261 ret = config->cb.buffer_create(buf, priv, cpu, chanb->name, handle);
262 if (ret)
263 goto free_init;
264 }
265 buf->backend.allocated = 1;
266 return 0;
267
268 /* Error handling */
269 free_init:
270 /* commit_cold will be freed by shm teardown */
271 free_commit:
272 /* commit_hot will be freed by shm teardown */
273 free_chanbuf:
274 return ret;
275 }
276
277 static
278 void lib_ring_buffer_channel_switch_timer(int sig, siginfo_t *si, void *uc)
279 {
280 const struct lttng_ust_lib_ring_buffer_config *config;
281 struct lttng_ust_shm_handle *handle;
282 struct channel *chan;
283 int cpu;
284
285 assert(CMM_LOAD_SHARED(timer_signal.tid) == pthread_self());
286
287 chan = si->si_value.sival_ptr;
288 handle = chan->handle;
289 config = &chan->backend.config;
290
291 DBG("Switch timer for channel %p\n", chan);
292
293 /*
294 * Only flush buffers periodically if readers are active.
295 */
296 pthread_mutex_lock(&wakeup_fd_mutex);
297 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
298 for_each_possible_cpu(cpu) {
299 struct lttng_ust_lib_ring_buffer *buf =
300 shmp(handle, chan->backend.buf[cpu].shmp);
301 if (uatomic_read(&buf->active_readers))
302 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE,
303 chan->handle);
304 }
305 } else {
306 struct lttng_ust_lib_ring_buffer *buf =
307 shmp(handle, chan->backend.buf[0].shmp);
308
309 if (uatomic_read(&buf->active_readers))
310 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE,
311 chan->handle);
312 }
313 pthread_mutex_unlock(&wakeup_fd_mutex);
314 return;
315 }
316
317 static
318 void lib_ring_buffer_channel_do_read(struct channel *chan)
319 {
320 const struct lttng_ust_lib_ring_buffer_config *config;
321 struct lttng_ust_shm_handle *handle;
322 int cpu;
323
324 handle = chan->handle;
325 config = &chan->backend.config;
326
327 /*
328 * Only flush buffers periodically if readers are active.
329 */
330 pthread_mutex_lock(&wakeup_fd_mutex);
331 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
332 for_each_possible_cpu(cpu) {
333 struct lttng_ust_lib_ring_buffer *buf =
334 shmp(handle, chan->backend.buf[cpu].shmp);
335
336 if (uatomic_read(&buf->active_readers)
337 && lib_ring_buffer_poll_deliver(config, buf,
338 chan, handle)) {
339 lib_ring_buffer_wakeup(buf, handle);
340 }
341 }
342 } else {
343 struct lttng_ust_lib_ring_buffer *buf =
344 shmp(handle, chan->backend.buf[0].shmp);
345
346 if (uatomic_read(&buf->active_readers)
347 && lib_ring_buffer_poll_deliver(config, buf,
348 chan, handle)) {
349 lib_ring_buffer_wakeup(buf, handle);
350 }
351 }
352 pthread_mutex_unlock(&wakeup_fd_mutex);
353 }
354
355 static
356 void lib_ring_buffer_channel_read_timer(int sig, siginfo_t *si, void *uc)
357 {
358 struct channel *chan;
359
360 assert(CMM_LOAD_SHARED(timer_signal.tid) == pthread_self());
361 chan = si->si_value.sival_ptr;
362 DBG("Read timer for channel %p\n", chan);
363 lib_ring_buffer_channel_do_read(chan);
364 return;
365 }
366
367 static
368 void rb_setmask(sigset_t *mask)
369 {
370 int ret;
371
372 ret = sigemptyset(mask);
373 if (ret) {
374 PERROR("sigemptyset");
375 }
376 ret = sigaddset(mask, LTTNG_UST_RB_SIG_FLUSH);
377 if (ret) {
378 PERROR("sigaddset");
379 }
380 ret = sigaddset(mask, LTTNG_UST_RB_SIG_READ);
381 if (ret) {
382 PERROR("sigaddset");
383 }
384 ret = sigaddset(mask, LTTNG_UST_RB_SIG_TEARDOWN);
385 if (ret) {
386 PERROR("sigaddset");
387 }
388 }
389
390 static
391 void *sig_thread(void *arg)
392 {
393 sigset_t mask;
394 siginfo_t info;
395 int signr;
396
397 /* Only this thread will receive the signals in the mask. */
398 rb_setmask(&mask);
399 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
400
401 for (;;) {
402 signr = sigwaitinfo(&mask, &info);
403 if (signr == -1) {
404 if (errno != EINTR)
405 PERROR("sigwaitinfo");
406 continue;
407 }
408 if (signr == LTTNG_UST_RB_SIG_FLUSH) {
409 lib_ring_buffer_channel_switch_timer(info.si_signo,
410 &info, NULL);
411 } else if (signr == LTTNG_UST_RB_SIG_READ) {
412 lib_ring_buffer_channel_read_timer(info.si_signo,
413 &info, NULL);
414 } else if (signr == LTTNG_UST_RB_SIG_TEARDOWN) {
415 cmm_smp_mb();
416 CMM_STORE_SHARED(timer_signal.qs_done, 1);
417 cmm_smp_mb();
418 } else {
419 ERR("Unexptected signal %d\n", info.si_signo);
420 }
421 }
422 return NULL;
423 }
424
425 /*
426 * Ensure only a single thread listens on the timer signal.
427 */
428 static
429 void lib_ring_buffer_setup_timer_thread(void)
430 {
431 pthread_t thread;
432 int ret;
433
434 pthread_mutex_lock(&timer_signal.lock);
435 if (timer_signal.setup_done)
436 goto end;
437
438 ret = pthread_create(&thread, NULL, &sig_thread, NULL);
439 if (ret) {
440 errno = ret;
441 PERROR("pthread_create");
442 }
443 ret = pthread_detach(thread);
444 if (ret) {
445 errno = ret;
446 PERROR("pthread_detach");
447 }
448 timer_signal.setup_done = 1;
449 end:
450 pthread_mutex_unlock(&timer_signal.lock);
451 }
452
453 /*
454 * Wait for signal-handling thread quiescent state.
455 */
456 static
457 void lib_ring_buffer_wait_signal_thread_qs(unsigned int signr)
458 {
459 sigset_t pending_set;
460 int ret;
461
462 /*
463 * We need to be the only thread interacting with the thread
464 * that manages signals for teardown synchronization.
465 */
466 pthread_mutex_lock(&timer_signal.lock);
467
468 /*
469 * Ensure we don't have any signal queued for this channel.
470 */
471 for (;;) {
472 ret = sigemptyset(&pending_set);
473 if (ret == -1) {
474 PERROR("sigemptyset");
475 }
476 ret = sigpending(&pending_set);
477 if (ret == -1) {
478 PERROR("sigpending");
479 }
480 if (!sigismember(&pending_set, signr))
481 break;
482 caa_cpu_relax();
483 }
484
485 /*
486 * From this point, no new signal handler will be fired that
487 * would try to access "chan". However, we still need to wait
488 * for any currently executing handler to complete.
489 */
490 cmm_smp_mb();
491 CMM_STORE_SHARED(timer_signal.qs_done, 0);
492 cmm_smp_mb();
493
494 /*
495 * Kill with LTTNG_UST_RB_SIG_TEARDOWN, so signal management
496 * thread wakes up.
497 */
498 kill(getpid(), LTTNG_UST_RB_SIG_TEARDOWN);
499
500 while (!CMM_LOAD_SHARED(timer_signal.qs_done))
501 caa_cpu_relax();
502 cmm_smp_mb();
503
504 pthread_mutex_unlock(&timer_signal.lock);
505 }
506
507 static
508 void lib_ring_buffer_channel_switch_timer_start(struct channel *chan)
509 {
510 struct sigevent sev;
511 struct itimerspec its;
512 int ret;
513
514 if (!chan->switch_timer_interval || chan->switch_timer_enabled)
515 return;
516
517 chan->switch_timer_enabled = 1;
518
519 lib_ring_buffer_setup_timer_thread();
520
521 sev.sigev_notify = SIGEV_SIGNAL;
522 sev.sigev_signo = LTTNG_UST_RB_SIG_FLUSH;
523 sev.sigev_value.sival_ptr = chan;
524 ret = timer_create(CLOCKID, &sev, &chan->switch_timer);
525 if (ret == -1) {
526 PERROR("timer_create");
527 }
528
529 its.it_value.tv_sec = chan->switch_timer_interval / 1000000;
530 its.it_value.tv_nsec = (chan->switch_timer_interval % 1000000) * 1000; /* us to ns */
531 its.it_interval.tv_sec = its.it_value.tv_sec;
532 its.it_interval.tv_nsec = its.it_value.tv_nsec;
533
534 ret = timer_settime(chan->switch_timer, 0, &its, NULL);
535 if (ret == -1) {
536 PERROR("timer_settime");
537 }
538 }
539
540 static
541 void lib_ring_buffer_channel_switch_timer_stop(struct channel *chan)
542 {
543 int ret;
544
545 if (!chan->switch_timer_interval || !chan->switch_timer_enabled)
546 return;
547
548 ret = timer_delete(chan->switch_timer);
549 if (ret == -1) {
550 PERROR("timer_delete");
551 }
552
553 lib_ring_buffer_wait_signal_thread_qs(LTTNG_UST_RB_SIG_FLUSH);
554
555 chan->switch_timer = 0;
556 chan->switch_timer_enabled = 0;
557 }
558
559 static
560 void lib_ring_buffer_channel_read_timer_start(struct channel *chan)
561 {
562 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
563 struct sigevent sev;
564 struct itimerspec its;
565 int ret;
566
567 if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
568 || !chan->read_timer_interval || chan->read_timer_enabled)
569 return;
570
571 chan->read_timer_enabled = 1;
572
573 lib_ring_buffer_setup_timer_thread();
574
575 sev.sigev_notify = SIGEV_SIGNAL;
576 sev.sigev_signo = LTTNG_UST_RB_SIG_READ;
577 sev.sigev_value.sival_ptr = chan;
578 ret = timer_create(CLOCKID, &sev, &chan->read_timer);
579 if (ret == -1) {
580 PERROR("timer_create");
581 }
582
583 its.it_value.tv_sec = chan->read_timer_interval / 1000000;
584 its.it_value.tv_nsec = (chan->read_timer_interval % 1000000) * 1000; /* us to ns */
585 its.it_interval.tv_sec = its.it_value.tv_sec;
586 its.it_interval.tv_nsec = its.it_value.tv_nsec;
587
588 ret = timer_settime(chan->read_timer, 0, &its, NULL);
589 if (ret == -1) {
590 PERROR("timer_settime");
591 }
592 }
593
594 static
595 void lib_ring_buffer_channel_read_timer_stop(struct channel *chan)
596 {
597 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
598 int ret;
599
600 if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
601 || !chan->read_timer_interval || !chan->read_timer_enabled)
602 return;
603
604 ret = timer_delete(chan->read_timer);
605 if (ret == -1) {
606 PERROR("timer_delete");
607 }
608
609 /*
610 * Do one more read pass to catch data that has been written during the
611 * last timer period.
612 */
613 lib_ring_buffer_channel_do_read(chan);
614
615 lib_ring_buffer_wait_signal_thread_qs(LTTNG_UST_RB_SIG_READ);
616
617 chan->read_timer = 0;
618 chan->read_timer_enabled = 0;
619 }
620
621 static void channel_unregister_notifiers(struct channel *chan,
622 struct lttng_ust_shm_handle *handle)
623 {
624 lib_ring_buffer_channel_switch_timer_stop(chan);
625 lib_ring_buffer_channel_read_timer_stop(chan);
626 }
627
628 static void channel_print_errors(struct channel *chan,
629 struct lttng_ust_shm_handle *handle)
630 {
631 const struct lttng_ust_lib_ring_buffer_config *config =
632 &chan->backend.config;
633 int cpu;
634
635 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
636 for_each_possible_cpu(cpu) {
637 struct lttng_ust_lib_ring_buffer *buf =
638 shmp(handle, chan->backend.buf[cpu].shmp);
639 lib_ring_buffer_print_errors(chan, buf, cpu, handle);
640 }
641 } else {
642 struct lttng_ust_lib_ring_buffer *buf =
643 shmp(handle, chan->backend.buf[0].shmp);
644
645 lib_ring_buffer_print_errors(chan, buf, -1, handle);
646 }
647 }
648
649 static void channel_free(struct channel *chan,
650 struct lttng_ust_shm_handle *handle)
651 {
652 channel_backend_free(&chan->backend, handle);
653 /* chan is freed by shm teardown */
654 shm_object_table_destroy(handle->table);
655 free(handle);
656 }
657
658 /**
659 * channel_create - Create channel.
660 * @config: ring buffer instance configuration
661 * @name: name of the channel
662 * @priv_data: ring buffer client private data area pointer (output)
663 * @priv_data_size: length, in bytes, of the private data area.
664 * @priv_data_init: initialization data for private data.
665 * @buf_addr: pointer to the beginning of the preallocated buffer contiguous
666 * address mapping. It is used only by RING_BUFFER_STATIC
667 * configuration. It can be set to NULL for other backends.
668 * @subbuf_size: subbuffer size
669 * @num_subbuf: number of subbuffers
670 * @switch_timer_interval: Time interval (in us) to fill sub-buffers with
671 * padding to let readers get those sub-buffers.
672 * Used for live streaming.
673 * @read_timer_interval: Time interval (in us) to wake up pending readers.
674 *
675 * Holds cpu hotplug.
676 * Returns NULL on failure.
677 */
678 struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buffer_config *config,
679 const char *name,
680 void **priv_data,
681 size_t priv_data_align,
682 size_t priv_data_size,
683 void *priv_data_init,
684 void *buf_addr, size_t subbuf_size,
685 size_t num_subbuf, unsigned int switch_timer_interval,
686 unsigned int read_timer_interval)
687 {
688 int ret;
689 size_t shmsize, chansize;
690 struct channel *chan;
691 struct lttng_ust_shm_handle *handle;
692 struct shm_object *shmobj;
693 unsigned int nr_streams;
694
695 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
696 nr_streams = num_possible_cpus();
697 else
698 nr_streams = 1;
699
700 if (lib_ring_buffer_check_config(config, switch_timer_interval,
701 read_timer_interval))
702 return NULL;
703
704 handle = zmalloc(sizeof(struct lttng_ust_shm_handle));
705 if (!handle)
706 return NULL;
707
708 /* Allocate table for channel + per-cpu buffers */
709 handle->table = shm_object_table_create(1 + num_possible_cpus());
710 if (!handle->table)
711 goto error_table_alloc;
712
713 /* Calculate the shm allocation layout */
714 shmsize = sizeof(struct channel);
715 shmsize += offset_align(shmsize, __alignof__(struct lttng_ust_lib_ring_buffer_shmp));
716 shmsize += sizeof(struct lttng_ust_lib_ring_buffer_shmp) * nr_streams;
717 chansize = shmsize;
718 if (priv_data_align)
719 shmsize += offset_align(shmsize, priv_data_align);
720 shmsize += priv_data_size;
721
722 /* Allocate normal memory for channel (not shared) */
723 shmobj = shm_object_table_alloc(handle->table, shmsize, SHM_OBJECT_MEM);
724 if (!shmobj)
725 goto error_append;
726 /* struct channel is at object 0, offset 0 (hardcoded) */
727 set_shmp(handle->chan, zalloc_shm(shmobj, chansize));
728 assert(handle->chan._ref.index == 0);
729 assert(handle->chan._ref.offset == 0);
730 chan = shmp(handle, handle->chan);
731 if (!chan)
732 goto error_append;
733 chan->nr_streams = nr_streams;
734
735 /* space for private data */
736 if (priv_data_size) {
737 DECLARE_SHMP(void, priv_data_alloc);
738
739 align_shm(shmobj, priv_data_align);
740 chan->priv_data_offset = shmobj->allocated_len;
741 set_shmp(priv_data_alloc, zalloc_shm(shmobj, priv_data_size));
742 if (!shmp(handle, priv_data_alloc))
743 goto error_append;
744 *priv_data = channel_get_private(chan);
745 memcpy(*priv_data, priv_data_init, priv_data_size);
746 } else {
747 chan->priv_data_offset = -1;
748 if (priv_data)
749 *priv_data = NULL;
750 }
751
752 ret = channel_backend_init(&chan->backend, name, config,
753 subbuf_size, num_subbuf, handle);
754 if (ret)
755 goto error_backend_init;
756
757 chan->handle = handle;
758 chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order);
759
760 chan->switch_timer_interval = switch_timer_interval;
761 chan->read_timer_interval = read_timer_interval;
762 lib_ring_buffer_channel_switch_timer_start(chan);
763 lib_ring_buffer_channel_read_timer_start(chan);
764
765 return handle;
766
767 error_backend_init:
768 error_append:
769 shm_object_table_destroy(handle->table);
770 error_table_alloc:
771 free(handle);
772 return NULL;
773 }
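
/*
 * Illustrative sketch (added as documentation, not compiled): how a ring
 * buffer client could call channel_create(). The client configuration,
 * private structure and sizing below are hypothetical; the real clients
 * live in the liblttng-ust tree.
 *
 *	struct example_priv *priv;
 *	struct lttng_ust_shm_handle *handle;
 *
 *	handle = channel_create(&client_config, "example_chan",
 *			(void **) &priv,
 *			__alignof__(struct example_priv),
 *			sizeof(struct example_priv),
 *			&priv_init,
 *			NULL,		(buf_addr: unused by this backend)
 *			16 * 4096,	(subbuf_size: 64 kB)
 *			4,		(num_subbuf)
 *			0,		(switch timer disabled)
 *			200000);	(read timer: 200 ms, in us)
 *	if (!handle)
 *		return -ENOMEM;
 */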
774
775 struct lttng_ust_shm_handle *channel_handle_create(void *data,
776 uint64_t memory_map_size,
777 int wakeup_fd)
778 {
779 struct lttng_ust_shm_handle *handle;
780 struct shm_object *object;
781
782 handle = zmalloc(sizeof(struct lttng_ust_shm_handle));
783 if (!handle)
784 return NULL;
785
786 /* Allocate table for channel + per-cpu buffers */
787 handle->table = shm_object_table_create(1 + num_possible_cpus());
788 if (!handle->table)
789 goto error_table_alloc;
790 /* Add channel object */
791 object = shm_object_table_append_mem(handle->table, data,
792 memory_map_size, wakeup_fd);
793 if (!object)
794 goto error_table_object;
795 /* struct channel is at object 0, offset 0 (hardcoded) */
796 handle->chan._ref.index = 0;
797 handle->chan._ref.offset = 0;
798 return handle;
799
800 error_table_object:
801 shm_object_table_destroy(handle->table);
802 error_table_alloc:
803 free(handle);
804 return NULL;
805 }
806
807 int channel_handle_add_stream(struct lttng_ust_shm_handle *handle,
808 int shm_fd, int wakeup_fd, uint32_t stream_nr,
809 uint64_t memory_map_size)
810 {
811 struct shm_object *object;
812
813 /* Add stream object */
814 object = shm_object_table_append_shm(handle->table,
815 shm_fd, wakeup_fd, stream_nr,
816 memory_map_size);
817 if (!object)
818 return -EINVAL;
819 return 0;
820 }
821
822 unsigned int channel_handle_get_nr_streams(struct lttng_ust_shm_handle *handle)
823 {
824 assert(handle->table);
825 return handle->table->allocated_len - 1;
826 }
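
/*
 * Illustrative sketch (added as documentation, not compiled): how a
 * consumer-side process could rebuild a shm handle from the channel
 * memory and the per-stream shm/wakeup file descriptors it received
 * over a UNIX socket. All variable names are hypothetical.
 *
 *	handle = channel_handle_create(chan_data, chan_map_size,
 *			chan_wakeup_fd);
 *	if (!handle)
 *		return -ENOMEM;
 *	for (i = 0; i < nr_streams; i++) {
 *		ret = channel_handle_add_stream(handle, stream_shm_fd[i],
 *				stream_wakeup_fd[i], i, stream_map_size[i]);
 *		if (ret)
 *			goto error;
 *	}
 *	assert(channel_handle_get_nr_streams(handle) == nr_streams);
 */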
827
828 static
829 void channel_release(struct channel *chan, struct lttng_ust_shm_handle *handle)
830 {
831 channel_free(chan, handle);
832 }
833
834 /**
835 * channel_destroy - Finalize, wait for q.s. and destroy channel.
836 * @chan: channel to destroy
837 *
838 * Holds cpu hotplug.
839 * Call "destroy" callback, finalize channels, decrement the channel
840 * reference count. Note that when readers have completed data
841 * consumption of finalized channels, get_subbuf() will return -ENODATA.
842 * They should release their handle at that point.
843 */
844 void channel_destroy(struct channel *chan, struct lttng_ust_shm_handle *handle,
845 int consumer)
846 {
847 if (consumer) {
848 /*
849 * Note: the consumer takes care of finalizing and
850 * switching the buffers.
851 */
852 channel_unregister_notifiers(chan, handle);
853 /*
854 * The consumer prints errors.
855 */
856 channel_print_errors(chan, handle);
857 }
858
859 /*
860 * sessiond/consumer are keeping a reference on the shm file
861 * descriptor directly. No need to refcount.
862 */
863 channel_release(chan, handle);
864 return;
865 }
866
867 struct lttng_ust_lib_ring_buffer *channel_get_ring_buffer(
868 const struct lttng_ust_lib_ring_buffer_config *config,
869 struct channel *chan, int cpu,
870 struct lttng_ust_shm_handle *handle,
871 int *shm_fd, int *wait_fd,
872 int *wakeup_fd,
873 uint64_t *memory_map_size)
874 {
875 struct shm_ref *ref;
876
877 if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
878 cpu = 0;
879 } else {
880 if (cpu >= num_possible_cpus())
881 return NULL;
882 }
883 ref = &chan->backend.buf[cpu].shmp._ref;
884 *shm_fd = shm_get_shm_fd(handle, ref);
885 *wait_fd = shm_get_wait_fd(handle, ref);
886 *wakeup_fd = shm_get_wakeup_fd(handle, ref);
887 if (shm_get_shm_size(handle, ref, memory_map_size))
888 return NULL;
889 return shmp(handle, chan->backend.buf[cpu].shmp);
890 }
891
892 int ring_buffer_channel_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
893 struct channel *chan,
894 struct lttng_ust_shm_handle *handle)
895 {
896 struct shm_ref *ref;
897
898 ref = &handle->chan._ref;
899 return shm_close_wait_fd(handle, ref);
900 }
901
902 int ring_buffer_channel_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
903 struct channel *chan,
904 struct lttng_ust_shm_handle *handle)
905 {
906 struct shm_ref *ref;
907
908 ref = &handle->chan._ref;
909 return shm_close_wakeup_fd(handle, ref);
910 }
911
912 int ring_buffer_stream_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
913 struct channel *chan,
914 struct lttng_ust_shm_handle *handle,
915 int cpu)
916 {
917 struct shm_ref *ref;
918
919 if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
920 cpu = 0;
921 } else {
922 if (cpu >= num_possible_cpus())
923 return -EINVAL;
924 }
925 ref = &chan->backend.buf[cpu].shmp._ref;
926 return shm_close_wait_fd(handle, ref);
927 }
928
929 int ring_buffer_stream_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
930 struct channel *chan,
931 struct lttng_ust_shm_handle *handle,
932 int cpu)
933 {
934 struct shm_ref *ref;
935 int ret;
936
937 if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
938 cpu = 0;
939 } else {
940 if (cpu >= num_possible_cpus())
941 return -EINVAL;
942 }
943 ref = &chan->backend.buf[cpu].shmp._ref;
944 pthread_mutex_lock(&wakeup_fd_mutex);
945 ret = shm_close_wakeup_fd(handle, ref);
946 pthread_mutex_unlock(&wakeup_fd_mutex);
947 return ret;
948 }
949
950 int lib_ring_buffer_open_read(struct lttng_ust_lib_ring_buffer *buf,
951 struct lttng_ust_shm_handle *handle)
952 {
953 if (uatomic_cmpxchg(&buf->active_readers, 0, 1) != 0)
954 return -EBUSY;
955 cmm_smp_mb();
956 return 0;
957 }
958
959 void lib_ring_buffer_release_read(struct lttng_ust_lib_ring_buffer *buf,
960 struct lttng_ust_shm_handle *handle)
961 {
962 struct channel *chan = shmp(handle, buf->backend.chan);
963
964 CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
965 cmm_smp_mb();
966 uatomic_dec(&buf->active_readers);
967 }
968
969 /**
970 * lib_ring_buffer_snapshot - save subbuffer position snapshot (for read)
971 * @buf: ring buffer
972 * @consumed: consumed count indicating the position where to read
973 * @produced: produced count, indicates position when to stop reading
974 *
975 * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no
976 * data to read at consumed position, or 0 if the get operation succeeds.
977 */
978
979 int lib_ring_buffer_snapshot(struct lttng_ust_lib_ring_buffer *buf,
980 unsigned long *consumed, unsigned long *produced,
981 struct lttng_ust_shm_handle *handle)
982 {
983 struct channel *chan = shmp(handle, buf->backend.chan);
984 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
985 unsigned long consumed_cur, write_offset;
986 int finalized;
987
988 finalized = CMM_ACCESS_ONCE(buf->finalized);
989 /*
990 * Read finalized before counters.
991 */
992 cmm_smp_rmb();
993 consumed_cur = uatomic_read(&buf->consumed);
994 /*
995 * No need to issue a memory barrier between consumed count read and
996 * write offset read, because consumed count can only change
997 * concurrently in overwrite mode, and we keep a sequence counter
998 * identifier derived from the write offset to check we are getting
999 * the same sub-buffer we are expecting (the sub-buffers are atomically
1000 * "tagged" upon writes, tags are checked upon read).
1001 */
1002 write_offset = v_read(config, &buf->offset);
1003
1004 /*
1005 * Check that we are not about to read the same subbuffer in
1006 * which the writer head is.
1007 */
1008 if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan)
1009 == 0)
1010 goto nodata;
1011
1012 *consumed = consumed_cur;
1013 *produced = subbuf_trunc(write_offset, chan);
1014
1015 return 0;
1016
1017 nodata:
1018 /*
1019 * The memory barriers __wait_event()/wake_up_interruptible() take care
1020 * of "raw_spin_is_locked" memory ordering.
1021 */
1022 if (finalized)
1023 return -ENODATA;
1024 else
1025 return -EAGAIN;
1026 }
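
/*
 * Illustrative sketch (added as documentation, not compiled): draining a
 * buffer up to the producer position captured by the snapshot above,
 * which is how flight-recorder (overwrite) buffers are typically read.
 *
 *	ret = lib_ring_buffer_snapshot(buf, &consumed, &produced, handle);
 *	if (ret)
 *		return ret;	(-EAGAIN: no data, -ENODATA: finalized)
 *	while ((long) (produced - consumed) > 0) {
 *		ret = lib_ring_buffer_get_subbuf(buf, consumed, handle);
 *		if (ret)
 *			break;	(pushed by the writer; take a new snapshot)
 *		(read one sub-buffer worth of data through the backend)
 *		lib_ring_buffer_put_subbuf(buf, handle);
 *		consumed = subbuf_align(consumed, chan);
 *		lib_ring_buffer_move_consumer(buf, consumed, handle);
 *	}
 */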
1027
1028 /**
1029 * lib_ring_buffer_move_consumer - move consumed counter forward
1030 * @buf: ring buffer
1031 * @consumed_new: new consumed count value
1032 */
1033 void lib_ring_buffer_move_consumer(struct lttng_ust_lib_ring_buffer *buf,
1034 unsigned long consumed_new,
1035 struct lttng_ust_shm_handle *handle)
1036 {
1037 struct lttng_ust_lib_ring_buffer_backend *bufb = &buf->backend;
1038 struct channel *chan = shmp(handle, bufb->chan);
1039 unsigned long consumed;
1040
1041 CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
1042
1043 /*
1044 * Only push the consumed value forward.
1045 * If the consumed cmpxchg fails, this is because we have been pushed by
1046 * the writer in flight recorder mode.
1047 */
1048 consumed = uatomic_read(&buf->consumed);
1049 while ((long) consumed - (long) consumed_new < 0)
1050 consumed = uatomic_cmpxchg(&buf->consumed, consumed,
1051 consumed_new);
1052 }
1053
1054 /**
1055 * lib_ring_buffer_get_subbuf - get exclusive access to subbuffer for reading
1056 * @buf: ring buffer
1057 * @consumed: consumed count indicating the position where to read
1058 *
1059 * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no
1060 * data to read at consumed position, or 0 if the get operation succeeds.
1061 */
1062 int lib_ring_buffer_get_subbuf(struct lttng_ust_lib_ring_buffer *buf,
1063 unsigned long consumed,
1064 struct lttng_ust_shm_handle *handle)
1065 {
1066 struct channel *chan = shmp(handle, buf->backend.chan);
1067 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1068 unsigned long consumed_cur, consumed_idx, commit_count, write_offset;
1069 int ret;
1070 int finalized;
1071
1072 retry:
1073 finalized = CMM_ACCESS_ONCE(buf->finalized);
1074 /*
1075 * Read finalized before counters.
1076 */
1077 cmm_smp_rmb();
1078 consumed_cur = uatomic_read(&buf->consumed);
1079 consumed_idx = subbuf_index(consumed, chan);
1080 commit_count = v_read(config, &shmp_index(handle, buf->commit_cold, consumed_idx)->cc_sb);
1081 /*
1082 * Make sure we read the commit count before reading the buffer
1083 * data and the write offset. Correct consumed offset ordering
1084 * wrt commit count is ensured by the use of cmpxchg to update
1085 * the consumed offset.
1086 */
1087 /*
1088 * Local rmb to match the remote wmb to read the commit count
1089 * before the buffer data and the write offset.
1090 */
1091 cmm_smp_rmb();
1092
1093 write_offset = v_read(config, &buf->offset);
1094
1095 /*
1096 * Check that the buffer we are getting is after or at consumed_cur
1097 * position.
1098 */
1099 if ((long) subbuf_trunc(consumed, chan)
1100 - (long) subbuf_trunc(consumed_cur, chan) < 0)
1101 goto nodata;
1102
1103 /*
1104 * Check that the subbuffer we are trying to consume has been
1105 * already fully committed.
1106 */
1107 if (((commit_count - chan->backend.subbuf_size)
1108 & chan->commit_count_mask)
1109 - (buf_trunc(consumed_cur, chan)
1110 >> chan->backend.num_subbuf_order)
1111 != 0)
1112 goto nodata;
1113
1114 /*
1115 * Check that we are not about to read the same subbuffer in
1116 * which the writer head is.
1117 */
1118 if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan)
1119 == 0)
1120 goto nodata;
1121
1122 /*
1123 * Failure to get the subbuffer causes a busy-loop retry without going
1124 * to a wait queue. These are caused by short-lived race windows where
1125 * the writer is getting access to a subbuffer we were trying to get
1126 * access to. Also checks that the "consumed" buffer count we are
1127 * looking for matches the one contained in the subbuffer id.
1128 */
1129 ret = update_read_sb_index(config, &buf->backend, &chan->backend,
1130 consumed_idx, buf_trunc_val(consumed, chan),
1131 handle);
1132 if (ret)
1133 goto retry;
1134 subbuffer_id_clear_noref(config, &buf->backend.buf_rsb.id);
1135
1136 buf->get_subbuf_consumed = consumed;
1137 buf->get_subbuf = 1;
1138
1139 return 0;
1140
1141 nodata:
1142 /*
1143 * The memory barriers __wait_event()/wake_up_interruptible() take care
1144 * of "raw_spin_is_locked" memory ordering.
1145 */
1146 if (finalized)
1147 return -ENODATA;
1148 else
1149 return -EAGAIN;
1150 }
1151
1152 /**
1153 * lib_ring_buffer_put_subbuf - release exclusive subbuffer access
1154 * @buf: ring buffer
1155 */
1156 void lib_ring_buffer_put_subbuf(struct lttng_ust_lib_ring_buffer *buf,
1157 struct lttng_ust_shm_handle *handle)
1158 {
1159 struct lttng_ust_lib_ring_buffer_backend *bufb = &buf->backend;
1160 struct channel *chan = shmp(handle, bufb->chan);
1161 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1162 unsigned long read_sb_bindex, consumed_idx, consumed;
1163
1164 CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
1165
1166 if (!buf->get_subbuf) {
1167 /*
1168 * Reader puts a subbuffer it did not get.
1169 */
1170 CHAN_WARN_ON(chan, 1);
1171 return;
1172 }
1173 consumed = buf->get_subbuf_consumed;
1174 buf->get_subbuf = 0;
1175
1176 /*
1177 * Clear the records_unread counter. (overruns counter)
1178 * Can still be non-zero if a file reader simply grabbed the data
1179 * without using iterators.
1180 * Can be below zero if an iterator is used on a snapshot more than
1181 * once.
1182 */
1183 read_sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
1184 v_add(config, v_read(config,
1185 &shmp(handle, shmp_index(handle, bufb->array, read_sb_bindex)->shmp)->records_unread),
1186 &bufb->records_read);
1187 v_set(config, &shmp(handle, shmp_index(handle, bufb->array, read_sb_bindex)->shmp)->records_unread, 0);
1188 CHAN_WARN_ON(chan, config->mode == RING_BUFFER_OVERWRITE
1189 && subbuffer_id_is_noref(config, bufb->buf_rsb.id));
1190 subbuffer_id_set_noref(config, &bufb->buf_rsb.id);
1191
1192 /*
1193 * Exchange the reader subbuffer with the one we put in its place in the
1194 * writer subbuffer table. Expect the original consumed count. If
1195 * update_read_sb_index fails, this is because the writer updated the
1196 * subbuffer concurrently. We should therefore keep the subbuffer we
1197 * currently have: it has become invalid to try reading this sub-buffer
1198 * consumed count value anyway.
1199 */
1200 consumed_idx = subbuf_index(consumed, chan);
1201 update_read_sb_index(config, &buf->backend, &chan->backend,
1202 consumed_idx, buf_trunc_val(consumed, chan),
1203 handle);
1204 /*
1205 * update_read_sb_index return value ignored. Don't exchange sub-buffer
1206 * if the writer concurrently updated it.
1207 */
1208 }
1209
1210 /*
1211 * cons_offset is an iterator on all subbuffer offsets between the reader
1212 * position and the writer position. (inclusive)
1213 */
1214 static
1215 void lib_ring_buffer_print_subbuffer_errors(struct lttng_ust_lib_ring_buffer *buf,
1216 struct channel *chan,
1217 unsigned long cons_offset,
1218 int cpu,
1219 struct lttng_ust_shm_handle *handle)
1220 {
1221 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1222 unsigned long cons_idx, commit_count, commit_count_sb;
1223
1224 cons_idx = subbuf_index(cons_offset, chan);
1225 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, cons_idx)->cc);
1226 commit_count_sb = v_read(config, &shmp_index(handle, buf->commit_cold, cons_idx)->cc_sb);
1227
1228 if (subbuf_offset(commit_count, chan) != 0)
1229 DBG("ring buffer %s, cpu %d: "
1230 "commit count in subbuffer %lu,\n"
1231 "expecting multiples of %lu bytes\n"
1232 " [ %lu bytes committed, %lu bytes reader-visible ]\n",
1233 chan->backend.name, cpu, cons_idx,
1234 chan->backend.subbuf_size,
1235 commit_count, commit_count_sb);
1236
1237 DBG("ring buffer: %s, cpu %d: %lu bytes committed\n",
1238 chan->backend.name, cpu, commit_count);
1239 }
1240
1241 static
1242 void lib_ring_buffer_print_buffer_errors(struct lttng_ust_lib_ring_buffer *buf,
1243 struct channel *chan,
1244 void *priv, int cpu,
1245 struct lttng_ust_shm_handle *handle)
1246 {
1247 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1248 unsigned long write_offset, cons_offset;
1249
1250 /*
1251 * No need to order commit_count, write_offset and cons_offset reads
1252 * because we execute at teardown when no more writer nor reader
1253 * references are left.
1254 */
1255 write_offset = v_read(config, &buf->offset);
1256 cons_offset = uatomic_read(&buf->consumed);
1257 if (write_offset != cons_offset)
1258 DBG("ring buffer %s, cpu %d: "
1259 "non-consumed data\n"
1260 " [ %lu bytes written, %lu bytes read ]\n",
1261 chan->backend.name, cpu, write_offset, cons_offset);
1262
1263 for (cons_offset = uatomic_read(&buf->consumed);
1264 (long) (subbuf_trunc((unsigned long) v_read(config, &buf->offset),
1265 chan)
1266 - cons_offset) > 0;
1267 cons_offset = subbuf_align(cons_offset, chan))
1268 lib_ring_buffer_print_subbuffer_errors(buf, chan, cons_offset,
1269 cpu, handle);
1270 }
1271
1272 static
1273 void lib_ring_buffer_print_errors(struct channel *chan,
1274 struct lttng_ust_lib_ring_buffer *buf, int cpu,
1275 struct lttng_ust_shm_handle *handle)
1276 {
1277 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1278 void *priv = channel_get_private(chan);
1279
1280 if (!strcmp(chan->backend.name, "relay-metadata-mmap")) {
1281 DBG("ring buffer %s: %lu records written, "
1282 "%lu records overrun\n",
1283 chan->backend.name,
1284 v_read(config, &buf->records_count),
1285 v_read(config, &buf->records_overrun));
1286 } else {
1287 DBG("ring buffer %s, cpu %d: %lu records written, "
1288 "%lu records overrun\n",
1289 chan->backend.name, cpu,
1290 v_read(config, &buf->records_count),
1291 v_read(config, &buf->records_overrun));
1292
1293 if (v_read(config, &buf->records_lost_full)
1294 || v_read(config, &buf->records_lost_wrap)
1295 || v_read(config, &buf->records_lost_big))
1296 DBG("ring buffer %s, cpu %d: records were lost. Caused by:\n"
1297 " [ %lu buffer full, %lu nested buffer wrap-around, "
1298 "%lu event too big ]\n",
1299 chan->backend.name, cpu,
1300 v_read(config, &buf->records_lost_full),
1301 v_read(config, &buf->records_lost_wrap),
1302 v_read(config, &buf->records_lost_big));
1303 }
1304 lib_ring_buffer_print_buffer_errors(buf, chan, priv, cpu, handle);
1305 }
1306
1307 /*
1308 * lib_ring_buffer_switch_old_start: Populate old subbuffer header.
1309 *
1310 * Only executed when the buffer is finalized, in SWITCH_FLUSH.
1311 */
1312 static
1313 void lib_ring_buffer_switch_old_start(struct lttng_ust_lib_ring_buffer *buf,
1314 struct channel *chan,
1315 struct switch_offsets *offsets,
1316 uint64_t tsc,
1317 struct lttng_ust_shm_handle *handle)
1318 {
1319 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1320 unsigned long oldidx = subbuf_index(offsets->old, chan);
1321 unsigned long commit_count;
1322
1323 config->cb.buffer_begin(buf, tsc, oldidx, handle);
1324
1325 /*
1326 * Order all writes to buffer before the commit count update that will
1327 * determine that the subbuffer is full.
1328 */
1329 cmm_smp_wmb();
1330 v_add(config, config->cb.subbuffer_header_size(),
1331 &shmp_index(handle, buf->commit_hot, oldidx)->cc);
1332 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, oldidx)->cc);
1333 /* Check if the written buffer has to be delivered */
1334 lib_ring_buffer_check_deliver(config, buf, chan, offsets->old,
1335 commit_count, oldidx, handle);
1336 lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
1337 offsets->old, commit_count,
1338 config->cb.subbuffer_header_size(),
1339 handle);
1340 }
1341
1342 /*
1343 * lib_ring_buffer_switch_old_end: switch old subbuffer
1344 *
1345 * Note: offset_old should never be 0 here. This is OK because we never perform
1346 * buffer switch on an empty subbuffer in SWITCH_ACTIVE mode. The caller
1347 * increments the offset_old value when doing a SWITCH_FLUSH on an empty
1348 * subbuffer.
1349 */
1350 static
1351 void lib_ring_buffer_switch_old_end(struct lttng_ust_lib_ring_buffer *buf,
1352 struct channel *chan,
1353 struct switch_offsets *offsets,
1354 uint64_t tsc,
1355 struct lttng_ust_shm_handle *handle)
1356 {
1357 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1358 unsigned long oldidx = subbuf_index(offsets->old - 1, chan);
1359 unsigned long commit_count, padding_size, data_size;
1360
1361 data_size = subbuf_offset(offsets->old - 1, chan) + 1;
1362 padding_size = chan->backend.subbuf_size - data_size;
1363 subbuffer_set_data_size(config, &buf->backend, oldidx, data_size,
1364 handle);
1365
1366 /*
1367 * Order all writes to buffer before the commit count update that will
1368 * determine that the subbuffer is full.
1369 */
1370 cmm_smp_wmb();
1371 v_add(config, padding_size, &shmp_index(handle, buf->commit_hot, oldidx)->cc);
1372 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, oldidx)->cc);
1373 lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1,
1374 commit_count, oldidx, handle);
1375 lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
1376 offsets->old, commit_count,
1377 padding_size, handle);
1378 }
1379
1380 /*
1381 * lib_ring_buffer_switch_new_start: Populate new subbuffer.
1382 *
1383 * This code can be executed unordered: writers may already have written to the
1384 * sub-buffer before this code gets executed, so use caution. The commit makes sure
1385 * that this code is executed before the delivery of this sub-buffer.
1386 */
1387 static
1388 void lib_ring_buffer_switch_new_start(struct lttng_ust_lib_ring_buffer *buf,
1389 struct channel *chan,
1390 struct switch_offsets *offsets,
1391 uint64_t tsc,
1392 struct lttng_ust_shm_handle *handle)
1393 {
1394 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1395 unsigned long beginidx = subbuf_index(offsets->begin, chan);
1396 unsigned long commit_count;
1397
1398 config->cb.buffer_begin(buf, tsc, beginidx, handle);
1399
1400 /*
1401 * Order all writes to buffer before the commit count update that will
1402 * determine that the subbuffer is full.
1403 */
1404 cmm_smp_wmb();
1405 v_add(config, config->cb.subbuffer_header_size(),
1406 &shmp_index(handle, buf->commit_hot, beginidx)->cc);
1407 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, beginidx)->cc);
1408 /* Check if the written buffer has to be delivered */
1409 lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin,
1410 commit_count, beginidx, handle);
1411 lib_ring_buffer_write_commit_counter(config, buf, chan, beginidx,
1412 offsets->begin, commit_count,
1413 config->cb.subbuffer_header_size(),
1414 handle);
1415 }
1416
1417 /*
1418 * Returns:
1419 * 0 if ok
1420 * !0 if execution must be aborted.
1421 */
1422 static
1423 int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
1424 struct lttng_ust_lib_ring_buffer *buf,
1425 struct channel *chan,
1426 struct switch_offsets *offsets,
1427 uint64_t *tsc)
1428 {
1429 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1430 unsigned long off;
1431
1432 offsets->begin = v_read(config, &buf->offset);
1433 offsets->old = offsets->begin;
1434 offsets->switch_old_start = 0;
1435 off = subbuf_offset(offsets->begin, chan);
1436
1437 *tsc = config->cb.ring_buffer_clock_read(chan);
1438
1439 /*
1440 * Ensure we flush the header of an empty subbuffer when doing the
1441 * finalize (SWITCH_FLUSH). This ensures that we end up knowing the
1442 * total data gathering duration even if there were no records saved
1443 * after the last buffer switch.
1444 * In SWITCH_ACTIVE mode, switch the buffer when it contains events.
1445 * SWITCH_ACTIVE only flushes the current subbuffer, dealing with end of
1446 * subbuffer header as appropriate.
1447 * The next record that reserves space will be responsible for
1448 * populating the following subbuffer header. We choose not to populate
1449 * the next subbuffer header here because we want to be able to use
1450 * SWITCH_ACTIVE for periodical buffer flush, which must
1451 * guarantee that all the buffer content (records and header
1452 * timestamps) are visible to the reader. This is required for
1453 * quiescence guarantees for the fusion merge.
1454 */
1455 if (mode == SWITCH_FLUSH || off > 0) {
1456 if (caa_unlikely(off == 0)) {
1457 /*
1458 * A final flush that encounters an empty
1459 * sub-buffer cannot switch buffer if a
1460 * reader is located within this sub-buffer.
1461 * Anyway, the purpose of final flushing of a
1462 * sub-buffer at offset 0 is to handle the case
1463 * of entirely empty stream.
1464 */
1465 if (caa_unlikely(subbuf_trunc(offsets->begin, chan)
1466 - subbuf_trunc((unsigned long)
1467 uatomic_read(&buf->consumed), chan)
1468 >= chan->backend.buf_size))
1469 return -1;
1470 /*
1471 * The client does not save any header information.
1472 * Don't switch empty subbuffer on finalize, because it
1473 * is invalid to deliver a completely empty subbuffer.
1474 */
1475 if (!config->cb.subbuffer_header_size())
1476 return -1;
1477 /*
1478 * Need to write the subbuffer start header on finalize.
1479 */
1480 offsets->switch_old_start = 1;
1481 }
1482 offsets->begin = subbuf_align(offsets->begin, chan);
1483 } else
1484 return -1; /* we do not have to switch : buffer is empty */
1485 /* Note: old points to the next subbuf at offset 0 */
1486 offsets->end = offsets->begin;
1487 return 0;
1488 }
1489
1490 /*
1491 * Force a sub-buffer switch. This operation is completely reentrant: it can be
1492 * called while tracing is active with absolutely no lock held.
1493 *
1494 * Note, however, that because a v_cmpxchg is used for some atomic
1495 * operations, this function must be called from the CPU which owns the buffer
1496 * for an ACTIVE flush.
1497 */
1498 void lib_ring_buffer_switch_slow(struct lttng_ust_lib_ring_buffer *buf, enum switch_mode mode,
1499 struct lttng_ust_shm_handle *handle)
1500 {
1501 struct channel *chan = shmp(handle, buf->backend.chan);
1502 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1503 struct switch_offsets offsets;
1504 unsigned long oldidx;
1505 uint64_t tsc;
1506
1507 offsets.size = 0;
1508
1509 /*
1510 * Perform retryable operations.
1511 */
1512 do {
1513 if (lib_ring_buffer_try_switch_slow(mode, buf, chan, &offsets,
1514 &tsc))
1515 return; /* Switch not needed */
1516 } while (v_cmpxchg(config, &buf->offset, offsets.old, offsets.end)
1517 != offsets.old);
1518
1519 /*
1520 * Atomically update last_tsc. This update races against concurrent
1521 * atomic updates, but the race will always cause supplementary full TSC
1522 * records, never the opposite (missing a full TSC record when it would
1523 * be needed).
1524 */
1525 save_last_tsc(config, buf, tsc);
1526
1527 /*
1528 * Push the reader if necessary
1529 */
1530 lib_ring_buffer_reserve_push_reader(buf, chan, offsets.old);
1531
1532 oldidx = subbuf_index(offsets.old, chan);
1533 lib_ring_buffer_clear_noref(config, &buf->backend, oldidx, handle);
1534
1535 /*
1536 * May need to populate header start on SWITCH_FLUSH.
1537 */
1538 if (offsets.switch_old_start) {
1539 lib_ring_buffer_switch_old_start(buf, chan, &offsets, tsc, handle);
1540 offsets.old += config->cb.subbuffer_header_size();
1541 }
1542
1543 /*
1544 * Switch old subbuffer.
1545 */
1546 lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc, handle);
1547 }
1548
1549 /*
1550 * Returns:
1551 * 0 if ok
1552 * -ENOSPC if event size is too large for packet.
1553 * -ENOBUFS if there is currently not enough space in buffer for the event.
1554 * -EIO if data cannot be written into the buffer for any other reason.
1555 */
1556 static
1557 int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf,
1558 struct channel *chan,
1559 struct switch_offsets *offsets,
1560 struct lttng_ust_lib_ring_buffer_ctx *ctx)
1561 {
1562 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1563 struct lttng_ust_shm_handle *handle = ctx->handle;
1564 unsigned long reserve_commit_diff;
1565
1566 offsets->begin = v_read(config, &buf->offset);
1567 offsets->old = offsets->begin;
1568 offsets->switch_new_start = 0;
1569 offsets->switch_old_end = 0;
1570 offsets->pre_header_padding = 0;
1571
1572 ctx->tsc = config->cb.ring_buffer_clock_read(chan);
1573 if ((int64_t) ctx->tsc == -EIO)
1574 return -EIO;
1575
1576 if (last_tsc_overflow(config, buf, ctx->tsc))
1577 ctx->rflags |= RING_BUFFER_RFLAG_FULL_TSC;
1578
1579 if (caa_unlikely(subbuf_offset(offsets->begin, ctx->chan) == 0)) {
1580 offsets->switch_new_start = 1; /* For offsets->begin */
1581 } else {
1582 offsets->size = config->cb.record_header_size(config, chan,
1583 offsets->begin,
1584 &offsets->pre_header_padding,
1585 ctx);
1586 offsets->size +=
1587 lib_ring_buffer_align(offsets->begin + offsets->size,
1588 ctx->largest_align)
1589 + ctx->data_size;
1590 if (caa_unlikely(subbuf_offset(offsets->begin, chan) +
1591 offsets->size > chan->backend.subbuf_size)) {
1592 offsets->switch_old_end = 1; /* For offsets->old */
1593 offsets->switch_new_start = 1; /* For offsets->begin */
1594 }
1595 }
1596 if (caa_unlikely(offsets->switch_new_start)) {
1597 unsigned long sb_index;
1598
1599 /*
1600 * We are typically not filling the previous buffer completely.
1601 */
1602 if (caa_likely(offsets->switch_old_end))
1603 offsets->begin = subbuf_align(offsets->begin, chan);
1604 offsets->begin = offsets->begin
1605 + config->cb.subbuffer_header_size();
1606 /* Test new buffer integrity */
1607 sb_index = subbuf_index(offsets->begin, chan);
1608 reserve_commit_diff =
1609 (buf_trunc(offsets->begin, chan)
1610 >> chan->backend.num_subbuf_order)
1611 - ((unsigned long) v_read(config,
1612 &shmp_index(handle, buf->commit_cold, sb_index)->cc_sb)
1613 & chan->commit_count_mask);
1614 if (caa_likely(reserve_commit_diff == 0)) {
1615 /* Next subbuffer not being written to. */
1616 if (caa_unlikely(config->mode != RING_BUFFER_OVERWRITE &&
1617 subbuf_trunc(offsets->begin, chan)
1618 - subbuf_trunc((unsigned long)
1619 uatomic_read(&buf->consumed), chan)
1620 >= chan->backend.buf_size)) {
1621 unsigned long nr_lost;
1622
1623 /*
1624 * We do not overwrite non consumed buffers
1625 * and we are full : record is lost.
1626 */
1627 nr_lost = v_read(config, &buf->records_lost_full);
1628 v_inc(config, &buf->records_lost_full);
1629 if ((nr_lost & (DBG_PRINT_NR_LOST - 1)) == 0) {
1630 DBG("%lu or more records lost in (%s:%d) (buffer full)\n",
1631 nr_lost + 1, chan->backend.name,
1632 buf->backend.cpu);
1633 }
1634 return -ENOBUFS;
1635 } else {
1636 /*
1637 * Next subbuffer not being written to, and we
1638 * are either in overwrite mode or the buffer is
1639 * not full. It's safe to write in this new
1640 * subbuffer.
1641 */
1642 }
1643 } else {
1644 unsigned long nr_lost;
1645
1646 /*
1647 * Next subbuffer reserve offset does not match the
1648 * commit offset. Drop record in producer-consumer and
1649 * overwrite mode. Caused by either a writer OOPS or too
1650 * many nested writes over a reserve/commit pair.
1651 */
1652 nr_lost = v_read(config, &buf->records_lost_wrap);
1653 v_inc(config, &buf->records_lost_wrap);
1654 if ((nr_lost & (DBG_PRINT_NR_LOST - 1)) == 0) {
1655 DBG("%lu or more records lost in (%s:%d) (wrap-around)\n",
1656 nr_lost + 1, chan->backend.name,
1657 buf->backend.cpu);
1658 }
1659 return -EIO;
1660 }
1661 offsets->size =
1662 config->cb.record_header_size(config, chan,
1663 offsets->begin,
1664 &offsets->pre_header_padding,
1665 ctx);
1666 offsets->size +=
1667 lib_ring_buffer_align(offsets->begin + offsets->size,
1668 ctx->largest_align)
1669 + ctx->data_size;
1670 if (caa_unlikely(subbuf_offset(offsets->begin, chan)
1671 + offsets->size > chan->backend.subbuf_size)) {
1672 unsigned long nr_lost;
1673
1674 /*
1675 * Record too big for subbuffers, report error, don't
1676 * complete the sub-buffer switch.
1677 */
1678 nr_lost = v_read(config, &buf->records_lost_big);
1679 v_inc(config, &buf->records_lost_big);
1680 if ((nr_lost & (DBG_PRINT_NR_LOST - 1)) == 0) {
1681 DBG("%lu or more records lost in (%s:%d) record size "
1682 "of %zu bytes is too large for buffer\n",
1683 nr_lost + 1, chan->backend.name,
1684 buf->backend.cpu, offsets->size);
1685 }
1686 return -ENOSPC;
1687 } else {
1688 /*
1689 * We just made a successful buffer switch and the
1690 * record fits in the new subbuffer. Let's write.
1691 */
1692 }
1693 } else {
1694 /*
1695 * Record fits in the current buffer and we are not on a switch
1696 * boundary. It's safe to write.
1697 */
1698 }
1699 offsets->end = offsets->begin + offsets->size;
1700 return 0;
1701 }
1702
1703 /**
1704 * lib_ring_buffer_reserve_slow - Atomic slot reservation in a buffer.
1705 * @ctx: ring buffer context.
1706 *
1707 * Return: -ENOBUFS if not enough space, -ENOSPC if event size too large,
1708 * -EIO for other errors, else returns 0.
1709 * It will take care of sub-buffer switching.
1710 */
1711 int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx)
1712 {
1713 struct channel *chan = ctx->chan;
1714 struct lttng_ust_shm_handle *handle = ctx->handle;
1715 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1716 struct lttng_ust_lib_ring_buffer *buf;
1717 struct switch_offsets offsets;
1718 int ret;
1719
1720 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
1721 buf = shmp(handle, chan->backend.buf[ctx->cpu].shmp);
1722 else
1723 buf = shmp(handle, chan->backend.buf[0].shmp);
1724 ctx->buf = buf;
1725
1726 offsets.size = 0;
1727
1728 do {
1729 ret = lib_ring_buffer_try_reserve_slow(buf, chan, &offsets,
1730 ctx);
1731 if (caa_unlikely(ret))
1732 return ret;
1733 } while (caa_unlikely(v_cmpxchg(config, &buf->offset, offsets.old,
1734 offsets.end)
1735 != offsets.old));
1736
1737 /*
1738 * Atomically update last_tsc. This update races against concurrent
1739 * atomic updates, but the race will always cause supplementary full TSC
1740 * records, never the opposite (missing a full TSC record when it would
1741 * be needed).
1742 */
1743 save_last_tsc(config, buf, ctx->tsc);
1744
1745 /*
1746 * Push the reader if necessary
1747 */
1748 lib_ring_buffer_reserve_push_reader(buf, chan, offsets.end - 1);
1749
1750 /*
1751 * Clear noref flag for this subbuffer.
1752 */
1753 lib_ring_buffer_clear_noref(config, &buf->backend,
1754 subbuf_index(offsets.end - 1, chan),
1755 handle);
1756
1757 /*
1758 * Switch old subbuffer if needed.
1759 */
1760 if (caa_unlikely(offsets.switch_old_end)) {
1761 lib_ring_buffer_clear_noref(config, &buf->backend,
1762 subbuf_index(offsets.old - 1, chan),
1763 handle);
1764 lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->tsc, handle);
1765 }
1766
1767 /*
1768 * Populate new subbuffer.
1769 */
1770 if (caa_unlikely(offsets.switch_new_start))
1771 lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->tsc, handle);
1772
1773 ctx->slot_size = offsets.size;
1774 ctx->pre_offset = offsets.begin;
1775 ctx->buf_offset = offsets.begin + offsets.pre_header_padding;
1776 return 0;
1777 }
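
/*
 * Illustrative sketch (added as documentation, not compiled): the
 * writer-side reserve/write/commit cycle that falls back to the slow
 * path above when the fast path cannot reserve space. The helper names
 * are the fast-path API from frontend_api.h, backend.h and
 * lttng/ringbuffer-config.h; exact prototypes may differ between
 * versions, so treat this as pseudo-code.
 *
 *	struct lttng_ust_lib_ring_buffer_ctx ctx;
 *
 *	lib_ring_buffer_ctx_init(&ctx, chan, NULL, sizeof(payload),
 *			__alignof__(payload), cpu, handle);
 *	ret = lib_ring_buffer_reserve(config, &ctx);
 *	if (ret)
 *		return ret;	(-ENOBUFS, -ENOSPC or -EIO, see above)
 *	(the client writes its record header at ctx.buf_offset here)
 *	lib_ring_buffer_write(config, &ctx, &payload, sizeof(payload));
 *	lib_ring_buffer_commit(config, &ctx);
 */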
1778
1779 /*
1780 * Force a read (which implies a TLS fixup for dlopen) of TLS variables.
1781 */
1782 void lttng_fixup_ringbuffer_tls(void)
1783 {
1784 asm volatile ("" : : "m" (URCU_TLS(lib_ring_buffer_nesting)));
1785 }
1786
1787 void lib_ringbuffer_signal_init(void)
1788 {
1789 sigset_t mask;
1790 int ret;
1791
1792 /*
1793 * Block the signals for the entire process, so that only our
1794 * signal-handling thread processes them.
1795 */
1796 rb_setmask(&mask);
1797 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
1798 if (ret) {
1799 errno = ret;
1800 PERROR("pthread_sigmask");
1801 }
1802 }