/*
 * ring_buffer_frontend.c
 *
 * (C) Copyright 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * Ring buffer wait-free buffer synchronization. Producer-consumer and flight
 * recorder (overwrite) modes. See thesis:
 *
 * Desnoyers, Mathieu (2009), "Low-Impact Operating System Tracing", Ph.D.
 * dissertation, Ecole Polytechnique de Montreal.
 * http://www.lttng.org/pub/thesis/desnoyers-dissertation-2009-12.pdf
 *
 * - Algorithm presentation in Chapter 5:
 *     "Lockless Multi-Core High-Throughput Buffering".
 * - Algorithm formal verification in Section 8.6:
 *     "Formal verification of LTTng"
 *
 * Author:
 *	Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * Inspired from LTT and RelayFS:
 *  Karim Yaghmour <karim@opersys.com>
 *  Tom Zanussi <zanussi@us.ibm.com>
 *  Bob Wisniewski <bob@watson.ibm.com>
 * And from K42:
 *  Bob Wisniewski <bob@watson.ibm.com>
 *
 * Buffer reader semantics:
 *
 * - get_subbuf_size
 * while buffer is not finalized and empty
 *   - get_subbuf
 *     - if return value != 0, continue
 *   - splice one subbuffer worth of data to a pipe
 *   - splice the data from pipe to disk/network
 *   - put_subbuf
 *
 * Dual LGPL v2.1/GPL v2 license.
 */

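/*
 * Illustrative reader loop for the semantics above (a sketch only, not part
 * of the original file): it assumes a buffer, channel and shm handle already
 * obtained on the consumer side after lib_ring_buffer_open_read(), and it
 * summarizes the splice-to-pipe/disk steps as comments. Only entry points
 * defined later in this file are used.
 */
#if 0
static
void example_reader_loop(struct lttng_ust_lib_ring_buffer *buf,
			 struct channel *chan,
			 struct lttng_ust_shm_handle *handle)
{
	unsigned long consumed, produced;

	while (!lib_ring_buffer_snapshot(buf, &consumed, &produced, handle)) {
		/* Consume every fully committed sub-buffer in [consumed, produced). */
		while ((long) (produced - consumed) > 0) {
			if (!lib_ring_buffer_get_subbuf(buf, consumed, handle)) {
				/* splice one subbuffer worth of data to a pipe */
				/* splice the data from pipe to disk/network */
				lib_ring_buffer_put_subbuf(buf, handle);
			}
			consumed = subbuf_align(consumed, chan);
			lib_ring_buffer_move_consumer(buf, consumed, handle);
		}
	}
}
#endif	/* usage sketch */
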
#include <sys/types.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <urcu/compiler.h>
#include <urcu/ref.h>

#include "smp.h"
#include <lttng/ringbuffer-config.h>
#include "backend.h"
#include "frontend.h"
#include "shm.h"

#ifndef max
#define max(a, b) ((a) > (b) ? (a) : (b))
#endif

/*
 * Use POSIX SHM: shm_open(3) and shm_unlink(3).
 * close(2) to close the fd returned by shm_open.
 * shm_unlink releases the shared memory object name.
 * ftruncate(2) sets the size of the memory object.
 * mmap/munmap maps the shared memory object to a virtual address in the
 * calling process (should be done both in libust and consumer).
 * See shm_overview(7) for details.
 * Pass the file descriptor returned by shm_open(3) to ltt-sessiond through
 * a UNIX socket.
 *
 * Since we don't need to access the object using its name, we can
 * immediately shm_unlink(3) it, and only keep the handle with its file
 * descriptor.
 */

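/*
 * Minimal sketch of that create-then-unlink pattern (illustration only; the
 * allocation code actually used by this library lives in the shm object
 * layer). The object name "/ust-shm-example" and the error handling details
 * are placeholders.
 */
#if 0
static int example_shm_alloc(size_t len, void **memory_map)
{
	int shmfd;

	shmfd = shm_open("/ust-shm-example", O_CREAT | O_EXCL | O_RDWR, 0700);
	if (shmfd < 0)
		return -1;
	/* The name is not needed anymore: only the fd is kept around. */
	(void) shm_unlink("/ust-shm-example");
	if (ftruncate(shmfd, len)) {
		close(shmfd);
		return -1;
	}
	*memory_map = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED,
			   shmfd, 0);
	if (*memory_map == MAP_FAILED) {
		close(shmfd);
		return -1;
	}
	/* shmfd is later passed to the consumer through a UNIX socket. */
	return shmfd;
}
#endif	/* usage sketch */
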
/*
 * Internal structure representing offsets to use at a sub-buffer switch.
 */
struct switch_offsets {
	unsigned long begin, end, old;
	size_t pre_header_padding, size;
	unsigned int switch_new_start:1, switch_new_end:1, switch_old_start:1,
		     switch_old_end:1;
};

__thread unsigned int lib_ring_buffer_nesting;

static
void lib_ring_buffer_print_errors(struct channel *chan,
				  struct lttng_ust_lib_ring_buffer *buf, int cpu,
				  struct lttng_ust_shm_handle *handle);

/*
 * Must be called under cpu hotplug protection.
 */
void lib_ring_buffer_free(struct lttng_ust_lib_ring_buffer *buf,
			  struct lttng_ust_shm_handle *handle)
{
	struct channel *chan = shmp(handle, buf->backend.chan);

	lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu, handle);
	/* buf->commit_hot will be freed by shm teardown */
	/* buf->commit_cold will be freed by shm teardown */

	lib_ring_buffer_backend_free(&buf->backend);
}

/**
 * lib_ring_buffer_reset - Reset ring buffer to initial values.
 * @buf: Ring buffer.
 *
 * Effectively empty the ring buffer. Should be called when the buffer is not
 * used for writing. The ring buffer can be opened for reading, but the reader
 * should not be using the iterator concurrently with reset. The previous
 * current iterator record is reset.
 */
void lib_ring_buffer_reset(struct lttng_ust_lib_ring_buffer *buf,
			   struct lttng_ust_shm_handle *handle)
{
	struct channel *chan = shmp(handle, buf->backend.chan);
	const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
	unsigned int i;

	/*
	 * Reset iterator first. It will put the subbuffer if it currently holds
	 * it.
	 */
	v_set(config, &buf->offset, 0);
	for (i = 0; i < chan->backend.num_subbuf; i++) {
		v_set(config, &shmp_index(handle, buf->commit_hot, i)->cc, 0);
		v_set(config, &shmp_index(handle, buf->commit_hot, i)->seq, 0);
		v_set(config, &shmp_index(handle, buf->commit_cold, i)->cc_sb, 0);
	}
	uatomic_set(&buf->consumed, 0);
	uatomic_set(&buf->record_disabled, 0);
	v_set(config, &buf->last_tsc, 0);
	lib_ring_buffer_backend_reset(&buf->backend, handle);
	/* Don't reset number of active readers */
	v_set(config, &buf->records_lost_full, 0);
	v_set(config, &buf->records_lost_wrap, 0);
	v_set(config, &buf->records_lost_big, 0);
	v_set(config, &buf->records_count, 0);
	v_set(config, &buf->records_overrun, 0);
	buf->finalized = 0;
}

/**
 * channel_reset - Reset channel to initial values.
 * @chan: Channel.
 *
 * Effectively empty the channel. Should be called when the channel is not used
 * for writing. The channel can be opened for reading, but the reader should not
 * be using the iterator concurrently with reset. The previous current iterator
 * record is reset.
 */
void channel_reset(struct channel *chan)
{
	/*
	 * Reset iterators first. Will put the subbuffer if held for reading.
	 */
	uatomic_set(&chan->record_disabled, 0);
	/* Don't reset commit_count_mask, still valid */
	channel_backend_reset(&chan->backend);
	/* Don't reset switch/read timer interval */
	/* Don't reset notifiers and notifier enable bits */
	/* Don't reset reader reference count */
}

167/*
168 * Must be called under cpu hotplug protection.
169 */
4cfec15c 170int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf,
a6352fd4 171 struct channel_backend *chanb, int cpu,
38fae1d3 172 struct lttng_ust_shm_handle *handle,
1d498196 173 struct shm_object *shmobj)
852c2936 174{
4cfec15c 175 const struct lttng_ust_lib_ring_buffer_config *config = &chanb->config;
14641deb 176 struct channel *chan = caa_container_of(chanb, struct channel, backend);
a3f61e7f 177 void *priv = channel_get_private(chan);
852c2936
MD
178 size_t subbuf_header_size;
179 u64 tsc;
180 int ret;
181
182 /* Test for cpu hotplug */
183 if (buf->backend.allocated)
184 return 0;
185
a6352fd4 186 ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend,
1d498196 187 cpu, handle, shmobj);
852c2936
MD
188 if (ret)
189 return ret;
190
1d498196
MD
191 align_shm(shmobj, __alignof__(struct commit_counters_hot));
192 set_shmp(buf->commit_hot,
193 zalloc_shm(shmobj,
194 sizeof(struct commit_counters_hot) * chan->backend.num_subbuf));
195 if (!shmp(handle, buf->commit_hot)) {
852c2936
MD
196 ret = -ENOMEM;
197 goto free_chanbuf;
198 }
199
1d498196
MD
200 align_shm(shmobj, __alignof__(struct commit_counters_cold));
201 set_shmp(buf->commit_cold,
202 zalloc_shm(shmobj,
203 sizeof(struct commit_counters_cold) * chan->backend.num_subbuf));
204 if (!shmp(handle, buf->commit_cold)) {
852c2936
MD
205 ret = -ENOMEM;
206 goto free_commit;
207 }
208
852c2936
MD
209 /*
210 * Write the subbuffer header for first subbuffer so we know the total
211 * duration of data gathering.
212 */
213 subbuf_header_size = config->cb.subbuffer_header_size();
214 v_set(config, &buf->offset, subbuf_header_size);
4746ae29 215 subbuffer_id_clear_noref(config, &shmp_index(handle, buf->backend.buf_wsb, 0)->id);
1d498196
MD
216 tsc = config->cb.ring_buffer_clock_read(shmp(handle, buf->backend.chan));
217 config->cb.buffer_begin(buf, tsc, 0, handle);
4746ae29 218 v_add(config, subbuf_header_size, &shmp_index(handle, buf->commit_hot, 0)->cc);
852c2936
MD
219
220 if (config->cb.buffer_create) {
1d498196 221 ret = config->cb.buffer_create(buf, priv, cpu, chanb->name, handle);
852c2936
MD
222 if (ret)
223 goto free_init;
224 }
852c2936 225 buf->backend.allocated = 1;
852c2936
MD
226 return 0;
227
228 /* Error handling */
229free_init:
a6352fd4 230 /* commit_cold will be freed by shm teardown */
852c2936 231free_commit:
a6352fd4 232 /* commit_hot will be freed by shm teardown */
852c2936
MD
233free_chanbuf:
234 lib_ring_buffer_backend_free(&buf->backend);
235 return ret;
236}
237
1d498196 238#if 0
852c2936
MD
239static void switch_buffer_timer(unsigned long data)
240{
4cfec15c 241 struct lttng_ust_lib_ring_buffer *buf = (struct lttng_ust_lib_ring_buffer *)data;
1d498196 242 struct channel *chan = shmp(handle, buf->backend.chan);
4cfec15c 243 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
244
245 /*
246 * Only flush buffers periodically if readers are active.
247 */
824f40b8 248 if (uatomic_read(&buf->active_readers) || uatomic_read(&buf->active_shadow_readers))
1d498196 249 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, handle);
852c2936 250
a6352fd4
MD
251 //TODO timers
252 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
253 // mod_timer_pinned(&buf->switch_timer,
254 // jiffies + chan->switch_timer_interval);
255 //else
256 // mod_timer(&buf->switch_timer,
257 // jiffies + chan->switch_timer_interval);
852c2936 258}
1d498196 259#endif //0
852c2936 260
4cfec15c 261static void lib_ring_buffer_start_switch_timer(struct lttng_ust_lib_ring_buffer *buf,
38fae1d3 262 struct lttng_ust_shm_handle *handle)
852c2936 263{
1d498196 264 struct channel *chan = shmp(handle, buf->backend.chan);
4cfec15c 265 //const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
266
267 if (!chan->switch_timer_interval || buf->switch_timer_enabled)
268 return;
a6352fd4
MD
269 //TODO
270 //init_timer(&buf->switch_timer);
271 //buf->switch_timer.function = switch_buffer_timer;
272 //buf->switch_timer.expires = jiffies + chan->switch_timer_interval;
273 //buf->switch_timer.data = (unsigned long)buf;
274 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
275 // add_timer_on(&buf->switch_timer, buf->backend.cpu);
276 //else
277 // add_timer(&buf->switch_timer);
852c2936
MD
278 buf->switch_timer_enabled = 1;
279}
280
4cfec15c 281static void lib_ring_buffer_stop_switch_timer(struct lttng_ust_lib_ring_buffer *buf,
38fae1d3 282 struct lttng_ust_shm_handle *handle)
852c2936 283{
1d498196 284 struct channel *chan = shmp(handle, buf->backend.chan);
852c2936
MD
285
286 if (!chan->switch_timer_interval || !buf->switch_timer_enabled)
287 return;
288
a6352fd4
MD
289 //TODO
290 //del_timer_sync(&buf->switch_timer);
852c2936
MD
291 buf->switch_timer_enabled = 0;
292}
293
1d498196 294#if 0
852c2936
MD
295/*
296 * Polling timer to check the channels for data.
297 */
298static void read_buffer_timer(unsigned long data)
299{
4cfec15c 300 struct lttng_ust_lib_ring_buffer *buf = (struct lttng_ust_lib_ring_buffer *)data;
1d498196 301 struct channel *chan = shmp(handle, buf->backend.chan);
4cfec15c 302 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
303
304 CHAN_WARN_ON(chan, !buf->backend.allocated);
305
	if ((uatomic_read(&buf->active_readers) || uatomic_read(&buf->active_shadow_readers))
	    && lib_ring_buffer_poll_deliver(config, buf, chan, handle)) {
a6352fd4
MD
308 //TODO
309 //wake_up_interruptible(&buf->read_wait);
310 //wake_up_interruptible(&chan->read_wait);
852c2936
MD
311 }
312
a6352fd4
MD
313 //TODO
314 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
315 // mod_timer_pinned(&buf->read_timer,
316 // jiffies + chan->read_timer_interval);
317 //else
318 // mod_timer(&buf->read_timer,
319 // jiffies + chan->read_timer_interval);
852c2936 320}
1d498196 321#endif //0
852c2936 322
4cfec15c 323static void lib_ring_buffer_start_read_timer(struct lttng_ust_lib_ring_buffer *buf,
38fae1d3 324 struct lttng_ust_shm_handle *handle)
852c2936 325{
1d498196 326 struct channel *chan = shmp(handle, buf->backend.chan);
4cfec15c 327 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
328
329 if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
330 || !chan->read_timer_interval
331 || buf->read_timer_enabled)
332 return;
333
a6352fd4
MD
334 //TODO
335 //init_timer(&buf->read_timer);
336 //buf->read_timer.function = read_buffer_timer;
337 //buf->read_timer.expires = jiffies + chan->read_timer_interval;
338 //buf->read_timer.data = (unsigned long)buf;
852c2936 339
a6352fd4
MD
340 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
341 // add_timer_on(&buf->read_timer, buf->backend.cpu);
342 //else
343 // add_timer(&buf->read_timer);
852c2936
MD
344 buf->read_timer_enabled = 1;
345}
346
4cfec15c 347static void lib_ring_buffer_stop_read_timer(struct lttng_ust_lib_ring_buffer *buf,
38fae1d3 348 struct lttng_ust_shm_handle *handle)
852c2936 349{
1d498196 350 struct channel *chan = shmp(handle, buf->backend.chan);
4cfec15c 351 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
352
353 if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
354 || !chan->read_timer_interval
355 || !buf->read_timer_enabled)
356 return;
357
a6352fd4
MD
358 //TODO
359 //del_timer_sync(&buf->read_timer);
852c2936
MD
360 /*
361 * do one more check to catch data that has been written in the last
362 * timer period.
363 */
1d498196 364 if (lib_ring_buffer_poll_deliver(config, buf, chan, handle)) {
a6352fd4
MD
365 //TODO
366 //wake_up_interruptible(&buf->read_wait);
367 //wake_up_interruptible(&chan->read_wait);
852c2936
MD
368 }
369 buf->read_timer_enabled = 0;
370}
371
1d498196 372static void channel_unregister_notifiers(struct channel *chan,
38fae1d3 373 struct lttng_ust_shm_handle *handle)
852c2936 374{
4cfec15c 375 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
376 int cpu;
377
852c2936 378 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
852c2936 379 for_each_possible_cpu(cpu) {
4cfec15c 380 struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
a6352fd4 381
1d498196
MD
382 lib_ring_buffer_stop_switch_timer(buf, handle);
383 lib_ring_buffer_stop_read_timer(buf, handle);
852c2936 384 }
852c2936 385 } else {
4cfec15c 386 struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
852c2936 387
1d498196
MD
388 lib_ring_buffer_stop_switch_timer(buf, handle);
389 lib_ring_buffer_stop_read_timer(buf, handle);
852c2936 390 }
8d8a24c8 391 //channel_backend_unregister_notifiers(&chan->backend);
852c2936
MD
392}
393
38fae1d3 394static void channel_free(struct channel *chan, struct lttng_ust_shm_handle *handle,
824f40b8 395 int shadow)
852c2936 396{
824f40b8
MD
397 if (!shadow)
398 channel_backend_free(&chan->backend, handle);
431d5cf0 399 /* chan is freed by shm teardown */
1d498196
MD
400 shm_object_table_destroy(handle->table);
401 free(handle);
852c2936
MD
402}
403
/**
 * channel_create - Create channel.
 * @config: ring buffer instance configuration
 * @name: name of the channel
 * @priv_data: ring buffer client private data area pointer (output)
 * @priv_data_align: alignment, in bytes, of the private data area
 * @priv_data_size: length, in bytes, of the private data area
 * @priv_data_init: initialization data for private data
 * @buf_addr: pointer to the beginning of the preallocated buffer contiguous
 *            address mapping. It is used only by RING_BUFFER_STATIC
 *            configuration. It can be set to NULL for other backends.
 * @subbuf_size: subbuffer size
 * @num_subbuf: number of subbuffers
 * @switch_timer_interval: Time interval (in us) to fill sub-buffers with
 *                         padding to let readers get those sub-buffers.
 *                         Used for live streaming.
 * @read_timer_interval: Time interval (in us) to wake up pending readers.
 *
 * Holds cpu hotplug.
 * Returns NULL on failure.
 */
4cfec15c 424struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buffer_config *config,
a3f61e7f
MD
425 const char *name,
426 void **priv_data,
427 size_t priv_data_align,
428 size_t priv_data_size,
d028eddb 429 void *priv_data_init,
a3f61e7f 430 void *buf_addr, size_t subbuf_size,
852c2936 431 size_t num_subbuf, unsigned int switch_timer_interval,
193183fb
MD
432 unsigned int read_timer_interval,
433 int *shm_fd, int *wait_fd, uint64_t *memory_map_size)
852c2936 434{
1d498196 435 int ret, cpu;
a3f61e7f 436 size_t shmsize, chansize;
852c2936 437 struct channel *chan;
38fae1d3 438 struct lttng_ust_shm_handle *handle;
1d498196 439 struct shm_object *shmobj;
193183fb 440 struct shm_ref *ref;
852c2936
MD
441
442 if (lib_ring_buffer_check_config(config, switch_timer_interval,
443 read_timer_interval))
444 return NULL;
445
38fae1d3 446 handle = zmalloc(sizeof(struct lttng_ust_shm_handle));
431d5cf0
MD
447 if (!handle)
448 return NULL;
449
1d498196
MD
450 /* Allocate table for channel + per-cpu buffers */
451 handle->table = shm_object_table_create(1 + num_possible_cpus());
452 if (!handle->table)
453 goto error_table_alloc;
852c2936 454
1d498196
MD
455 /* Calculate the shm allocation layout */
456 shmsize = sizeof(struct channel);
c1fca457 457 shmsize += offset_align(shmsize, __alignof__(struct lttng_ust_lib_ring_buffer_shmp));
1d498196 458 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
4cfec15c 459 shmsize += sizeof(struct lttng_ust_lib_ring_buffer_shmp) * num_possible_cpus();
1d498196 460 else
4cfec15c 461 shmsize += sizeof(struct lttng_ust_lib_ring_buffer_shmp);
a3f61e7f
MD
462 chansize = shmsize;
463 shmsize += offset_align(shmsize, priv_data_align);
464 shmsize += priv_data_size;
a6352fd4 465
1d498196 466 shmobj = shm_object_table_append(handle->table, shmsize);
b5a14697
MD
467 if (!shmobj)
468 goto error_append;
57773204 469 /* struct channel is at object 0, offset 0 (hardcoded) */
a3f61e7f 470 set_shmp(handle->chan, zalloc_shm(shmobj, chansize));
57773204
MD
471 assert(handle->chan._ref.index == 0);
472 assert(handle->chan._ref.offset == 0);
1d498196 473 chan = shmp(handle, handle->chan);
a6352fd4 474 if (!chan)
1d498196 475 goto error_append;
a6352fd4 476
a3f61e7f
MD
477 /* space for private data */
478 if (priv_data_size) {
479 DECLARE_SHMP(void, priv_data_alloc);
480
481 align_shm(shmobj, priv_data_align);
482 chan->priv_data_offset = shmobj->allocated_len;
483 set_shmp(priv_data_alloc, zalloc_shm(shmobj, priv_data_size));
484 if (!shmp(handle, priv_data_alloc))
485 goto error_append;
486 *priv_data = channel_get_private(chan);
d028eddb 487 memcpy(*priv_data, priv_data_init, priv_data_size);
a3f61e7f
MD
488 } else {
489 chan->priv_data_offset = -1;
490 *priv_data = NULL;
491 }
492
493 ret = channel_backend_init(&chan->backend, name, config,
1d498196 494 subbuf_size, num_subbuf, handle);
852c2936 495 if (ret)
1d498196 496 goto error_backend_init;
852c2936
MD
497
498 chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order);
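	/*
	 * Worked example (illustrative): with a 64-bit unsigned long and
	 * num_subbuf = 4 (num_subbuf_order = 2), commit_count_mask is
	 * ~0UL >> 2 = 0x3fffffffffffffff. Commit counts are compared against
	 * buf_trunc(offset, chan) >> num_subbuf_order, so only the lower
	 * bits of a commit count (word size minus num_subbuf_order bits) are
	 * meaningful and comparisons are performed modulo this mask.
	 */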
a6352fd4
MD
499 //TODO
500 //chan->switch_timer_interval = usecs_to_jiffies(switch_timer_interval);
501 //chan->read_timer_interval = usecs_to_jiffies(read_timer_interval);
a6352fd4
MD
502 //TODO
503 //init_waitqueue_head(&chan->read_wait);
504 //init_waitqueue_head(&chan->hp_wait);
852c2936
MD
505
506 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		/*
		 * In case of non-hotplug cpu, if the ring-buffer is allocated
		 * in early initcall, it will not be notified of secondary cpus.
		 * In that case, we need to allocate for all possible cpus.
		 */
852c2936 512 for_each_possible_cpu(cpu) {
4cfec15c 513 struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
1d498196
MD
514 lib_ring_buffer_start_switch_timer(buf, handle);
515 lib_ring_buffer_start_read_timer(buf, handle);
852c2936 516 }
852c2936 517 } else {
4cfec15c 518 struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
852c2936 519
1d498196
MD
520 lib_ring_buffer_start_switch_timer(buf, handle);
521 lib_ring_buffer_start_read_timer(buf, handle);
852c2936 522 }
193183fb
MD
523 ref = &handle->chan._ref;
524 shm_get_object_data(handle, ref, shm_fd, wait_fd, memory_map_size);
431d5cf0 525 return handle;
852c2936 526
1d498196
MD
527error_backend_init:
528error_append:
529 shm_object_table_destroy(handle->table);
530error_table_alloc:
431d5cf0 531 free(handle);
852c2936
MD
532 return NULL;
533}
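/*
 * Illustrative call to channel_create() (a sketch, not part of the original
 * file). The client configuration, private data type and sizes below are
 * placeholders: a real client supplies its own
 * struct lttng_ust_lib_ring_buffer_config, callbacks and private data layout.
 */
#if 0
static
struct lttng_ust_shm_handle *example_open_channel(
		const struct lttng_ust_lib_ring_buffer_config *client_config)
{
	struct example_priv *priv;		/* placeholder private data type */
	struct example_priv priv_init = {};	/* copied into the shm area */
	int shm_fd, wait_fd;
	uint64_t memory_map_size;

	return channel_create(client_config, "example-chan",
			      (void **) &priv,
			      __alignof__(*priv), sizeof(*priv), &priv_init,
			      NULL,		/* buf_addr (unused here) */
			      4096, 4,		/* subbuf_size, num_subbuf */
			      0, 0,		/* switch/read timer intervals (us) */
			      &shm_fd, &wait_fd, &memory_map_size);
}
#endif	/* usage sketch */
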
852c2936 534
38fae1d3 535struct lttng_ust_shm_handle *channel_handle_create(int shm_fd, int wait_fd,
193183fb
MD
536 uint64_t memory_map_size)
537{
38fae1d3 538 struct lttng_ust_shm_handle *handle;
193183fb
MD
539 struct shm_object *object;
540
38fae1d3 541 handle = zmalloc(sizeof(struct lttng_ust_shm_handle));
193183fb
MD
542 if (!handle)
543 return NULL;
544
545 /* Allocate table for channel + per-cpu buffers */
546 handle->table = shm_object_table_create(1 + num_possible_cpus());
547 if (!handle->table)
548 goto error_table_alloc;
549 /* Add channel object */
550 object = shm_object_table_append_shadow(handle->table,
551 shm_fd, wait_fd, memory_map_size);
552 if (!object)
553 goto error_table_object;
57773204
MD
554 /* struct channel is at object 0, offset 0 (hardcoded) */
555 handle->chan._ref.index = 0;
556 handle->chan._ref.offset = 0;
193183fb
MD
557 return handle;
558
559error_table_object:
560 shm_object_table_destroy(handle->table);
561error_table_alloc:
562 free(handle);
563 return NULL;
564}
565
38fae1d3 566int channel_handle_add_stream(struct lttng_ust_shm_handle *handle,
193183fb
MD
567 int shm_fd, int wait_fd, uint64_t memory_map_size)
568{
569 struct shm_object *object;
570
571 /* Add stream object */
572 object = shm_object_table_append_shadow(handle->table,
573 shm_fd, wait_fd, memory_map_size);
574 if (!object)
575 return -1;
576 return 0;
577}
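/*
 * Illustrative consumer-side use of the two helpers above (a sketch, not part
 * of the original file): the file descriptors and mapping sizes are assumed
 * to have been received from the traced application over a UNIX socket.
 */
#if 0
static
struct lttng_ust_shm_handle *example_map_channel(int chan_shm_fd, int chan_wait_fd,
		uint64_t chan_mmap_size, int stream_shm_fd, int stream_wait_fd,
		uint64_t stream_mmap_size)
{
	struct lttng_ust_shm_handle *handle;

	handle = channel_handle_create(chan_shm_fd, chan_wait_fd, chan_mmap_size);
	if (!handle)
		return NULL;
	/* One stream (ring buffer) per cpu in per-cpu allocation mode. */
	if (channel_handle_add_stream(handle, stream_shm_fd, stream_wait_fd,
				      stream_mmap_size))
		return NULL;	/* handle teardown elided in this sketch */
	return handle;
}
#endif	/* usage sketch */
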
578
852c2936 579static
38fae1d3 580void channel_release(struct channel *chan, struct lttng_ust_shm_handle *handle,
824f40b8 581 int shadow)
852c2936 582{
824f40b8 583 channel_free(chan, handle, shadow);
852c2936
MD
584}
585
586/**
587 * channel_destroy - Finalize, wait for q.s. and destroy channel.
588 * @chan: channel to destroy
589 *
590 * Holds cpu hotplug.
431d5cf0
MD
591 * Call "destroy" callback, finalize channels, decrement the channel
592 * reference count. Note that when readers have completed data
593 * consumption of finalized channels, get_subbuf() will return -ENODATA.
a3f61e7f 594 * They should release their handle at that point.
852c2936 595 */
a3f61e7f 596void channel_destroy(struct channel *chan, struct lttng_ust_shm_handle *handle,
824f40b8 597 int shadow)
852c2936 598{
4cfec15c 599 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
431d5cf0 600 int cpu;
852c2936 601
824f40b8
MD
602 if (shadow) {
603 channel_release(chan, handle, shadow);
a3f61e7f 604 return;
824f40b8
MD
605 }
606
1d498196 607 channel_unregister_notifiers(chan, handle);
852c2936
MD
608
609 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
852c2936 610 for_each_channel_cpu(cpu, chan) {
4cfec15c 611 struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
852c2936
MD
612
613 if (config->cb.buffer_finalize)
614 config->cb.buffer_finalize(buf,
a3f61e7f 615 channel_get_private(chan),
1d498196 616 cpu, handle);
852c2936 617 if (buf->backend.allocated)
1d498196
MD
618 lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH,
619 handle);
852c2936
MD
620 /*
621 * Perform flush before writing to finalized.
622 */
a6352fd4 623 cmm_smp_wmb();
14641deb 624 CMM_ACCESS_ONCE(buf->finalized) = 1;
a6352fd4 625 //wake_up_interruptible(&buf->read_wait);
852c2936
MD
626 }
627 } else {
4cfec15c 628 struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
852c2936
MD
629
630 if (config->cb.buffer_finalize)
a3f61e7f 631 config->cb.buffer_finalize(buf, channel_get_private(chan), -1, handle);
852c2936 632 if (buf->backend.allocated)
1d498196
MD
633 lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH,
634 handle);
852c2936
MD
635 /*
636 * Perform flush before writing to finalized.
637 */
a6352fd4 638 cmm_smp_wmb();
14641deb 639 CMM_ACCESS_ONCE(buf->finalized) = 1;
a6352fd4 640 //wake_up_interruptible(&buf->read_wait);
852c2936 641 }
14641deb 642 CMM_ACCESS_ONCE(chan->finalized) = 1;
a6352fd4
MD
643 //wake_up_interruptible(&chan->hp_wait);
644 //wake_up_interruptible(&chan->read_wait);
431d5cf0
MD
645 /*
646 * sessiond/consumer are keeping a reference on the shm file
647 * descriptor directly. No need to refcount.
648 */
824f40b8 649 channel_release(chan, handle, shadow);
a3f61e7f 650 return;
852c2936 651}
852c2936 652
4cfec15c
MD
653struct lttng_ust_lib_ring_buffer *channel_get_ring_buffer(
654 const struct lttng_ust_lib_ring_buffer_config *config,
1d498196 655 struct channel *chan, int cpu,
38fae1d3 656 struct lttng_ust_shm_handle *handle,
381c0f1e
MD
657 int *shm_fd, int *wait_fd,
658 uint64_t *memory_map_size)
852c2936 659{
381c0f1e
MD
660 struct shm_ref *ref;
661
662 if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
663 ref = &chan->backend.buf[0].shmp._ref;
664 shm_get_object_data(handle, ref, shm_fd, wait_fd,
665 memory_map_size);
1d498196 666 return shmp(handle, chan->backend.buf[0].shmp);
381c0f1e 667 } else {
e095d803
MD
668 if (cpu >= num_possible_cpus())
669 return NULL;
381c0f1e
MD
670 ref = &chan->backend.buf[cpu].shmp._ref;
671 shm_get_object_data(handle, ref, shm_fd, wait_fd,
672 memory_map_size);
1d498196 673 return shmp(handle, chan->backend.buf[cpu].shmp);
381c0f1e 674 }
852c2936 675}
852c2936 676
4cfec15c 677int lib_ring_buffer_open_read(struct lttng_ust_lib_ring_buffer *buf,
38fae1d3 678 struct lttng_ust_shm_handle *handle,
824f40b8 679 int shadow)
852c2936 680{
824f40b8
MD
681 if (shadow) {
682 if (uatomic_cmpxchg(&buf->active_shadow_readers, 0, 1) != 0)
683 return -EBUSY;
684 cmm_smp_mb();
685 return 0;
686 }
a6352fd4 687 if (uatomic_cmpxchg(&buf->active_readers, 0, 1) != 0)
852c2936 688 return -EBUSY;
a6352fd4 689 cmm_smp_mb();
852c2936
MD
690 return 0;
691}
852c2936 692
4cfec15c 693void lib_ring_buffer_release_read(struct lttng_ust_lib_ring_buffer *buf,
38fae1d3 694 struct lttng_ust_shm_handle *handle,
824f40b8 695 int shadow)
852c2936 696{
1d498196 697 struct channel *chan = shmp(handle, buf->backend.chan);
852c2936 698
824f40b8
MD
699 if (shadow) {
700 CHAN_WARN_ON(chan, uatomic_read(&buf->active_shadow_readers) != 1);
701 cmm_smp_mb();
702 uatomic_dec(&buf->active_shadow_readers);
703 return;
704 }
a6352fd4
MD
705 CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
706 cmm_smp_mb();
707 uatomic_dec(&buf->active_readers);
852c2936
MD
708}
709
710/**
711 * lib_ring_buffer_snapshot - save subbuffer position snapshot (for read)
712 * @buf: ring buffer
713 * @consumed: consumed count indicating the position where to read
714 * @produced: produced count, indicates position when to stop reading
715 *
716 * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no
717 * data to read at consumed position, or 0 if the get operation succeeds.
852c2936
MD
718 */
719
4cfec15c 720int lib_ring_buffer_snapshot(struct lttng_ust_lib_ring_buffer *buf,
1d498196 721 unsigned long *consumed, unsigned long *produced,
38fae1d3 722 struct lttng_ust_shm_handle *handle)
852c2936 723{
1d498196 724 struct channel *chan = shmp(handle, buf->backend.chan);
4cfec15c 725 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
726 unsigned long consumed_cur, write_offset;
727 int finalized;
728
14641deb 729 finalized = CMM_ACCESS_ONCE(buf->finalized);
852c2936
MD
730 /*
731 * Read finalized before counters.
732 */
a6352fd4
MD
733 cmm_smp_rmb();
734 consumed_cur = uatomic_read(&buf->consumed);
852c2936
MD
735 /*
736 * No need to issue a memory barrier between consumed count read and
737 * write offset read, because consumed count can only change
738 * concurrently in overwrite mode, and we keep a sequence counter
739 * identifier derived from the write offset to check we are getting
740 * the same sub-buffer we are expecting (the sub-buffers are atomically
741 * "tagged" upon writes, tags are checked upon read).
742 */
743 write_offset = v_read(config, &buf->offset);
744
745 /*
746 * Check that we are not about to read the same subbuffer in
747 * which the writer head is.
748 */
749 if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan)
750 == 0)
751 goto nodata;
752
753 *consumed = consumed_cur;
754 *produced = subbuf_trunc(write_offset, chan);
755
756 return 0;
757
758nodata:
759 /*
760 * The memory barriers __wait_event()/wake_up_interruptible() take care
761 * of "raw_spin_is_locked" memory ordering.
762 */
763 if (finalized)
764 return -ENODATA;
852c2936
MD
765 else
766 return -EAGAIN;
767}
852c2936
MD
768
769/**
 * lib_ring_buffer_move_consumer - move consumed counter forward
771 * @buf: ring buffer
772 * @consumed_new: new consumed count value
773 */
4cfec15c 774void lib_ring_buffer_move_consumer(struct lttng_ust_lib_ring_buffer *buf,
1d498196 775 unsigned long consumed_new,
38fae1d3 776 struct lttng_ust_shm_handle *handle)
852c2936 777{
4cfec15c 778 struct lttng_ust_lib_ring_buffer_backend *bufb = &buf->backend;
1d498196 779 struct channel *chan = shmp(handle, bufb->chan);
852c2936
MD
780 unsigned long consumed;
781
824f40b8
MD
782 CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1
783 && uatomic_read(&buf->active_shadow_readers) != 1);
852c2936
MD
784
785 /*
786 * Only push the consumed value forward.
787 * If the consumed cmpxchg fails, this is because we have been pushed by
788 * the writer in flight recorder mode.
789 */
a6352fd4 790 consumed = uatomic_read(&buf->consumed);
852c2936 791 while ((long) consumed - (long) consumed_new < 0)
a6352fd4
MD
792 consumed = uatomic_cmpxchg(&buf->consumed, consumed,
793 consumed_new);
852c2936 794}
852c2936
MD
795
796/**
797 * lib_ring_buffer_get_subbuf - get exclusive access to subbuffer for reading
798 * @buf: ring buffer
799 * @consumed: consumed count indicating the position where to read
800 *
801 * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no
802 * data to read at consumed position, or 0 if the get operation succeeds.
852c2936 803 */
4cfec15c 804int lib_ring_buffer_get_subbuf(struct lttng_ust_lib_ring_buffer *buf,
1d498196 805 unsigned long consumed,
38fae1d3 806 struct lttng_ust_shm_handle *handle)
852c2936 807{
1d498196 808 struct channel *chan = shmp(handle, buf->backend.chan);
4cfec15c 809 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
810 unsigned long consumed_cur, consumed_idx, commit_count, write_offset;
811 int ret;
812 int finalized;
813
814retry:
14641deb 815 finalized = CMM_ACCESS_ONCE(buf->finalized);
852c2936
MD
816 /*
817 * Read finalized before counters.
818 */
a6352fd4
MD
819 cmm_smp_rmb();
820 consumed_cur = uatomic_read(&buf->consumed);
852c2936 821 consumed_idx = subbuf_index(consumed, chan);
4746ae29 822 commit_count = v_read(config, &shmp_index(handle, buf->commit_cold, consumed_idx)->cc_sb);
852c2936
MD
823 /*
824 * Make sure we read the commit count before reading the buffer
825 * data and the write offset. Correct consumed offset ordering
	 * wrt commit count is ensured by the use of cmpxchg to update
827 * the consumed offset.
852c2936 828 */
a6352fd4
MD
829 /*
830 * Local rmb to match the remote wmb to read the commit count
831 * before the buffer data and the write offset.
832 */
833 cmm_smp_rmb();
852c2936
MD
834
835 write_offset = v_read(config, &buf->offset);
836
837 /*
838 * Check that the buffer we are getting is after or at consumed_cur
839 * position.
840 */
841 if ((long) subbuf_trunc(consumed, chan)
842 - (long) subbuf_trunc(consumed_cur, chan) < 0)
843 goto nodata;
844
845 /*
846 * Check that the subbuffer we are trying to consume has been
847 * already fully committed.
848 */
849 if (((commit_count - chan->backend.subbuf_size)
850 & chan->commit_count_mask)
851 - (buf_trunc(consumed_cur, chan)
852 >> chan->backend.num_subbuf_order)
853 != 0)
854 goto nodata;
855
856 /*
857 * Check that we are not about to read the same subbuffer in
858 * which the writer head is.
859 */
860 if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan)
861 == 0)
862 goto nodata;
863
864 /*
865 * Failure to get the subbuffer causes a busy-loop retry without going
866 * to a wait queue. These are caused by short-lived race windows where
867 * the writer is getting access to a subbuffer we were trying to get
868 * access to. Also checks that the "consumed" buffer count we are
869 * looking for matches the one contained in the subbuffer id.
870 */
871 ret = update_read_sb_index(config, &buf->backend, &chan->backend,
1d498196
MD
872 consumed_idx, buf_trunc_val(consumed, chan),
873 handle);
852c2936
MD
874 if (ret)
875 goto retry;
876 subbuffer_id_clear_noref(config, &buf->backend.buf_rsb.id);
877
878 buf->get_subbuf_consumed = consumed;
879 buf->get_subbuf = 1;
880
881 return 0;
882
883nodata:
884 /*
885 * The memory barriers __wait_event()/wake_up_interruptible() take care
886 * of "raw_spin_is_locked" memory ordering.
887 */
888 if (finalized)
889 return -ENODATA;
852c2936
MD
890 else
891 return -EAGAIN;
892}
852c2936
MD
893
894/**
895 * lib_ring_buffer_put_subbuf - release exclusive subbuffer access
896 * @buf: ring buffer
897 */
4cfec15c 898void lib_ring_buffer_put_subbuf(struct lttng_ust_lib_ring_buffer *buf,
38fae1d3 899 struct lttng_ust_shm_handle *handle)
852c2936 900{
4cfec15c 901 struct lttng_ust_lib_ring_buffer_backend *bufb = &buf->backend;
1d498196 902 struct channel *chan = shmp(handle, bufb->chan);
4cfec15c 903 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
904 unsigned long read_sb_bindex, consumed_idx, consumed;
905
824f40b8
MD
906 CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1
907 && uatomic_read(&buf->active_shadow_readers) != 1);
852c2936
MD
908
909 if (!buf->get_subbuf) {
910 /*
911 * Reader puts a subbuffer it did not get.
912 */
913 CHAN_WARN_ON(chan, 1);
914 return;
915 }
916 consumed = buf->get_subbuf_consumed;
917 buf->get_subbuf = 0;
918
919 /*
920 * Clear the records_unread counter. (overruns counter)
921 * Can still be non-zero if a file reader simply grabbed the data
922 * without using iterators.
923 * Can be below zero if an iterator is used on a snapshot more than
924 * once.
925 */
926 read_sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
927 v_add(config, v_read(config,
4746ae29 928 &shmp(handle, shmp_index(handle, bufb->array, read_sb_bindex)->shmp)->records_unread),
852c2936 929 &bufb->records_read);
4746ae29 930 v_set(config, &shmp(handle, shmp_index(handle, bufb->array, read_sb_bindex)->shmp)->records_unread, 0);
852c2936
MD
931 CHAN_WARN_ON(chan, config->mode == RING_BUFFER_OVERWRITE
932 && subbuffer_id_is_noref(config, bufb->buf_rsb.id));
933 subbuffer_id_set_noref(config, &bufb->buf_rsb.id);
934
935 /*
936 * Exchange the reader subbuffer with the one we put in its place in the
937 * writer subbuffer table. Expect the original consumed count. If
938 * update_read_sb_index fails, this is because the writer updated the
939 * subbuffer concurrently. We should therefore keep the subbuffer we
940 * currently have: it has become invalid to try reading this sub-buffer
941 * consumed count value anyway.
942 */
943 consumed_idx = subbuf_index(consumed, chan);
944 update_read_sb_index(config, &buf->backend, &chan->backend,
1d498196
MD
945 consumed_idx, buf_trunc_val(consumed, chan),
946 handle);
852c2936
MD
947 /*
948 * update_read_sb_index return value ignored. Don't exchange sub-buffer
949 * if the writer concurrently updated it.
950 */
951}
852c2936
MD
952
953/*
954 * cons_offset is an iterator on all subbuffer offsets between the reader
955 * position and the writer position. (inclusive)
956 */
957static
4cfec15c 958void lib_ring_buffer_print_subbuffer_errors(struct lttng_ust_lib_ring_buffer *buf,
852c2936
MD
959 struct channel *chan,
960 unsigned long cons_offset,
1d498196 961 int cpu,
38fae1d3 962 struct lttng_ust_shm_handle *handle)
852c2936 963{
4cfec15c 964 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
965 unsigned long cons_idx, commit_count, commit_count_sb;
966
967 cons_idx = subbuf_index(cons_offset, chan);
4746ae29
MD
968 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, cons_idx)->cc);
969 commit_count_sb = v_read(config, &shmp_index(handle, buf->commit_cold, cons_idx)->cc_sb);
852c2936
MD
970
971 if (subbuf_offset(commit_count, chan) != 0)
4d3c9523 972 DBG("ring buffer %s, cpu %d: "
852c2936
MD
973 "commit count in subbuffer %lu,\n"
974 "expecting multiples of %lu bytes\n"
975 " [ %lu bytes committed, %lu bytes reader-visible ]\n",
976 chan->backend.name, cpu, cons_idx,
977 chan->backend.subbuf_size,
978 commit_count, commit_count_sb);
979
4d3c9523 980 DBG("ring buffer: %s, cpu %d: %lu bytes committed\n",
852c2936
MD
981 chan->backend.name, cpu, commit_count);
982}
983
984static
4cfec15c 985void lib_ring_buffer_print_buffer_errors(struct lttng_ust_lib_ring_buffer *buf,
852c2936 986 struct channel *chan,
1d498196 987 void *priv, int cpu,
38fae1d3 988 struct lttng_ust_shm_handle *handle)
852c2936 989{
4cfec15c 990 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
991 unsigned long write_offset, cons_offset;
992
993 /*
994 * Can be called in the error path of allocation when
995 * trans_channel_data is not yet set.
996 */
997 if (!chan)
998 return;
999 /*
1000 * No need to order commit_count, write_offset and cons_offset reads
1001 * because we execute at teardown when no more writer nor reader
1002 * references are left.
1003 */
1004 write_offset = v_read(config, &buf->offset);
a6352fd4 1005 cons_offset = uatomic_read(&buf->consumed);
852c2936 1006 if (write_offset != cons_offset)
4d3c9523 1007 DBG("ring buffer %s, cpu %d: "
852c2936
MD
1008 "non-consumed data\n"
1009 " [ %lu bytes written, %lu bytes read ]\n",
1010 chan->backend.name, cpu, write_offset, cons_offset);
1011
a6352fd4 1012 for (cons_offset = uatomic_read(&buf->consumed);
852c2936
MD
1013 (long) (subbuf_trunc((unsigned long) v_read(config, &buf->offset),
1014 chan)
1015 - cons_offset) > 0;
1016 cons_offset = subbuf_align(cons_offset, chan))
1017 lib_ring_buffer_print_subbuffer_errors(buf, chan, cons_offset,
1d498196 1018 cpu, handle);
852c2936
MD
1019}
1020
1021static
1022void lib_ring_buffer_print_errors(struct channel *chan,
4cfec15c 1023 struct lttng_ust_lib_ring_buffer *buf, int cpu,
38fae1d3 1024 struct lttng_ust_shm_handle *handle)
852c2936 1025{
4cfec15c 1026 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
a3f61e7f 1027 void *priv = channel_get_private(chan);
852c2936 1028
4d3c9523 1029 DBG("ring buffer %s, cpu %d: %lu records written, "
852c2936
MD
1030 "%lu records overrun\n",
1031 chan->backend.name, cpu,
1032 v_read(config, &buf->records_count),
1033 v_read(config, &buf->records_overrun));
1034
1035 if (v_read(config, &buf->records_lost_full)
1036 || v_read(config, &buf->records_lost_wrap)
1037 || v_read(config, &buf->records_lost_big))
4d3c9523 1038 DBG("ring buffer %s, cpu %d: records were lost. Caused by:\n"
852c2936
MD
1039 " [ %lu buffer full, %lu nest buffer wrap-around, "
1040 "%lu event too big ]\n",
1041 chan->backend.name, cpu,
1042 v_read(config, &buf->records_lost_full),
1043 v_read(config, &buf->records_lost_wrap),
1044 v_read(config, &buf->records_lost_big));
1045
1d498196 1046 lib_ring_buffer_print_buffer_errors(buf, chan, priv, cpu, handle);
852c2936
MD
1047}
1048
1049/*
1050 * lib_ring_buffer_switch_old_start: Populate old subbuffer header.
1051 *
1052 * Only executed when the buffer is finalized, in SWITCH_FLUSH.
1053 */
1054static
4cfec15c 1055void lib_ring_buffer_switch_old_start(struct lttng_ust_lib_ring_buffer *buf,
852c2936
MD
1056 struct channel *chan,
1057 struct switch_offsets *offsets,
1d498196 1058 u64 tsc,
38fae1d3 1059 struct lttng_ust_shm_handle *handle)
852c2936 1060{
4cfec15c 1061 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
1062 unsigned long oldidx = subbuf_index(offsets->old, chan);
1063 unsigned long commit_count;
1064
1d498196 1065 config->cb.buffer_begin(buf, tsc, oldidx, handle);
852c2936
MD
1066
1067 /*
1068 * Order all writes to buffer before the commit count update that will
1069 * determine that the subbuffer is full.
1070 */
a6352fd4 1071 cmm_smp_wmb();
852c2936 1072 v_add(config, config->cb.subbuffer_header_size(),
4746ae29
MD
1073 &shmp_index(handle, buf->commit_hot, oldidx)->cc);
1074 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, oldidx)->cc);
852c2936
MD
1075 /* Check if the written buffer has to be delivered */
1076 lib_ring_buffer_check_deliver(config, buf, chan, offsets->old,
1d498196 1077 commit_count, oldidx, handle);
852c2936
MD
1078 lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
1079 offsets->old, commit_count,
1d498196
MD
1080 config->cb.subbuffer_header_size(),
1081 handle);
852c2936
MD
1082}
1083
1084/*
1085 * lib_ring_buffer_switch_old_end: switch old subbuffer
1086 *
1087 * Note : offset_old should never be 0 here. It is ok, because we never perform
1088 * buffer switch on an empty subbuffer in SWITCH_ACTIVE mode. The caller
1089 * increments the offset_old value when doing a SWITCH_FLUSH on an empty
1090 * subbuffer.
1091 */
1092static
4cfec15c 1093void lib_ring_buffer_switch_old_end(struct lttng_ust_lib_ring_buffer *buf,
852c2936
MD
1094 struct channel *chan,
1095 struct switch_offsets *offsets,
1d498196 1096 u64 tsc,
38fae1d3 1097 struct lttng_ust_shm_handle *handle)
852c2936 1098{
4cfec15c 1099 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
1100 unsigned long oldidx = subbuf_index(offsets->old - 1, chan);
1101 unsigned long commit_count, padding_size, data_size;
1102
1103 data_size = subbuf_offset(offsets->old - 1, chan) + 1;
1104 padding_size = chan->backend.subbuf_size - data_size;
1d498196
MD
1105 subbuffer_set_data_size(config, &buf->backend, oldidx, data_size,
1106 handle);
852c2936
MD
1107
1108 /*
1109 * Order all writes to buffer before the commit count update that will
1110 * determine that the subbuffer is full.
1111 */
a6352fd4 1112 cmm_smp_wmb();
4746ae29
MD
1113 v_add(config, padding_size, &shmp_index(handle, buf->commit_hot, oldidx)->cc);
1114 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, oldidx)->cc);
852c2936 1115 lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1,
1d498196 1116 commit_count, oldidx, handle);
852c2936
MD
1117 lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
1118 offsets->old, commit_count,
1d498196 1119 padding_size, handle);
852c2936
MD
1120}
1121
1122/*
1123 * lib_ring_buffer_switch_new_start: Populate new subbuffer.
1124 *
1125 * This code can be executed unordered : writers may already have written to the
1126 * sub-buffer before this code gets executed, caution. The commit makes sure
1127 * that this code is executed before the deliver of this sub-buffer.
1128 */
1129static
4cfec15c 1130void lib_ring_buffer_switch_new_start(struct lttng_ust_lib_ring_buffer *buf,
852c2936
MD
1131 struct channel *chan,
1132 struct switch_offsets *offsets,
1d498196 1133 u64 tsc,
38fae1d3 1134 struct lttng_ust_shm_handle *handle)
852c2936 1135{
4cfec15c 1136 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
1137 unsigned long beginidx = subbuf_index(offsets->begin, chan);
1138 unsigned long commit_count;
1139
1d498196 1140 config->cb.buffer_begin(buf, tsc, beginidx, handle);
852c2936
MD
1141
1142 /*
1143 * Order all writes to buffer before the commit count update that will
1144 * determine that the subbuffer is full.
1145 */
a6352fd4 1146 cmm_smp_wmb();
852c2936 1147 v_add(config, config->cb.subbuffer_header_size(),
4746ae29
MD
1148 &shmp_index(handle, buf->commit_hot, beginidx)->cc);
1149 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, beginidx)->cc);
852c2936
MD
1150 /* Check if the written buffer has to be delivered */
1151 lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin,
1d498196 1152 commit_count, beginidx, handle);
852c2936
MD
1153 lib_ring_buffer_write_commit_counter(config, buf, chan, beginidx,
1154 offsets->begin, commit_count,
1d498196
MD
1155 config->cb.subbuffer_header_size(),
1156 handle);
852c2936
MD
1157}
1158
1159/*
1160 * lib_ring_buffer_switch_new_end: finish switching current subbuffer
1161 *
1162 * The only remaining threads could be the ones with pending commits. They will
1163 * have to do the deliver themselves.
1164 */
1165static
4cfec15c 1166void lib_ring_buffer_switch_new_end(struct lttng_ust_lib_ring_buffer *buf,
1d498196
MD
1167 struct channel *chan,
1168 struct switch_offsets *offsets,
1169 u64 tsc,
38fae1d3 1170 struct lttng_ust_shm_handle *handle)
852c2936 1171{
4cfec15c 1172 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
1173 unsigned long endidx = subbuf_index(offsets->end - 1, chan);
1174 unsigned long commit_count, padding_size, data_size;
1175
1176 data_size = subbuf_offset(offsets->end - 1, chan) + 1;
1177 padding_size = chan->backend.subbuf_size - data_size;
1d498196
MD
1178 subbuffer_set_data_size(config, &buf->backend, endidx, data_size,
1179 handle);
852c2936
MD
1180
1181 /*
1182 * Order all writes to buffer before the commit count update that will
1183 * determine that the subbuffer is full.
1184 */
a6352fd4 1185 cmm_smp_wmb();
4746ae29
MD
1186 v_add(config, padding_size, &shmp_index(handle, buf->commit_hot, endidx)->cc);
1187 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, endidx)->cc);
852c2936 1188 lib_ring_buffer_check_deliver(config, buf, chan, offsets->end - 1,
1d498196 1189 commit_count, endidx, handle);
852c2936
MD
1190 lib_ring_buffer_write_commit_counter(config, buf, chan, endidx,
1191 offsets->end, commit_count,
1d498196 1192 padding_size, handle);
852c2936
MD
1193}
1194
1195/*
1196 * Returns :
1197 * 0 if ok
1198 * !0 if execution must be aborted.
1199 */
1200static
1201int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
4cfec15c 1202 struct lttng_ust_lib_ring_buffer *buf,
852c2936
MD
1203 struct channel *chan,
1204 struct switch_offsets *offsets,
1205 u64 *tsc)
1206{
4cfec15c 1207 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
1208 unsigned long off;
1209
1210 offsets->begin = v_read(config, &buf->offset);
1211 offsets->old = offsets->begin;
1212 offsets->switch_old_start = 0;
1213 off = subbuf_offset(offsets->begin, chan);
1214
1215 *tsc = config->cb.ring_buffer_clock_read(chan);
1216
1217 /*
1218 * Ensure we flush the header of an empty subbuffer when doing the
1219 * finalize (SWITCH_FLUSH). This ensures that we end up knowing the
1220 * total data gathering duration even if there were no records saved
1221 * after the last buffer switch.
1222 * In SWITCH_ACTIVE mode, switch the buffer when it contains events.
1223 * SWITCH_ACTIVE only flushes the current subbuffer, dealing with end of
1224 * subbuffer header as appropriate.
1225 * The next record that reserves space will be responsible for
1226 * populating the following subbuffer header. We choose not to populate
1227 * the next subbuffer header here because we want to be able to use
	 * SWITCH_ACTIVE for periodic buffer flush, which must
1229 * guarantee that all the buffer content (records and header
1230 * timestamps) are visible to the reader. This is required for
1231 * quiescence guarantees for the fusion merge.
852c2936
MD
1232 */
1233 if (mode == SWITCH_FLUSH || off > 0) {
b5a3dfa5 1234 if (caa_unlikely(off == 0)) {
852c2936
MD
1235 /*
1236 * The client does not save any header information.
1237 * Don't switch empty subbuffer on finalize, because it
1238 * is invalid to deliver a completely empty subbuffer.
1239 */
1240 if (!config->cb.subbuffer_header_size())
1241 return -1;
1242 /*
1243 * Need to write the subbuffer start header on finalize.
1244 */
1245 offsets->switch_old_start = 1;
1246 }
1247 offsets->begin = subbuf_align(offsets->begin, chan);
1248 } else
1249 return -1; /* we do not have to switch : buffer is empty */
1250 /* Note: old points to the next subbuf at offset 0 */
1251 offsets->end = offsets->begin;
1252 return 0;
1253}
1254
1255/*
1256 * Force a sub-buffer switch. This operation is completely reentrant : can be
1257 * called while tracing is active with absolutely no lock held.
1258 *
1259 * Note, however, that as a v_cmpxchg is used for some atomic
1260 * operations, this function must be called from the CPU which owns the buffer
1261 * for a ACTIVE flush.
1262 */
4cfec15c 1263void lib_ring_buffer_switch_slow(struct lttng_ust_lib_ring_buffer *buf, enum switch_mode mode,
38fae1d3 1264 struct lttng_ust_shm_handle *handle)
852c2936 1265{
1d498196 1266 struct channel *chan = shmp(handle, buf->backend.chan);
4cfec15c 1267 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
1268 struct switch_offsets offsets;
1269 unsigned long oldidx;
1270 u64 tsc;
1271
1272 offsets.size = 0;
1273
1274 /*
1275 * Perform retryable operations.
1276 */
1277 do {
1278 if (lib_ring_buffer_try_switch_slow(mode, buf, chan, &offsets,
1279 &tsc))
1280 return; /* Switch not needed */
1281 } while (v_cmpxchg(config, &buf->offset, offsets.old, offsets.end)
1282 != offsets.old);
1283
1284 /*
1285 * Atomically update last_tsc. This update races against concurrent
1286 * atomic updates, but the race will always cause supplementary full TSC
1287 * records, never the opposite (missing a full TSC record when it would
1288 * be needed).
1289 */
1290 save_last_tsc(config, buf, tsc);
1291
1292 /*
1293 * Push the reader if necessary
1294 */
1295 lib_ring_buffer_reserve_push_reader(buf, chan, offsets.old);
1296
1297 oldidx = subbuf_index(offsets.old, chan);
1d498196 1298 lib_ring_buffer_clear_noref(config, &buf->backend, oldidx, handle);
852c2936
MD
1299
1300 /*
1301 * May need to populate header start on SWITCH_FLUSH.
1302 */
1303 if (offsets.switch_old_start) {
1d498196 1304 lib_ring_buffer_switch_old_start(buf, chan, &offsets, tsc, handle);
852c2936
MD
1305 offsets.old += config->cb.subbuffer_header_size();
1306 }
1307
1308 /*
1309 * Switch old subbuffer.
1310 */
1d498196 1311 lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc, handle);
852c2936 1312}
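/*
 * Illustrative use of the switch path (a sketch, not part of the original
 * file): a periodic "active" flush makes a partially filled sub-buffer
 * visible to the reader, and in per-cpu mode must run on the cpu owning the
 * buffer. On finalize, SWITCH_FLUSH is used instead (see channel_destroy()
 * above) so that an empty sub-buffer still gets a header when the client
 * uses sub-buffer headers.
 */
#if 0
static
void example_periodic_flush(struct lttng_ust_lib_ring_buffer *buf,
			    struct lttng_ust_shm_handle *handle)
{
	/* Only switches if the current sub-buffer contains data. */
	lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, handle);
}
#endif	/* usage sketch */
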
852c2936
MD
1313
1314/*
1315 * Returns :
1316 * 0 if ok
1317 * -ENOSPC if event size is too large for packet.
1318 * -ENOBUFS if there is currently not enough space in buffer for the event.
1319 * -EIO if data cannot be written into the buffer for any other reason.
1320 */
1321static
4cfec15c 1322int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf,
852c2936
MD
1323 struct channel *chan,
1324 struct switch_offsets *offsets,
4cfec15c 1325 struct lttng_ust_lib_ring_buffer_ctx *ctx)
852c2936 1326{
4cfec15c 1327 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
38fae1d3 1328 struct lttng_ust_shm_handle *handle = ctx->handle;
852c2936
MD
1329 unsigned long reserve_commit_diff;
1330
1331 offsets->begin = v_read(config, &buf->offset);
1332 offsets->old = offsets->begin;
1333 offsets->switch_new_start = 0;
1334 offsets->switch_new_end = 0;
1335 offsets->switch_old_end = 0;
1336 offsets->pre_header_padding = 0;
1337
1338 ctx->tsc = config->cb.ring_buffer_clock_read(chan);
1339 if ((int64_t) ctx->tsc == -EIO)
1340 return -EIO;
1341
1342 if (last_tsc_overflow(config, buf, ctx->tsc))
1343 ctx->rflags |= RING_BUFFER_RFLAG_FULL_TSC;
1344
b5a3dfa5 1345 if (caa_unlikely(subbuf_offset(offsets->begin, ctx->chan) == 0)) {
852c2936
MD
1346 offsets->switch_new_start = 1; /* For offsets->begin */
1347 } else {
1348 offsets->size = config->cb.record_header_size(config, chan,
1349 offsets->begin,
1350 &offsets->pre_header_padding,
1351 ctx);
1352 offsets->size +=
1353 lib_ring_buffer_align(offsets->begin + offsets->size,
1354 ctx->largest_align)
1355 + ctx->data_size;
b5a3dfa5 1356 if (caa_unlikely(subbuf_offset(offsets->begin, chan) +
852c2936
MD
1357 offsets->size > chan->backend.subbuf_size)) {
1358 offsets->switch_old_end = 1; /* For offsets->old */
1359 offsets->switch_new_start = 1; /* For offsets->begin */
1360 }
1361 }
b5a3dfa5 1362 if (caa_unlikely(offsets->switch_new_start)) {
852c2936
MD
1363 unsigned long sb_index;
1364
1365 /*
1366 * We are typically not filling the previous buffer completely.
1367 */
b5a3dfa5 1368 if (caa_likely(offsets->switch_old_end))
852c2936
MD
1369 offsets->begin = subbuf_align(offsets->begin, chan);
1370 offsets->begin = offsets->begin
1371 + config->cb.subbuffer_header_size();
1372 /* Test new buffer integrity */
1373 sb_index = subbuf_index(offsets->begin, chan);
1374 reserve_commit_diff =
1375 (buf_trunc(offsets->begin, chan)
1376 >> chan->backend.num_subbuf_order)
1377 - ((unsigned long) v_read(config,
4746ae29 1378 &shmp_index(handle, buf->commit_cold, sb_index)->cc_sb)
852c2936 1379 & chan->commit_count_mask);
b5a3dfa5 1380 if (caa_likely(reserve_commit_diff == 0)) {
852c2936 1381 /* Next subbuffer not being written to. */
b5a3dfa5 1382 if (caa_unlikely(config->mode != RING_BUFFER_OVERWRITE &&
852c2936
MD
1383 subbuf_trunc(offsets->begin, chan)
1384 - subbuf_trunc((unsigned long)
a6352fd4 1385 uatomic_read(&buf->consumed), chan)
852c2936
MD
1386 >= chan->backend.buf_size)) {
1387 /*
1388 * We do not overwrite non consumed buffers
1389 * and we are full : record is lost.
1390 */
1391 v_inc(config, &buf->records_lost_full);
1392 return -ENOBUFS;
1393 } else {
1394 /*
1395 * Next subbuffer not being written to, and we
1396 * are either in overwrite mode or the buffer is
1397 * not full. It's safe to write in this new
1398 * subbuffer.
1399 */
1400 }
1401 } else {
1402 /*
1403 * Next subbuffer reserve offset does not match the
1404 * commit offset. Drop record in producer-consumer and
1405 * overwrite mode. Caused by either a writer OOPS or too
1406 * many nested writes over a reserve/commit pair.
1407 */
1408 v_inc(config, &buf->records_lost_wrap);
1409 return -EIO;
1410 }
1411 offsets->size =
1412 config->cb.record_header_size(config, chan,
1413 offsets->begin,
1414 &offsets->pre_header_padding,
1415 ctx);
1416 offsets->size +=
1417 lib_ring_buffer_align(offsets->begin + offsets->size,
1418 ctx->largest_align)
1419 + ctx->data_size;
b5a3dfa5 1420 if (caa_unlikely(subbuf_offset(offsets->begin, chan)
852c2936
MD
1421 + offsets->size > chan->backend.subbuf_size)) {
1422 /*
1423 * Record too big for subbuffers, report error, don't
1424 * complete the sub-buffer switch.
1425 */
1426 v_inc(config, &buf->records_lost_big);
1427 return -ENOSPC;
1428 } else {
1429 /*
1430 * We just made a successful buffer switch and the
1431 * record fits in the new subbuffer. Let's write.
1432 */
1433 }
1434 } else {
1435 /*
1436 * Record fits in the current buffer and we are not on a switch
1437 * boundary. It's safe to write.
1438 */
1439 }
1440 offsets->end = offsets->begin + offsets->size;
1441
b5a3dfa5 1442 if (caa_unlikely(subbuf_offset(offsets->end, chan) == 0)) {
852c2936
MD
1443 /*
1444 * The offset_end will fall at the very beginning of the next
1445 * subbuffer.
1446 */
1447 offsets->switch_new_end = 1; /* For offsets->begin */
1448 }
1449 return 0;
1450}
1451
1452/**
1453 * lib_ring_buffer_reserve_slow - Atomic slot reservation in a buffer.
1454 * @ctx: ring buffer context.
1455 *
 * Returns: -ENOBUFS if not enough space, -ENOSPC if event size too large,
1457 * -EIO for other errors, else returns 0.
1458 * It will take care of sub-buffer switching.
1459 */
4cfec15c 1460int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx)
852c2936
MD
1461{
1462 struct channel *chan = ctx->chan;
38fae1d3 1463 struct lttng_ust_shm_handle *handle = ctx->handle;
4cfec15c
MD
1464 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1465 struct lttng_ust_lib_ring_buffer *buf;
852c2936
MD
1466 struct switch_offsets offsets;
1467 int ret;
1468
1469 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
1d498196 1470 buf = shmp(handle, chan->backend.buf[ctx->cpu].shmp);
852c2936 1471 else
1d498196 1472 buf = shmp(handle, chan->backend.buf[0].shmp);
852c2936
MD
1473 ctx->buf = buf;
1474
1475 offsets.size = 0;
1476
1477 do {
1478 ret = lib_ring_buffer_try_reserve_slow(buf, chan, &offsets,
1479 ctx);
b5a3dfa5 1480 if (caa_unlikely(ret))
852c2936 1481 return ret;
b5a3dfa5 1482 } while (caa_unlikely(v_cmpxchg(config, &buf->offset, offsets.old,
852c2936
MD
1483 offsets.end)
1484 != offsets.old));
1485
1486 /*
1487 * Atomically update last_tsc. This update races against concurrent
1488 * atomic updates, but the race will always cause supplementary full TSC
1489 * records, never the opposite (missing a full TSC record when it would
1490 * be needed).
1491 */
1492 save_last_tsc(config, buf, ctx->tsc);
1493
1494 /*
1495 * Push the reader if necessary
1496 */
1497 lib_ring_buffer_reserve_push_reader(buf, chan, offsets.end - 1);
1498
1499 /*
1500 * Clear noref flag for this subbuffer.
1501 */
1502 lib_ring_buffer_clear_noref(config, &buf->backend,
1d498196
MD
1503 subbuf_index(offsets.end - 1, chan),
1504 handle);
852c2936
MD
1505
1506 /*
1507 * Switch old subbuffer if needed.
1508 */
b5a3dfa5 1509 if (caa_unlikely(offsets.switch_old_end)) {
852c2936 1510 lib_ring_buffer_clear_noref(config, &buf->backend,
1d498196
MD
1511 subbuf_index(offsets.old - 1, chan),
1512 handle);
1513 lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->tsc, handle);
852c2936
MD
1514 }
1515
1516 /*
1517 * Populate new subbuffer.
1518 */
b5a3dfa5 1519 if (caa_unlikely(offsets.switch_new_start))
1d498196 1520 lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->tsc, handle);
852c2936 1521
b5a3dfa5 1522 if (caa_unlikely(offsets.switch_new_end))
1d498196 1523 lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->tsc, handle);
852c2936
MD
1524
1525 ctx->slot_size = offsets.size;
1526 ctx->pre_offset = offsets.begin;
1527 ctx->buf_offset = offsets.begin + offsets.pre_header_padding;
1528 return 0;
1529}
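
/*
 * Illustrative writer-side sequence around this slow path (a sketch, not part
 * of the original file). It assumes the usual fast-path entry points
 * lib_ring_buffer_reserve(), lib_ring_buffer_write() and
 * lib_ring_buffer_commit() from the frontend/backend API headers, which fall
 * back to lib_ring_buffer_reserve_slow() above when a sub-buffer boundary is
 * crossed.
 */
#if 0
static
int example_write_record(const struct lttng_ust_lib_ring_buffer_config *config,
			 struct lttng_ust_lib_ring_buffer_ctx *ctx,
			 const void *payload, size_t len)
{
	int ret;

	ret = lib_ring_buffer_reserve(config, ctx);
	if (ret)
		return ret;	/* -ENOBUFS, -ENOSPC or -EIO, see above */
	lib_ring_buffer_write(config, ctx, payload, len);
	lib_ring_buffer_commit(config, ctx);
	return 0;
}
#endif	/* usage sketch */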