lttng-ust.git: libringbuffer/ring_buffer_frontend.c
1 /*
2 * ring_buffer_frontend.c
3 *
4 * (C) Copyright 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 *
6 * Ring buffer wait-free buffer synchronization. Producer-consumer and flight
7 * recorder (overwrite) modes. See thesis:
8 *
9 * Desnoyers, Mathieu (2009), "Low-Impact Operating System Tracing", Ph.D.
10 * dissertation, Ecole Polytechnique de Montreal.
11 * http://www.lttng.org/pub/thesis/desnoyers-dissertation-2009-12.pdf
12 *
13 * - Algorithm presentation in Chapter 5:
14 * "Lockless Multi-Core High-Throughput Buffering".
15 * - Algorithm formal verification in Section 8.6:
16 * "Formal verification of LTTng"
17 *
18 * Author:
19 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
20 *
21 * Inspired from LTT and RelayFS:
22 * Karim Yaghmour <karim@opersys.com>
23 * Tom Zanussi <zanussi@us.ibm.com>
24 * Bob Wisniewski <bob@watson.ibm.com>
25 * And from K42:
26 * Bob Wisniewski <bob@watson.ibm.com>
27 *
28 * Buffer reader semantics (an illustrative reader sketch follows this header):
29 *
30 * - get_subbuf_size
31 * while buffer is not finalized and empty
32 * - get_subbuf
33 * - if return value != 0, continue
34 * - splice one subbuffer worth of data to a pipe
35 * - splice the data from pipe to disk/network
36 * - put_subbuf
37 *
38 * Dual LGPL v2.1/GPL v2 license.
39 */
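
/*
 * Illustrative, non-compiled sketch of the reader flow described above.
 * It only uses the read-side API defined in this file plus subbuf_align()
 * from the frontend internals; the actual copy/splice of sub-buffer data
 * is left out, and a real consumer would wait instead of spinning on
 * -EAGAIN. Kept under #if 0 so it is never built.
 */
#if 0
static void example_read_loop(struct lttng_ust_lib_ring_buffer *buf,
		struct channel *chan,
		struct lttng_ust_shm_handle *handle)
{
	unsigned long consumed, produced;
	int ret;

	if (lib_ring_buffer_open_read(buf, handle, 0))
		return;		/* another reader already holds this buffer */
	for (;;) {
		ret = lib_ring_buffer_snapshot(buf, &consumed, &produced, handle);
		if (ret == -ENODATA)
			break;		/* buffer finalized and empty */
		if (ret == -EAGAIN)
			continue;	/* no data yet; normally wait here */
		while ((long) (produced - consumed) > 0) {
			ret = lib_ring_buffer_get_subbuf(buf, consumed, handle);
			if (ret)
				break;	/* raced with the writer, take a new snapshot */
			/* ... splice one sub-buffer worth of data to a pipe/disk ... */
			lib_ring_buffer_put_subbuf(buf, handle);
			consumed = subbuf_align(consumed, chan);
			lib_ring_buffer_move_consumer(buf, consumed, handle);
		}
	}
	lib_ring_buffer_release_read(buf, handle, 0);
}
#endif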
40
41 #include <sys/types.h>
42 #include <sys/mman.h>
43 #include <sys/stat.h>
44 #include <fcntl.h>
45 #include <urcu/compiler.h>
46 #include <urcu/ref.h>
47
48 #include "smp.h"
49 #include <lttng/ringbuffer-config.h>
50 #include "backend.h"
51 #include "frontend.h"
52 #include "shm.h"
53
54 #ifndef max
55 #define max(a, b) ((a) > (b) ? (a) : (b))
56 #endif
57
58 /*
59 * Use POSIX SHM: shm_open(3) and shm_unlink(3).
60 * close(2) to close the fd returned by shm_open.
61 * shm_unlink releases the shared memory object name.
62 * ftruncate(2) sets the size of the memory object.
63 * mmap/munmap maps the shared memory obj to a virtual address in the
64 * calling process (should be done both in libust and consumer).
65 * See shm_overview(7) for details.
66 * Pass file descriptor returned by shm_open(3) to ltt-sessiond through
67 * a UNIX socket.
68 *
69 * Since we don't need to access the object using its name, we can
70 * immediately shm_unlink(3) it, and only keep the handle with its file
71 * descriptor. (An illustrative sketch of this fd lifecycle follows below.)
72 */
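
/*
 * Minimal illustrative sketch of the fd lifecycle described above (kept
 * under #if 0, not part of the build). The shm name, mode and error
 * handling are arbitrary; <unistd.h> would also be needed for
 * ftruncate()/close(). The returned fd is what gets passed to the
 * consumer over a UNIX socket (SCM_RIGHTS ancillary data).
 */
#if 0
static int example_create_shm(size_t memory_map_size, void **memory_map)
{
	int fd;

	fd = shm_open("/example-ust-shm", O_CREAT | O_EXCL | O_RDWR, 0700);
	if (fd < 0)
		return -1;
	/* Name no longer needed: keep only the file descriptor. */
	(void) shm_unlink("/example-ust-shm");
	if (ftruncate(fd, (off_t) memory_map_size) < 0)
		goto error;
	*memory_map = mmap(NULL, memory_map_size, PROT_READ | PROT_WRITE,
			MAP_SHARED, fd, 0);
	if (*memory_map == MAP_FAILED)
		goto error;
	return fd;
error:
	close(fd);
	return -1;
}
#endif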
73
74 /*
75 * Internal structure representing offsets to use at a sub-buffer switch.
76 */
77 struct switch_offsets {
78 unsigned long begin, end, old;
79 size_t pre_header_padding, size;
80 unsigned int switch_new_start:1, switch_new_end:1, switch_old_start:1,
81 switch_old_end:1;
82 };
83
84 __thread unsigned int lib_ring_buffer_nesting;
85
86 static
87 void lib_ring_buffer_print_errors(struct channel *chan,
88 struct lttng_ust_lib_ring_buffer *buf, int cpu,
89 struct lttng_ust_shm_handle *handle);
90
91 /*
92 * Must be called under cpu hotplug protection.
93 */
94 void lib_ring_buffer_free(struct lttng_ust_lib_ring_buffer *buf,
95 struct lttng_ust_shm_handle *handle)
96 {
97 struct channel *chan = shmp(handle, buf->backend.chan);
98
99 lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu, handle);
100 /* buf->commit_hot will be freed by shm teardown */
101 /* buf->commit_cold will be freed by shm teardown */
102
103 lib_ring_buffer_backend_free(&buf->backend);
104 }
105
106 /**
107 * lib_ring_buffer_reset - Reset ring buffer to initial values.
108 * @buf: Ring buffer.
109 *
110 * Effectively empty the ring buffer. Should be called when the buffer is not
111 * used for writing. The ring buffer can be opened for reading, but the reader
112 * should not be using the iterator concurrently with reset. The previous
113 * current iterator record is reset.
114 */
115 void lib_ring_buffer_reset(struct lttng_ust_lib_ring_buffer *buf,
116 struct lttng_ust_shm_handle *handle)
117 {
118 struct channel *chan = shmp(handle, buf->backend.chan);
119 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
120 unsigned int i;
121
122 /*
123 * Reset iterator first. It will put the subbuffer if it currently holds
124 * it.
125 */
126 v_set(config, &buf->offset, 0);
127 for (i = 0; i < chan->backend.num_subbuf; i++) {
128 v_set(config, &shmp_index(handle, buf->commit_hot, i)->cc, 0);
129 v_set(config, &shmp_index(handle, buf->commit_hot, i)->seq, 0);
130 v_set(config, &shmp_index(handle, buf->commit_cold, i)->cc_sb, 0);
131 }
132 uatomic_set(&buf->consumed, 0);
133 uatomic_set(&buf->record_disabled, 0);
134 v_set(config, &buf->last_tsc, 0);
135 lib_ring_buffer_backend_reset(&buf->backend, handle);
136 /* Don't reset number of active readers */
137 v_set(config, &buf->records_lost_full, 0);
138 v_set(config, &buf->records_lost_wrap, 0);
139 v_set(config, &buf->records_lost_big, 0);
140 v_set(config, &buf->records_count, 0);
141 v_set(config, &buf->records_overrun, 0);
142 buf->finalized = 0;
143 }
144
145 /**
146 * channel_reset - Reset channel to initial values.
147 * @chan: Channel.
148 *
149 * Effectively empty the channel. Should be called when the channel is not used
150 * for writing. The channel can be opened for reading, but the reader should not
151 * be using the iterator concurrently with reset. The previous current iterator
152 * record is reset.
153 */
154 void channel_reset(struct channel *chan)
155 {
156 /*
157 * Reset iterators first. Will put the subbuffer if held for reading.
158 */
159 uatomic_set(&chan->record_disabled, 0);
160 /* Don't reset commit_count_mask, still valid */
161 channel_backend_reset(&chan->backend);
162 /* Don't reset switch/read timer interval */
163 /* Don't reset notifiers and notifier enable bits */
164 /* Don't reset reader reference count */
165 }
166
167 /*
168 * Must be called under cpu hotplug protection.
169 */
170 int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf,
171 struct channel_backend *chanb, int cpu,
172 struct lttng_ust_shm_handle *handle,
173 struct shm_object *shmobj)
174 {
175 const struct lttng_ust_lib_ring_buffer_config *config = &chanb->config;
176 struct channel *chan = caa_container_of(chanb, struct channel, backend);
177 void *priv = channel_get_private(chan);
178 size_t subbuf_header_size;
179 u64 tsc;
180 int ret;
181
182 /* Test for cpu hotplug */
183 if (buf->backend.allocated)
184 return 0;
185
186 ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend,
187 cpu, handle, shmobj);
188 if (ret)
189 return ret;
190
191 align_shm(shmobj, __alignof__(struct commit_counters_hot));
192 set_shmp(buf->commit_hot,
193 zalloc_shm(shmobj,
194 sizeof(struct commit_counters_hot) * chan->backend.num_subbuf));
195 if (!shmp(handle, buf->commit_hot)) {
196 ret = -ENOMEM;
197 goto free_chanbuf;
198 }
199
200 align_shm(shmobj, __alignof__(struct commit_counters_cold));
201 set_shmp(buf->commit_cold,
202 zalloc_shm(shmobj,
203 sizeof(struct commit_counters_cold) * chan->backend.num_subbuf));
204 if (!shmp(handle, buf->commit_cold)) {
205 ret = -ENOMEM;
206 goto free_commit;
207 }
208
209 /*
210 * Write the subbuffer header for first subbuffer so we know the total
211 * duration of data gathering.
212 */
213 subbuf_header_size = config->cb.subbuffer_header_size();
214 v_set(config, &buf->offset, subbuf_header_size);
215 subbuffer_id_clear_noref(config, &shmp_index(handle, buf->backend.buf_wsb, 0)->id);
216 tsc = config->cb.ring_buffer_clock_read(shmp(handle, buf->backend.chan));
217 config->cb.buffer_begin(buf, tsc, 0, handle);
218 v_add(config, subbuf_header_size, &shmp_index(handle, buf->commit_hot, 0)->cc);
219
220 if (config->cb.buffer_create) {
221 ret = config->cb.buffer_create(buf, priv, cpu, chanb->name, handle);
222 if (ret)
223 goto free_init;
224 }
225 buf->backend.allocated = 1;
226 return 0;
227
228 /* Error handling */
229 free_init:
230 /* commit_cold will be freed by shm teardown */
231 free_commit:
232 /* commit_hot will be freed by shm teardown */
233 free_chanbuf:
234 lib_ring_buffer_backend_free(&buf->backend);
235 return ret;
236 }
237
238 #if 0
239 static void switch_buffer_timer(unsigned long data)
240 {
241 struct lttng_ust_lib_ring_buffer *buf = (struct lttng_ust_lib_ring_buffer *)data;
242 struct channel *chan = shmp(handle, buf->backend.chan);
243 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
244
245 /*
246 * Only flush buffers periodically if readers are active.
247 */
248 if (uatomic_read(&buf->active_readers) || uatomic_read(&buf->active_shadow_readers))
249 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, handle);
250
251 //TODO timers
252 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
253 // mod_timer_pinned(&buf->switch_timer,
254 // jiffies + chan->switch_timer_interval);
255 //else
256 // mod_timer(&buf->switch_timer,
257 // jiffies + chan->switch_timer_interval);
258 }
259 #endif //0
260
261 static void lib_ring_buffer_start_switch_timer(struct lttng_ust_lib_ring_buffer *buf,
262 struct lttng_ust_shm_handle *handle)
263 {
264 struct channel *chan = shmp(handle, buf->backend.chan);
265 //const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
266
267 if (!chan->switch_timer_interval || buf->switch_timer_enabled)
268 return;
269 //TODO
270 //init_timer(&buf->switch_timer);
271 //buf->switch_timer.function = switch_buffer_timer;
272 //buf->switch_timer.expires = jiffies + chan->switch_timer_interval;
273 //buf->switch_timer.data = (unsigned long)buf;
274 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
275 // add_timer_on(&buf->switch_timer, buf->backend.cpu);
276 //else
277 // add_timer(&buf->switch_timer);
278 buf->switch_timer_enabled = 1;
279 }
280
281 static void lib_ring_buffer_stop_switch_timer(struct lttng_ust_lib_ring_buffer *buf,
282 struct lttng_ust_shm_handle *handle)
283 {
284 struct channel *chan = shmp(handle, buf->backend.chan);
285
286 if (!chan->switch_timer_interval || !buf->switch_timer_enabled)
287 return;
288
289 //TODO
290 //del_timer_sync(&buf->switch_timer);
291 buf->switch_timer_enabled = 0;
292 }
293
294 #if 0
295 /*
296 * Polling timer to check the channels for data.
297 */
298 static void read_buffer_timer(unsigned long data)
299 {
300 struct lttng_ust_lib_ring_buffer *buf = (struct lttng_ust_lib_ring_buffer *)data;
301 struct channel *chan = shmp(handle, buf->backend.chan);
302 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
303
304 CHAN_WARN_ON(chan, !buf->backend.allocated);
305
306 if ((uatomic_read(&buf->active_readers) || uatomic_read(&buf->active_shadow_readers))
307     && lib_ring_buffer_poll_deliver(config, buf, chan, handle)) {
308 //TODO
309 //wake_up_interruptible(&buf->read_wait);
310 //wake_up_interruptible(&chan->read_wait);
311 }
312
313 //TODO
314 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
315 // mod_timer_pinned(&buf->read_timer,
316 // jiffies + chan->read_timer_interval);
317 //else
318 // mod_timer(&buf->read_timer,
319 // jiffies + chan->read_timer_interval);
320 }
321 #endif //0
322
323 static void lib_ring_buffer_start_read_timer(struct lttng_ust_lib_ring_buffer *buf,
324 struct lttng_ust_shm_handle *handle)
325 {
326 struct channel *chan = shmp(handle, buf->backend.chan);
327 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
328
329 if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
330 || !chan->read_timer_interval
331 || buf->read_timer_enabled)
332 return;
333
334 //TODO
335 //init_timer(&buf->read_timer);
336 //buf->read_timer.function = read_buffer_timer;
337 //buf->read_timer.expires = jiffies + chan->read_timer_interval;
338 //buf->read_timer.data = (unsigned long)buf;
339
340 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
341 // add_timer_on(&buf->read_timer, buf->backend.cpu);
342 //else
343 // add_timer(&buf->read_timer);
344 buf->read_timer_enabled = 1;
345 }
346
347 static void lib_ring_buffer_stop_read_timer(struct lttng_ust_lib_ring_buffer *buf,
348 struct lttng_ust_shm_handle *handle)
349 {
350 struct channel *chan = shmp(handle, buf->backend.chan);
351 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
352
353 if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
354 || !chan->read_timer_interval
355 || !buf->read_timer_enabled)
356 return;
357
358 //TODO
359 //del_timer_sync(&buf->read_timer);
360 /*
361 * do one more check to catch data that has been written in the last
362 * timer period.
363 */
364 if (lib_ring_buffer_poll_deliver(config, buf, chan, handle)) {
365 //TODO
366 //wake_up_interruptible(&buf->read_wait);
367 //wake_up_interruptible(&chan->read_wait);
368 }
369 buf->read_timer_enabled = 0;
370 }
371
372 static void channel_unregister_notifiers(struct channel *chan,
373 struct lttng_ust_shm_handle *handle)
374 {
375 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
376 int cpu;
377
378 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
379 for_each_possible_cpu(cpu) {
380 struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
381
382 lib_ring_buffer_stop_switch_timer(buf, handle);
383 lib_ring_buffer_stop_read_timer(buf, handle);
384 }
385 } else {
386 struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
387
388 lib_ring_buffer_stop_switch_timer(buf, handle);
389 lib_ring_buffer_stop_read_timer(buf, handle);
390 }
391 //channel_backend_unregister_notifiers(&chan->backend);
392 }
393
394 static void channel_free(struct channel *chan, struct lttng_ust_shm_handle *handle,
395 int shadow)
396 {
397 if (!shadow)
398 channel_backend_free(&chan->backend, handle);
399 /* chan is freed by shm teardown */
400 shm_object_table_destroy(handle->table);
401 free(handle);
402 }
403
404 /**
405 * channel_create - Create channel.
406 * @config: ring buffer instance configuration
407 * @name: name of the channel
408 * @priv_data: ring buffer client private data area pointer (output)
409 * @priv_data_size: length, in bytes, of the private data area.
410 * @priv_data_init: initialization data for private data.
411 * @buf_addr: pointer to the beginning of the preallocated buffer contiguous
412 * address mapping. It is used only by RING_BUFFER_STATIC
413 * configuration. It can be set to NULL for other backends.
414 * @subbuf_size: subbuffer size
415 * @num_subbuf: number of subbuffers
416 * @switch_timer_interval: Time interval (in us) to fill sub-buffers with
417 * padding to let readers get those sub-buffers.
418 * Used for live streaming.
419 * @read_timer_interval: Time interval (in us) to wake up pending readers.
420 *
421 * Holds cpu hotplug.
422 * Returns NULL on failure. (An illustrative usage sketch follows this function.)
423 */
424 struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buffer_config *config,
425 const char *name,
426 void **priv_data,
427 size_t priv_data_align,
428 size_t priv_data_size,
429 void *priv_data_init,
430 void *buf_addr, size_t subbuf_size,
431 size_t num_subbuf, unsigned int switch_timer_interval,
432 unsigned int read_timer_interval,
433 int *shm_fd, int *wait_fd, uint64_t *memory_map_size)
434 {
435 int ret, cpu;
436 size_t shmsize, chansize;
437 struct channel *chan;
438 struct lttng_ust_shm_handle *handle;
439 struct shm_object *shmobj;
440 struct shm_ref *ref;
441
442 if (lib_ring_buffer_check_config(config, switch_timer_interval,
443 read_timer_interval))
444 return NULL;
445
446 handle = zmalloc(sizeof(struct lttng_ust_shm_handle));
447 if (!handle)
448 return NULL;
449
450 /* Allocate table for channel + per-cpu buffers */
451 handle->table = shm_object_table_create(1 + num_possible_cpus());
452 if (!handle->table)
453 goto error_table_alloc;
454
455 /* Calculate the shm allocation layout */
456 shmsize = sizeof(struct channel);
457 shmsize += offset_align(shmsize, __alignof__(struct lttng_ust_lib_ring_buffer_shmp));
458 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
459 shmsize += sizeof(struct lttng_ust_lib_ring_buffer_shmp) * num_possible_cpus();
460 else
461 shmsize += sizeof(struct lttng_ust_lib_ring_buffer_shmp);
462 chansize = shmsize;
463 shmsize += offset_align(shmsize, priv_data_align);
464 shmsize += priv_data_size;
465
466 shmobj = shm_object_table_append(handle->table, shmsize);
467 if (!shmobj)
468 goto error_append;
469 /* struct channel is at object 0, offset 0 (hardcoded) */
470 set_shmp(handle->chan, zalloc_shm(shmobj, chansize));
471 assert(handle->chan._ref.index == 0);
472 assert(handle->chan._ref.offset == 0);
473 chan = shmp(handle, handle->chan);
474 if (!chan)
475 goto error_append;
476
477 /* space for private data */
478 if (priv_data_size) {
479 DECLARE_SHMP(void, priv_data_alloc);
480
481 align_shm(shmobj, priv_data_align);
482 chan->priv_data_offset = shmobj->allocated_len;
483 set_shmp(priv_data_alloc, zalloc_shm(shmobj, priv_data_size));
484 if (!shmp(handle, priv_data_alloc))
485 goto error_append;
486 *priv_data = channel_get_private(chan);
487 memcpy(*priv_data, priv_data_init, priv_data_size);
488 } else {
489 chan->priv_data_offset = -1;
490 *priv_data = NULL;
491 }
492
493 ret = channel_backend_init(&chan->backend, name, config,
494 subbuf_size, num_subbuf, handle);
495 if (ret)
496 goto error_backend_init;
497
498 chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order);
499 //TODO
500 //chan->switch_timer_interval = usecs_to_jiffies(switch_timer_interval);
501 //chan->read_timer_interval = usecs_to_jiffies(read_timer_interval);
502 //TODO
503 //init_waitqueue_head(&chan->read_wait);
504 //init_waitqueue_head(&chan->hp_wait);
505
506 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
507 /*
508 * In case of non-hotplug cpu, if the ring-buffer is allocated
509 * in early initcall, it will not be notified of secondary cpus.
510 * In that unlikely case, we need to allocate for all possible cpus.
511 */
512 for_each_possible_cpu(cpu) {
513 struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
514 lib_ring_buffer_start_switch_timer(buf, handle);
515 lib_ring_buffer_start_read_timer(buf, handle);
516 }
517 } else {
518 struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
519
520 lib_ring_buffer_start_switch_timer(buf, handle);
521 lib_ring_buffer_start_read_timer(buf, handle);
522 }
523 ref = &handle->chan._ref;
524 shm_get_object_data(handle, ref, shm_fd, wait_fd, memory_map_size);
525 return handle;
526
527 error_backend_init:
528 error_append:
529 shm_object_table_destroy(handle->table);
530 error_table_alloc:
531 free(handle);
532 return NULL;
533 }
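
/*
 * Illustrative, non-compiled usage sketch for channel_create() (kept under
 * #if 0). "client_config" stands for a hypothetical, fully initialized
 * client configuration; the channel name, sub-buffer geometry and timer
 * intervals below are arbitrary example values, and no private data area
 * is requested.
 */
#if 0
static struct lttng_ust_shm_handle *example_create_channel(
		const struct lttng_ust_lib_ring_buffer_config *client_config)
{
	struct lttng_ust_shm_handle *handle;
	void *priv = NULL;
	int shm_fd, wait_fd;
	uint64_t memory_map_size;

	handle = channel_create(client_config, "example-chan",
			&priv, __alignof__(void *), 0, NULL,	/* no private data */
			NULL,			/* buf_addr unused with the shm backend */
			4096, 8,		/* subbuf_size, num_subbuf */
			0, 0,			/* switch/read timer intervals (us) */
			&shm_fd, &wait_fd, &memory_map_size);
	if (!handle)
		return NULL;
	/*
	 * shm_fd, wait_fd and memory_map_size describe the channel object;
	 * they would be handed to the consumer over a UNIX socket.
	 */
	return handle;
}
#endif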
534
535 struct lttng_ust_shm_handle *channel_handle_create(int shm_fd, int wait_fd,
536 uint64_t memory_map_size)
537 {
538 struct lttng_ust_shm_handle *handle;
539 struct shm_object *object;
540
541 handle = zmalloc(sizeof(struct lttng_ust_shm_handle));
542 if (!handle)
543 return NULL;
544
545 /* Allocate table for channel + per-cpu buffers */
546 handle->table = shm_object_table_create(1 + num_possible_cpus());
547 if (!handle->table)
548 goto error_table_alloc;
549 /* Add channel object */
550 object = shm_object_table_append_shadow(handle->table,
551 shm_fd, wait_fd, memory_map_size);
552 if (!object)
553 goto error_table_object;
554 /* struct channel is at object 0, offset 0 (hardcoded) */
555 handle->chan._ref.index = 0;
556 handle->chan._ref.offset = 0;
557 return handle;
558
559 error_table_object:
560 shm_object_table_destroy(handle->table);
561 error_table_alloc:
562 free(handle);
563 return NULL;
564 }
565
566 int channel_handle_add_stream(struct lttng_ust_shm_handle *handle,
567 int shm_fd, int wait_fd, uint64_t memory_map_size)
568 {
569 struct shm_object *object;
570
571 /* Add stream object */
572 object = shm_object_table_append_shadow(handle->table,
573 shm_fd, wait_fd, memory_map_size);
574 if (!object)
575 return -1;
576 return 0;
577 }
578
579 static
580 void channel_release(struct channel *chan, struct lttng_ust_shm_handle *handle,
581 int shadow)
582 {
583 channel_free(chan, handle, shadow);
584 }
585
586 /**
587 * channel_destroy - Finalize, wait for q.s. and destroy channel.
588 * @chan: channel to destroy
589 *
590 * Holds cpu hotplug.
591 * Call "destroy" callback, finalize channels, decrement the channel
592 * reference count. Note that when readers have completed data
593 * consumption of finalized channels, get_subbuf() will return -ENODATA.
594 * They should release their handle at that point.
595 */
596 void channel_destroy(struct channel *chan, struct lttng_ust_shm_handle *handle,
597 int shadow)
598 {
599 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
600 int cpu;
601
602 if (shadow) {
603 channel_release(chan, handle, shadow);
604 return;
605 }
606
607 channel_unregister_notifiers(chan, handle);
608
609 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
610 for_each_channel_cpu(cpu, chan) {
611 struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
612
613 if (config->cb.buffer_finalize)
614 config->cb.buffer_finalize(buf,
615 channel_get_private(chan),
616 cpu, handle);
617 if (buf->backend.allocated)
618 lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH,
619 handle);
620 /*
621 * Perform flush before writing to finalized.
622 */
623 cmm_smp_wmb();
624 CMM_ACCESS_ONCE(buf->finalized) = 1;
625 //wake_up_interruptible(&buf->read_wait);
626 }
627 } else {
628 struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
629
630 if (config->cb.buffer_finalize)
631 config->cb.buffer_finalize(buf, channel_get_private(chan), -1, handle);
632 if (buf->backend.allocated)
633 lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH,
634 handle);
635 /*
636 * Perform flush before writing to finalized.
637 */
638 cmm_smp_wmb();
639 CMM_ACCESS_ONCE(buf->finalized) = 1;
640 //wake_up_interruptible(&buf->read_wait);
641 }
642 CMM_ACCESS_ONCE(chan->finalized) = 1;
643 //wake_up_interruptible(&chan->hp_wait);
644 //wake_up_interruptible(&chan->read_wait);
645 /*
646 * sessiond/consumer are keeping a reference on the shm file
647 * descriptor directly. No need to refcount.
648 */
649 channel_release(chan, handle, shadow);
650 return;
651 }
652
653 struct lttng_ust_lib_ring_buffer *channel_get_ring_buffer(
654 const struct lttng_ust_lib_ring_buffer_config *config,
655 struct channel *chan, int cpu,
656 struct lttng_ust_shm_handle *handle,
657 int *shm_fd, int *wait_fd,
658 uint64_t *memory_map_size)
659 {
660 struct shm_ref *ref;
661
662 if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
663 ref = &chan->backend.buf[0].shmp._ref;
664 shm_get_object_data(handle, ref, shm_fd, wait_fd,
665 memory_map_size);
666 return shmp(handle, chan->backend.buf[0].shmp);
667 } else {
668 if (cpu >= num_possible_cpus())
669 return NULL;
670 ref = &chan->backend.buf[cpu].shmp._ref;
671 shm_get_object_data(handle, ref, shm_fd, wait_fd,
672 memory_map_size);
673 return shmp(handle, chan->backend.buf[cpu].shmp);
674 }
675 }
676
677 int lib_ring_buffer_open_read(struct lttng_ust_lib_ring_buffer *buf,
678 struct lttng_ust_shm_handle *handle,
679 int shadow)
680 {
681 if (shadow) {
682 if (uatomic_cmpxchg(&buf->active_shadow_readers, 0, 1) != 0)
683 return -EBUSY;
684 cmm_smp_mb();
685 return 0;
686 }
687 if (uatomic_cmpxchg(&buf->active_readers, 0, 1) != 0)
688 return -EBUSY;
689 cmm_smp_mb();
690 return 0;
691 }
692
693 void lib_ring_buffer_release_read(struct lttng_ust_lib_ring_buffer *buf,
694 struct lttng_ust_shm_handle *handle,
695 int shadow)
696 {
697 struct channel *chan = shmp(handle, buf->backend.chan);
698
699 if (shadow) {
700 CHAN_WARN_ON(chan, uatomic_read(&buf->active_shadow_readers) != 1);
701 cmm_smp_mb();
702 uatomic_dec(&buf->active_shadow_readers);
703 return;
704 }
705 CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
706 cmm_smp_mb();
707 uatomic_dec(&buf->active_readers);
708 }
709
710 /**
711 * lib_ring_buffer_snapshot - save subbuffer position snapshot (for read)
712 * @buf: ring buffer
713 * @consumed: consumed count indicating the position where to read
714 * @produced: produced count, indicates position where to stop reading
715 *
716 * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no
717 * data to read at consumed position, or 0 if the snapshot succeeds.
718 */
719
720 int lib_ring_buffer_snapshot(struct lttng_ust_lib_ring_buffer *buf,
721 unsigned long *consumed, unsigned long *produced,
722 struct lttng_ust_shm_handle *handle)
723 {
724 struct channel *chan = shmp(handle, buf->backend.chan);
725 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
726 unsigned long consumed_cur, write_offset;
727 int finalized;
728
729 finalized = CMM_ACCESS_ONCE(buf->finalized);
730 /*
731 * Read finalized before counters.
732 */
733 cmm_smp_rmb();
734 consumed_cur = uatomic_read(&buf->consumed);
735 /*
736 * No need to issue a memory barrier between consumed count read and
737 * write offset read, because consumed count can only change
738 * concurrently in overwrite mode, and we keep a sequence counter
739 * identifier derived from the write offset to check we are getting
740 * the same sub-buffer we are expecting (the sub-buffers are atomically
741 * "tagged" upon writes, tags are checked upon read).
742 */
743 write_offset = v_read(config, &buf->offset);
744
745 /*
746 * Check that we are not about to read the same subbuffer in
747 * which the writer head is.
748 */
749 if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan)
750 == 0)
751 goto nodata;
752
753 *consumed = consumed_cur;
754 *produced = subbuf_trunc(write_offset, chan);
755
756 return 0;
757
758 nodata:
759 /*
760 * The memory barriers __wait_event()/wake_up_interruptible() take care
761 * of "raw_spin_is_locked" memory ordering.
762 */
763 if (finalized)
764 return -ENODATA;
765 else
766 return -EAGAIN;
767 }
768
769 /**
770 * lib_ring_buffer_move_consumer - move consumed counter forward
771 * @buf: ring buffer
772 * @consumed_new: new consumed count value
773 */
774 void lib_ring_buffer_move_consumer(struct lttng_ust_lib_ring_buffer *buf,
775 unsigned long consumed_new,
776 struct lttng_ust_shm_handle *handle)
777 {
778 struct lttng_ust_lib_ring_buffer_backend *bufb = &buf->backend;
779 struct channel *chan = shmp(handle, bufb->chan);
780 unsigned long consumed;
781
782 CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1
783 && uatomic_read(&buf->active_shadow_readers) != 1);
784
785 /*
786 * Only push the consumed value forward.
787 * If the consumed cmpxchg fails, this is because we have been pushed by
788 * the writer in flight recorder mode.
789 */
790 consumed = uatomic_read(&buf->consumed);
791 while ((long) consumed - (long) consumed_new < 0)
792 consumed = uatomic_cmpxchg(&buf->consumed, consumed,
793 consumed_new);
794 }
795
796 /**
797 * lib_ring_buffer_get_subbuf - get exclusive access to subbuffer for reading
798 * @buf: ring buffer
799 * @consumed: consumed count indicating the position where to read
800 *
801 * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no
802 * data to read at consumed position, or 0 if the get operation succeeds.
803 */
804 int lib_ring_buffer_get_subbuf(struct lttng_ust_lib_ring_buffer *buf,
805 unsigned long consumed,
806 struct lttng_ust_shm_handle *handle)
807 {
808 struct channel *chan = shmp(handle, buf->backend.chan);
809 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
810 unsigned long consumed_cur, consumed_idx, commit_count, write_offset;
811 int ret;
812 int finalized;
813
814 retry:
815 finalized = CMM_ACCESS_ONCE(buf->finalized);
816 /*
817 * Read finalized before counters.
818 */
819 cmm_smp_rmb();
820 consumed_cur = uatomic_read(&buf->consumed);
821 consumed_idx = subbuf_index(consumed, chan);
822 commit_count = v_read(config, &shmp_index(handle, buf->commit_cold, consumed_idx)->cc_sb);
823 /*
824 * Make sure we read the commit count before reading the buffer
825 * data and the write offset. Correct consumed offset ordering
826 * wrt commit count is ensured by the use of cmpxchg to update
827 * the consumed offset.
828 */
829 /*
830 * Local rmb to match the remote wmb to read the commit count
831 * before the buffer data and the write offset.
832 */
833 cmm_smp_rmb();
834
835 write_offset = v_read(config, &buf->offset);
836
837 /*
838 * Check that the buffer we are getting is after or at consumed_cur
839 * position.
840 */
841 if ((long) subbuf_trunc(consumed, chan)
842 - (long) subbuf_trunc(consumed_cur, chan) < 0)
843 goto nodata;
844
845 /*
846 * Check that the subbuffer we are trying to consume has been
847 * already fully committed.
848 */
849 if (((commit_count - chan->backend.subbuf_size)
850 & chan->commit_count_mask)
851 - (buf_trunc(consumed_cur, chan)
852 >> chan->backend.num_subbuf_order)
853 != 0)
854 goto nodata;
855
856 /*
857 * Check that we are not about to read the same subbuffer in
858 * which the writer head is.
859 */
860 if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan)
861 == 0)
862 goto nodata;
863
864 /*
865 * Failure to get the subbuffer causes a busy-loop retry without going
866 * to a wait queue. These are caused by short-lived race windows where
867 * the writer is getting access to a subbuffer we were trying to get
868 * access to. Also checks that the "consumed" buffer count we are
869 * looking for matches the one contained in the subbuffer id.
870 */
871 ret = update_read_sb_index(config, &buf->backend, &chan->backend,
872 consumed_idx, buf_trunc_val(consumed, chan),
873 handle);
874 if (ret)
875 goto retry;
876 subbuffer_id_clear_noref(config, &buf->backend.buf_rsb.id);
877
878 buf->get_subbuf_consumed = consumed;
879 buf->get_subbuf = 1;
880
881 return 0;
882
883 nodata:
884 /*
885 * The memory barriers __wait_event()/wake_up_interruptible() take care
886 * of "raw_spin_is_locked" memory ordering.
887 */
888 if (finalized)
889 return -ENODATA;
890 else
891 return -EAGAIN;
892 }
893
894 /**
895 * lib_ring_buffer_put_subbuf - release exclusive subbuffer access
896 * @buf: ring buffer
897 */
898 void lib_ring_buffer_put_subbuf(struct lttng_ust_lib_ring_buffer *buf,
899 struct lttng_ust_shm_handle *handle)
900 {
901 struct lttng_ust_lib_ring_buffer_backend *bufb = &buf->backend;
902 struct channel *chan = shmp(handle, bufb->chan);
903 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
904 unsigned long read_sb_bindex, consumed_idx, consumed;
905
906 CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1
907 && uatomic_read(&buf->active_shadow_readers) != 1);
908
909 if (!buf->get_subbuf) {
910 /*
911 * Reader puts a subbuffer it did not get.
912 */
913 CHAN_WARN_ON(chan, 1);
914 return;
915 }
916 consumed = buf->get_subbuf_consumed;
917 buf->get_subbuf = 0;
918
919 /*
920 * Clear the records_unread counter. (overruns counter)
921 * Can still be non-zero if a file reader simply grabbed the data
922 * without using iterators.
923 * Can be below zero if an iterator is used on a snapshot more than
924 * once.
925 */
926 read_sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
927 v_add(config, v_read(config,
928 &shmp(handle, shmp_index(handle, bufb->array, read_sb_bindex)->shmp)->records_unread),
929 &bufb->records_read);
930 v_set(config, &shmp(handle, shmp_index(handle, bufb->array, read_sb_bindex)->shmp)->records_unread, 0);
931 CHAN_WARN_ON(chan, config->mode == RING_BUFFER_OVERWRITE
932 && subbuffer_id_is_noref(config, bufb->buf_rsb.id));
933 subbuffer_id_set_noref(config, &bufb->buf_rsb.id);
934
935 /*
936 * Exchange the reader subbuffer with the one we put in its place in the
937 * writer subbuffer table. Expect the original consumed count. If
938 * update_read_sb_index fails, this is because the writer updated the
939 * subbuffer concurrently. We should therefore keep the subbuffer we
940 * currently have: it has become invalid to try reading this sub-buffer
941 * consumed count value anyway.
942 */
943 consumed_idx = subbuf_index(consumed, chan);
944 update_read_sb_index(config, &buf->backend, &chan->backend,
945 consumed_idx, buf_trunc_val(consumed, chan),
946 handle);
947 /*
948 * update_read_sb_index return value ignored. Don't exchange sub-buffer
949 * if the writer concurrently updated it.
950 */
951 }
952
953 /*
954 * cons_offset is an iterator on all subbuffer offsets between the reader
955 * position and the writer position. (inclusive)
956 */
957 static
958 void lib_ring_buffer_print_subbuffer_errors(struct lttng_ust_lib_ring_buffer *buf,
959 struct channel *chan,
960 unsigned long cons_offset,
961 int cpu,
962 struct lttng_ust_shm_handle *handle)
963 {
964 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
965 unsigned long cons_idx, commit_count, commit_count_sb;
966
967 cons_idx = subbuf_index(cons_offset, chan);
968 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, cons_idx)->cc);
969 commit_count_sb = v_read(config, &shmp_index(handle, buf->commit_cold, cons_idx)->cc_sb);
970
971 if (subbuf_offset(commit_count, chan) != 0)
972 DBG("ring buffer %s, cpu %d: "
973 "commit count in subbuffer %lu,\n"
974 "expecting multiples of %lu bytes\n"
975 " [ %lu bytes committed, %lu bytes reader-visible ]\n",
976 chan->backend.name, cpu, cons_idx,
977 chan->backend.subbuf_size,
978 commit_count, commit_count_sb);
979
980 DBG("ring buffer: %s, cpu %d: %lu bytes committed\n",
981 chan->backend.name, cpu, commit_count);
982 }
983
984 static
985 void lib_ring_buffer_print_buffer_errors(struct lttng_ust_lib_ring_buffer *buf,
986 struct channel *chan,
987 void *priv, int cpu,
988 struct lttng_ust_shm_handle *handle)
989 {
990 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
991 unsigned long write_offset, cons_offset;
992
993 /*
994 * Can be called in the error path of allocation when
995 * trans_channel_data is not yet set.
996 */
997 if (!chan)
998 return;
999 /*
1000 * No need to order commit_count, write_offset and cons_offset reads
1001 * because we execute at teardown when no more writer nor reader
1002 * references are left.
1003 */
1004 write_offset = v_read(config, &buf->offset);
1005 cons_offset = uatomic_read(&buf->consumed);
1006 if (write_offset != cons_offset)
1007 DBG("ring buffer %s, cpu %d: "
1008 "non-consumed data\n"
1009 " [ %lu bytes written, %lu bytes read ]\n",
1010 chan->backend.name, cpu, write_offset, cons_offset);
1011
1012 for (cons_offset = uatomic_read(&buf->consumed);
1013 (long) (subbuf_trunc((unsigned long) v_read(config, &buf->offset),
1014 chan)
1015 - cons_offset) > 0;
1016 cons_offset = subbuf_align(cons_offset, chan))
1017 lib_ring_buffer_print_subbuffer_errors(buf, chan, cons_offset,
1018 cpu, handle);
1019 }
1020
1021 static
1022 void lib_ring_buffer_print_errors(struct channel *chan,
1023 struct lttng_ust_lib_ring_buffer *buf, int cpu,
1024 struct lttng_ust_shm_handle *handle)
1025 {
1026 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1027 void *priv = channel_get_private(chan);
1028
1029 DBG("ring buffer %s, cpu %d: %lu records written, "
1030 "%lu records overrun\n",
1031 chan->backend.name, cpu,
1032 v_read(config, &buf->records_count),
1033 v_read(config, &buf->records_overrun));
1034
1035 if (v_read(config, &buf->records_lost_full)
1036 || v_read(config, &buf->records_lost_wrap)
1037 || v_read(config, &buf->records_lost_big))
1038 DBG("ring buffer %s, cpu %d: records were lost. Caused by:\n"
1039 " [ %lu buffer full, %lu nest buffer wrap-around, "
1040 "%lu event too big ]\n",
1041 chan->backend.name, cpu,
1042 v_read(config, &buf->records_lost_full),
1043 v_read(config, &buf->records_lost_wrap),
1044 v_read(config, &buf->records_lost_big));
1045
1046 lib_ring_buffer_print_buffer_errors(buf, chan, priv, cpu, handle);
1047 }
1048
1049 /*
1050 * lib_ring_buffer_switch_old_start: Populate old subbuffer header.
1051 *
1052 * Only executed when the buffer is finalized, in SWITCH_FLUSH.
1053 */
1054 static
1055 void lib_ring_buffer_switch_old_start(struct lttng_ust_lib_ring_buffer *buf,
1056 struct channel *chan,
1057 struct switch_offsets *offsets,
1058 u64 tsc,
1059 struct lttng_ust_shm_handle *handle)
1060 {
1061 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1062 unsigned long oldidx = subbuf_index(offsets->old, chan);
1063 unsigned long commit_count;
1064
1065 config->cb.buffer_begin(buf, tsc, oldidx, handle);
1066
1067 /*
1068 * Order all writes to buffer before the commit count update that will
1069 * determine that the subbuffer is full.
1070 */
1071 cmm_smp_wmb();
1072 v_add(config, config->cb.subbuffer_header_size(),
1073 &shmp_index(handle, buf->commit_hot, oldidx)->cc);
1074 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, oldidx)->cc);
1075 /* Check if the written buffer has to be delivered */
1076 lib_ring_buffer_check_deliver(config, buf, chan, offsets->old,
1077 commit_count, oldidx, handle);
1078 lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
1079 offsets->old, commit_count,
1080 config->cb.subbuffer_header_size(),
1081 handle);
1082 }
1083
1084 /*
1085 * lib_ring_buffer_switch_old_end: switch old subbuffer
1086 *
1087 * Note: offset_old should never be 0 here. It is ok, because we never perform
1088 * buffer switch on an empty subbuffer in SWITCH_ACTIVE mode. The caller
1089 * increments the offset_old value when doing a SWITCH_FLUSH on an empty
1090 * subbuffer.
1091 */
1092 static
1093 void lib_ring_buffer_switch_old_end(struct lttng_ust_lib_ring_buffer *buf,
1094 struct channel *chan,
1095 struct switch_offsets *offsets,
1096 u64 tsc,
1097 struct lttng_ust_shm_handle *handle)
1098 {
1099 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1100 unsigned long oldidx = subbuf_index(offsets->old - 1, chan);
1101 unsigned long commit_count, padding_size, data_size;
1102
1103 data_size = subbuf_offset(offsets->old - 1, chan) + 1;
1104 padding_size = chan->backend.subbuf_size - data_size;
1105 subbuffer_set_data_size(config, &buf->backend, oldidx, data_size,
1106 handle);
1107
1108 /*
1109 * Order all writes to buffer before the commit count update that will
1110 * determine that the subbuffer is full.
1111 */
1112 cmm_smp_wmb();
1113 v_add(config, padding_size, &shmp_index(handle, buf->commit_hot, oldidx)->cc);
1114 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, oldidx)->cc);
1115 lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1,
1116 commit_count, oldidx, handle);
1117 lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
1118 offsets->old, commit_count,
1119 padding_size, handle);
1120 }
1121
1122 /*
1123 * lib_ring_buffer_switch_new_start: Populate new subbuffer.
1124 *
1125 * This code can be executed unordered: writers may already have written to the
1126 * sub-buffer before this code gets executed, caution. The commit makes sure
1127 * that this code is executed before the deliver of this sub-buffer.
1128 */
1129 static
1130 void lib_ring_buffer_switch_new_start(struct lttng_ust_lib_ring_buffer *buf,
1131 struct channel *chan,
1132 struct switch_offsets *offsets,
1133 u64 tsc,
1134 struct lttng_ust_shm_handle *handle)
1135 {
1136 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1137 unsigned long beginidx = subbuf_index(offsets->begin, chan);
1138 unsigned long commit_count;
1139
1140 config->cb.buffer_begin(buf, tsc, beginidx, handle);
1141
1142 /*
1143 * Order all writes to buffer before the commit count update that will
1144 * determine that the subbuffer is full.
1145 */
1146 cmm_smp_wmb();
1147 v_add(config, config->cb.subbuffer_header_size(),
1148 &shmp_index(handle, buf->commit_hot, beginidx)->cc);
1149 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, beginidx)->cc);
1150 /* Check if the written buffer has to be delivered */
1151 lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin,
1152 commit_count, beginidx, handle);
1153 lib_ring_buffer_write_commit_counter(config, buf, chan, beginidx,
1154 offsets->begin, commit_count,
1155 config->cb.subbuffer_header_size(),
1156 handle);
1157 }
1158
1159 /*
1160 * lib_ring_buffer_switch_new_end: finish switching current subbuffer
1161 *
1162 * The only remaining threads could be the ones with pending commits. They will
1163 * have to do the deliver themselves.
1164 */
1165 static
1166 void lib_ring_buffer_switch_new_end(struct lttng_ust_lib_ring_buffer *buf,
1167 struct channel *chan,
1168 struct switch_offsets *offsets,
1169 u64 tsc,
1170 struct lttng_ust_shm_handle *handle)
1171 {
1172 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1173 unsigned long endidx = subbuf_index(offsets->end - 1, chan);
1174 unsigned long commit_count, padding_size, data_size;
1175
1176 data_size = subbuf_offset(offsets->end - 1, chan) + 1;
1177 padding_size = chan->backend.subbuf_size - data_size;
1178 subbuffer_set_data_size(config, &buf->backend, endidx, data_size,
1179 handle);
1180
1181 /*
1182 * Order all writes to buffer before the commit count update that will
1183 * determine that the subbuffer is full.
1184 */
1185 cmm_smp_wmb();
1186 v_add(config, padding_size, &shmp_index(handle, buf->commit_hot, endidx)->cc);
1187 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, endidx)->cc);
1188 lib_ring_buffer_check_deliver(config, buf, chan, offsets->end - 1,
1189 commit_count, endidx, handle);
1190 lib_ring_buffer_write_commit_counter(config, buf, chan, endidx,
1191 offsets->end, commit_count,
1192 padding_size, handle);
1193 }
1194
1195 /*
1196 * Returns :
1197 * 0 if ok
1198 * !0 if execution must be aborted.
1199 */
1200 static
1201 int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
1202 struct lttng_ust_lib_ring_buffer *buf,
1203 struct channel *chan,
1204 struct switch_offsets *offsets,
1205 u64 *tsc)
1206 {
1207 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1208 unsigned long off;
1209
1210 offsets->begin = v_read(config, &buf->offset);
1211 offsets->old = offsets->begin;
1212 offsets->switch_old_start = 0;
1213 off = subbuf_offset(offsets->begin, chan);
1214
1215 *tsc = config->cb.ring_buffer_clock_read(chan);
1216
1217 /*
1218 * Ensure we flush the header of an empty subbuffer when doing the
1219 * finalize (SWITCH_FLUSH). This ensures that we end up knowing the
1220 * total data gathering duration even if there were no records saved
1221 * after the last buffer switch.
1222 * In SWITCH_ACTIVE mode, switch the buffer when it contains events.
1223 * SWITCH_ACTIVE only flushes the current subbuffer, dealing with end of
1224 * subbuffer header as appropriate.
1225 * The next record that reserves space will be responsible for
1226 * populating the following subbuffer header. We choose not to populate
1227 * the next subbuffer header here because we want to be able to use
1228 * SWITCH_ACTIVE for periodical buffer flush, which must
1229 * guarantee that all the buffer content (records and header
1230 * timestamps) are visible to the reader. This is required for
1231 * quiescence guarantees for the fusion merge.
1232 */
1233 if (mode == SWITCH_FLUSH || off > 0) {
1234 if (caa_unlikely(off == 0)) {
1235 /*
1236 * The client does not save any header information.
1237 * Don't switch empty subbuffer on finalize, because it
1238 * is invalid to deliver a completely empty subbuffer.
1239 */
1240 if (!config->cb.subbuffer_header_size())
1241 return -1;
1242 /*
1243 * Need to write the subbuffer start header on finalize.
1244 */
1245 offsets->switch_old_start = 1;
1246 }
1247 offsets->begin = subbuf_align(offsets->begin, chan);
1248 } else
1249 return -1; /* we do not have to switch : buffer is empty */
1250 /* Note: old points to the next subbuf at offset 0 */
1251 offsets->end = offsets->begin;
1252 return 0;
1253 }
1254
1255 /*
1256 * Force a sub-buffer switch. This operation is completely reentrant: it can be
1257 * called while tracing is active with absolutely no lock held.
1258 *
1259 * Note, however, that as a v_cmpxchg is used for some atomic
1260 * operations, this function must be called from the CPU which owns the buffer
1261 * for an ACTIVE flush.
1262 */
1263 void lib_ring_buffer_switch_slow(struct lttng_ust_lib_ring_buffer *buf, enum switch_mode mode,
1264 struct lttng_ust_shm_handle *handle)
1265 {
1266 struct channel *chan = shmp(handle, buf->backend.chan);
1267 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1268 struct switch_offsets offsets;
1269 unsigned long oldidx;
1270 u64 tsc;
1271
1272 offsets.size = 0;
1273
1274 /*
1275 * Perform retryable operations.
1276 */
1277 do {
1278 if (lib_ring_buffer_try_switch_slow(mode, buf, chan, &offsets,
1279 &tsc))
1280 return; /* Switch not needed */
1281 } while (v_cmpxchg(config, &buf->offset, offsets.old, offsets.end)
1282 != offsets.old);
1283
1284 /*
1285 * Atomically update last_tsc. This update races against concurrent
1286 * atomic updates, but the race will always cause supplementary full TSC
1287 * records, never the opposite (missing a full TSC record when it would
1288 * be needed).
1289 */
1290 save_last_tsc(config, buf, tsc);
1291
1292 /*
1293 * Push the reader if necessary
1294 */
1295 lib_ring_buffer_reserve_push_reader(buf, chan, offsets.old);
1296
1297 oldidx = subbuf_index(offsets.old, chan);
1298 lib_ring_buffer_clear_noref(config, &buf->backend, oldidx, handle);
1299
1300 /*
1301 * May need to populate header start on SWITCH_FLUSH.
1302 */
1303 if (offsets.switch_old_start) {
1304 lib_ring_buffer_switch_old_start(buf, chan, &offsets, tsc, handle);
1305 offsets.old += config->cb.subbuffer_header_size();
1306 }
1307
1308 /*
1309 * Switch old subbuffer.
1310 */
1311 lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc, handle);
1312 }
1313
1314 /*
1315 * Returns :
1316 * 0 if ok
1317 * -ENOSPC if event size is too large for packet.
1318 * -ENOBUFS if there is currently not enough space in buffer for the event.
1319 * -EIO if data cannot be written into the buffer for any other reason.
1320 */
1321 static
1322 int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf,
1323 struct channel *chan,
1324 struct switch_offsets *offsets,
1325 struct lttng_ust_lib_ring_buffer_ctx *ctx)
1326 {
1327 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1328 struct lttng_ust_shm_handle *handle = ctx->handle;
1329 unsigned long reserve_commit_diff;
1330
1331 offsets->begin = v_read(config, &buf->offset);
1332 offsets->old = offsets->begin;
1333 offsets->switch_new_start = 0;
1334 offsets->switch_new_end = 0;
1335 offsets->switch_old_end = 0;
1336 offsets->pre_header_padding = 0;
1337
1338 ctx->tsc = config->cb.ring_buffer_clock_read(chan);
1339 if ((int64_t) ctx->tsc == -EIO)
1340 return -EIO;
1341
1342 if (last_tsc_overflow(config, buf, ctx->tsc))
1343 ctx->rflags |= RING_BUFFER_RFLAG_FULL_TSC;
1344
1345 if (caa_unlikely(subbuf_offset(offsets->begin, ctx->chan) == 0)) {
1346 offsets->switch_new_start = 1; /* For offsets->begin */
1347 } else {
1348 offsets->size = config->cb.record_header_size(config, chan,
1349 offsets->begin,
1350 &offsets->pre_header_padding,
1351 ctx);
1352 offsets->size +=
1353 lib_ring_buffer_align(offsets->begin + offsets->size,
1354 ctx->largest_align)
1355 + ctx->data_size;
1356 if (caa_unlikely(subbuf_offset(offsets->begin, chan) +
1357 offsets->size > chan->backend.subbuf_size)) {
1358 offsets->switch_old_end = 1; /* For offsets->old */
1359 offsets->switch_new_start = 1; /* For offsets->begin */
1360 }
1361 }
1362 if (caa_unlikely(offsets->switch_new_start)) {
1363 unsigned long sb_index;
1364
1365 /*
1366 * We are typically not filling the previous buffer completely.
1367 */
1368 if (caa_likely(offsets->switch_old_end))
1369 offsets->begin = subbuf_align(offsets->begin, chan);
1370 offsets->begin = offsets->begin
1371 + config->cb.subbuffer_header_size();
1372 /* Test new buffer integrity */
1373 sb_index = subbuf_index(offsets->begin, chan);
1374 reserve_commit_diff =
1375 (buf_trunc(offsets->begin, chan)
1376 >> chan->backend.num_subbuf_order)
1377 - ((unsigned long) v_read(config,
1378 &shmp_index(handle, buf->commit_cold, sb_index)->cc_sb)
1379 & chan->commit_count_mask);
1380 if (caa_likely(reserve_commit_diff == 0)) {
1381 /* Next subbuffer not being written to. */
1382 if (caa_unlikely(config->mode != RING_BUFFER_OVERWRITE &&
1383 subbuf_trunc(offsets->begin, chan)
1384 - subbuf_trunc((unsigned long)
1385 uatomic_read(&buf->consumed), chan)
1386 >= chan->backend.buf_size)) {
1387 /*
1388 * We do not overwrite non consumed buffers
1389 * and we are full : record is lost.
1390 */
1391 v_inc(config, &buf->records_lost_full);
1392 return -ENOBUFS;
1393 } else {
1394 /*
1395 * Next subbuffer not being written to, and we
1396 * are either in overwrite mode or the buffer is
1397 * not full. It's safe to write in this new
1398 * subbuffer.
1399 */
1400 }
1401 } else {
1402 /*
1403 * Next subbuffer reserve offset does not match the
1404 * commit offset. Drop record in producer-consumer and
1405 * overwrite mode. Caused by either a writer OOPS or too
1406 * many nested writes over a reserve/commit pair.
1407 */
1408 v_inc(config, &buf->records_lost_wrap);
1409 return -EIO;
1410 }
1411 offsets->size =
1412 config->cb.record_header_size(config, chan,
1413 offsets->begin,
1414 &offsets->pre_header_padding,
1415 ctx);
1416 offsets->size +=
1417 lib_ring_buffer_align(offsets->begin + offsets->size,
1418 ctx->largest_align)
1419 + ctx->data_size;
1420 if (caa_unlikely(subbuf_offset(offsets->begin, chan)
1421 + offsets->size > chan->backend.subbuf_size)) {
1422 /*
1423 * Record too big for subbuffers, report error, don't
1424 * complete the sub-buffer switch.
1425 */
1426 v_inc(config, &buf->records_lost_big);
1427 return -ENOSPC;
1428 } else {
1429 /*
1430 * We just made a successful buffer switch and the
1431 * record fits in the new subbuffer. Let's write.
1432 */
1433 }
1434 } else {
1435 /*
1436 * Record fits in the current buffer and we are not on a switch
1437 * boundary. It's safe to write.
1438 */
1439 }
1440 offsets->end = offsets->begin + offsets->size;
1441
1442 if (caa_unlikely(subbuf_offset(offsets->end, chan) == 0)) {
1443 /*
1444 * The offset_end will fall at the very beginning of the next
1445 * subbuffer.
1446 */
1447 offsets->switch_new_end = 1; /* For offsets->begin */
1448 }
1449 return 0;
1450 }
1451
1452 /**
1453 * lib_ring_buffer_reserve_slow - Atomic slot reservation in a buffer.
1454 * @ctx: ring buffer context.
1455 *
1456 * Return: -ENOBUFS if not enough space, -ENOSPC if event size too large,
1457 * -EIO for other errors, else returns 0.
1458 * It will take care of sub-buffer switching.
1459 */
1460 int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx)
1461 {
1462 struct channel *chan = ctx->chan;
1463 struct lttng_ust_shm_handle *handle = ctx->handle;
1464 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1465 struct lttng_ust_lib_ring_buffer *buf;
1466 struct switch_offsets offsets;
1467 int ret;
1468
1469 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
1470 buf = shmp(handle, chan->backend.buf[ctx->cpu].shmp);
1471 else
1472 buf = shmp(handle, chan->backend.buf[0].shmp);
1473 ctx->buf = buf;
1474
1475 offsets.size = 0;
1476
1477 do {
1478 ret = lib_ring_buffer_try_reserve_slow(buf, chan, &offsets,
1479 ctx);
1480 if (caa_unlikely(ret))
1481 return ret;
1482 } while (caa_unlikely(v_cmpxchg(config, &buf->offset, offsets.old,
1483 offsets.end)
1484 != offsets.old));
1485
1486 /*
1487 * Atomically update last_tsc. This update races against concurrent
1488 * atomic updates, but the race will always cause supplementary full TSC
1489 * records, never the opposite (missing a full TSC record when it would
1490 * be needed).
1491 */
1492 save_last_tsc(config, buf, ctx->tsc);
1493
1494 /*
1495 * Push the reader if necessary
1496 */
1497 lib_ring_buffer_reserve_push_reader(buf, chan, offsets.end - 1);
1498
1499 /*
1500 * Clear noref flag for this subbuffer.
1501 */
1502 lib_ring_buffer_clear_noref(config, &buf->backend,
1503 subbuf_index(offsets.end - 1, chan),
1504 handle);
1505
1506 /*
1507 * Switch old subbuffer if needed.
1508 */
1509 if (caa_unlikely(offsets.switch_old_end)) {
1510 lib_ring_buffer_clear_noref(config, &buf->backend,
1511 subbuf_index(offsets.old - 1, chan),
1512 handle);
1513 lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->tsc, handle);
1514 }
1515
1516 /*
1517 * Populate new subbuffer.
1518 */
1519 if (caa_unlikely(offsets.switch_new_start))
1520 lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->tsc, handle);
1521
1522 if (caa_unlikely(offsets.switch_new_end))
1523 lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->tsc, handle);
1524
1525 ctx->slot_size = offsets.size;
1526 ctx->pre_offset = offsets.begin;
1527 ctx->buf_offset = offsets.begin + offsets.pre_header_padding;
1528 return 0;
1529 }