Fix: add missing debug printout to identify the cause of lost events
[lttng-ust.git] / libringbuffer / ring_buffer_frontend.c
852c2936
MD
1/*
2 * ring_buffer_frontend.c
3 *
4 * (C) Copyright 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 *
6 * Ring buffer wait-free buffer synchronization. Producer-consumer and flight
7 * recorder (overwrite) modes. See thesis:
8 *
9 * Desnoyers, Mathieu (2009), "Low-Impact Operating System Tracing", Ph.D.
10 * dissertation, Ecole Polytechnique de Montreal.
11 * http://www.lttng.org/pub/thesis/desnoyers-dissertation-2009-12.pdf
12 *
13 * - Algorithm presentation in Chapter 5:
14 * "Lockless Multi-Core High-Throughput Buffering".
15 * - Algorithm formal verification in Section 8.6:
16 * "Formal verification of LTTng"
17 *
18 * Author:
19 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
20 *
21 * Inspired from LTT and RelayFS:
22 * Karim Yaghmour <karim@opersys.com>
23 * Tom Zanussi <zanussi@us.ibm.com>
24 * Bob Wisniewski <bob@watson.ibm.com>
25 * And from K42 :
26 * Bob Wisniewski <bob@watson.ibm.com>
27 *
28 * Buffer reader semantic :
29 *
30 * - get_subbuf_size
31 * while buffer is not finalized and empty
32 * - get_subbuf
33 * - if return value != 0, continue
34 * - splice one subbuffer worth of data to a pipe
35 * - splice the data from pipe to disk/network
36 * - put_subbuf
37 *
38 * Dual LGPL v2.1/GPL v2 license.
39 */
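/*
 * Illustrative reader loop for the semantic above, using the functions
 * defined in this file (a sketch only, not part of the build: error
 * handling, the splice step and the retry policy are elided, and advancing
 * "consumed" by one sub-buffer at a time is an assumption of the example):
 *
 *	unsigned long consumed, produced;
 *
 *	while (!lib_ring_buffer_snapshot(buf, &consumed, &produced, handle)) {
 *		while (consumed < produced) {
 *			if (lib_ring_buffer_get_subbuf(buf, consumed, handle))
 *				break;	// -EAGAIN or -ENODATA
 *			// ... splice one subbuffer worth of data to disk/network ...
 *			lib_ring_buffer_put_subbuf(buf, handle);
 *			consumed += chan->backend.subbuf_size;
 *			lib_ring_buffer_move_consumer(buf, consumed, handle);
 *		}
 *	}
 */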
40
5ad63a16 41#define _GNU_SOURCE
a6352fd4 42#include <sys/types.h>
431d5cf0
MD
43#include <sys/mman.h>
44#include <sys/stat.h>
45#include <fcntl.h>
14641deb 46#include <urcu/compiler.h>
a6352fd4 47#include <urcu/ref.h>
35897f8b 48#include <helper.h>
14641deb 49
a6352fd4 50#include "smp.h"
4318ae1b 51#include <lttng/ringbuffer-config.h>
2fed87ae 52#include "vatomic.h"
4931a13e
MD
53#include "backend.h"
54#include "frontend.h"
a6352fd4 55#include "shm.h"
852c2936 56
431d5cf0
MD
57#ifndef max
58#define max(a, b) ((a) > (b) ? (a) : (b))
59#endif
60
2432c3c9
MD
61/*
62 * Use POSIX SHM: shm_open(3) and shm_unlink(3).
63 * close(2) to close the fd returned by shm_open.
64 * shm_unlink releases the shared memory object name.
65 * ftruncate(2) sets the size of the memory object.
66 * mmap/munmap maps the shared memory obj to a virtual address in the
 67 * calling process (should be done both in libust and consumer).
68 * See shm_overview(7) for details.
69 * Pass file descriptor returned by shm_open(3) to ltt-sessiond through
70 * a UNIX socket.
71 *
72 * Since we don't need to access the object using its name, we can
73 * immediately shm_unlink(3) it, and only keep the handle with its file
74 * descriptor.
75 */
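/*
 * Minimal sketch of the sequence described above (the object name, size
 * and error handling below are illustrative assumptions, not values used
 * by this library):
 *
 *	int shmfd = shm_open("/my-obj", O_CREAT | O_RDWR, 0600);
 *	shm_unlink("/my-obj");	// name not needed anymore, keep only the fd
 *	ftruncate(shmfd, memory_map_size);
 *	void *p = mmap(NULL, memory_map_size, PROT_READ | PROT_WRITE,
 *		       MAP_SHARED, shmfd, 0);
 *	// pass shmfd to the consumer over a UNIX socket (e.g. SCM_RIGHTS)
 */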
76
852c2936
MD
77/*
78 * Internal structure representing offsets to use at a sub-buffer switch.
79 */
80struct switch_offsets {
81 unsigned long begin, end, old;
82 size_t pre_header_padding, size;
83 unsigned int switch_new_start:1, switch_new_end:1, switch_old_start:1,
84 switch_old_end:1;
85};
86
a6352fd4 87__thread unsigned int lib_ring_buffer_nesting;
852c2936 88
45e9e699
MD
89/*
90 * TODO: this is unused. Errors are saved within the ring buffer.
91 * Eventually, allow consumerd to print these errors.
92 */
852c2936
MD
93static
94void lib_ring_buffer_print_errors(struct channel *chan,
4cfec15c 95 struct lttng_ust_lib_ring_buffer *buf, int cpu,
b68d3dc0
MD
96 struct lttng_ust_shm_handle *handle)
97 __attribute__((unused));
852c2936 98
852c2936
MD
99/**
100 * lib_ring_buffer_reset - Reset ring buffer to initial values.
101 * @buf: Ring buffer.
102 *
103 * Effectively empty the ring buffer. Should be called when the buffer is not
104 * used for writing. The ring buffer can be opened for reading, but the reader
105 * should not be using the iterator concurrently with reset. The previous
106 * current iterator record is reset.
107 */
4cfec15c 108void lib_ring_buffer_reset(struct lttng_ust_lib_ring_buffer *buf,
38fae1d3 109 struct lttng_ust_shm_handle *handle)
852c2936 110{
1d498196 111 struct channel *chan = shmp(handle, buf->backend.chan);
4cfec15c 112 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
113 unsigned int i;
114
115 /*
116 * Reset iterator first. It will put the subbuffer if it currently holds
117 * it.
118 */
852c2936
MD
119 v_set(config, &buf->offset, 0);
120 for (i = 0; i < chan->backend.num_subbuf; i++) {
4746ae29
MD
121 v_set(config, &shmp_index(handle, buf->commit_hot, i)->cc, 0);
122 v_set(config, &shmp_index(handle, buf->commit_hot, i)->seq, 0);
123 v_set(config, &shmp_index(handle, buf->commit_cold, i)->cc_sb, 0);
852c2936 124 }
a6352fd4
MD
125 uatomic_set(&buf->consumed, 0);
126 uatomic_set(&buf->record_disabled, 0);
852c2936 127 v_set(config, &buf->last_tsc, 0);
1d498196 128 lib_ring_buffer_backend_reset(&buf->backend, handle);
852c2936
MD
129 /* Don't reset number of active readers */
130 v_set(config, &buf->records_lost_full, 0);
131 v_set(config, &buf->records_lost_wrap, 0);
132 v_set(config, &buf->records_lost_big, 0);
133 v_set(config, &buf->records_count, 0);
134 v_set(config, &buf->records_overrun, 0);
135 buf->finalized = 0;
136}
852c2936
MD
137
138/**
139 * channel_reset - Reset channel to initial values.
140 * @chan: Channel.
141 *
142 * Effectively empty the channel. Should be called when the channel is not used
143 * for writing. The channel can be opened for reading, but the reader should not
144 * be using the iterator concurrently with reset. The previous current iterator
145 * record is reset.
146 */
147void channel_reset(struct channel *chan)
148{
149 /*
150 * Reset iterators first. Will put the subbuffer if held for reading.
151 */
a6352fd4 152 uatomic_set(&chan->record_disabled, 0);
852c2936
MD
153 /* Don't reset commit_count_mask, still valid */
154 channel_backend_reset(&chan->backend);
155 /* Don't reset switch/read timer interval */
156 /* Don't reset notifiers and notifier enable bits */
157 /* Don't reset reader reference count */
158}
852c2936
MD
159
160/*
161 * Must be called under cpu hotplug protection.
162 */
4cfec15c 163int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf,
a6352fd4 164 struct channel_backend *chanb, int cpu,
38fae1d3 165 struct lttng_ust_shm_handle *handle,
1d498196 166 struct shm_object *shmobj)
852c2936 167{
4cfec15c 168 const struct lttng_ust_lib_ring_buffer_config *config = &chanb->config;
14641deb 169 struct channel *chan = caa_container_of(chanb, struct channel, backend);
a3f61e7f 170 void *priv = channel_get_private(chan);
852c2936 171 size_t subbuf_header_size;
2fed87ae 172 uint64_t tsc;
852c2936
MD
173 int ret;
174
175 /* Test for cpu hotplug */
176 if (buf->backend.allocated)
177 return 0;
178
a6352fd4 179 ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend,
1d498196 180 cpu, handle, shmobj);
852c2936
MD
181 if (ret)
182 return ret;
183
1d498196
MD
184 align_shm(shmobj, __alignof__(struct commit_counters_hot));
185 set_shmp(buf->commit_hot,
186 zalloc_shm(shmobj,
187 sizeof(struct commit_counters_hot) * chan->backend.num_subbuf));
188 if (!shmp(handle, buf->commit_hot)) {
852c2936
MD
189 ret = -ENOMEM;
190 goto free_chanbuf;
191 }
192
1d498196
MD
193 align_shm(shmobj, __alignof__(struct commit_counters_cold));
194 set_shmp(buf->commit_cold,
195 zalloc_shm(shmobj,
196 sizeof(struct commit_counters_cold) * chan->backend.num_subbuf));
197 if (!shmp(handle, buf->commit_cold)) {
852c2936
MD
198 ret = -ENOMEM;
199 goto free_commit;
200 }
201
852c2936
MD
202 /*
 203 * Write the subbuffer header for the first subbuffer so we know the total
204 * duration of data gathering.
205 */
206 subbuf_header_size = config->cb.subbuffer_header_size();
207 v_set(config, &buf->offset, subbuf_header_size);
4746ae29 208 subbuffer_id_clear_noref(config, &shmp_index(handle, buf->backend.buf_wsb, 0)->id);
1d498196
MD
209 tsc = config->cb.ring_buffer_clock_read(shmp(handle, buf->backend.chan));
210 config->cb.buffer_begin(buf, tsc, 0, handle);
4746ae29 211 v_add(config, subbuf_header_size, &shmp_index(handle, buf->commit_hot, 0)->cc);
852c2936
MD
212
213 if (config->cb.buffer_create) {
1d498196 214 ret = config->cb.buffer_create(buf, priv, cpu, chanb->name, handle);
852c2936
MD
215 if (ret)
216 goto free_init;
217 }
852c2936 218 buf->backend.allocated = 1;
852c2936
MD
219 return 0;
220
221 /* Error handling */
222free_init:
a6352fd4 223 /* commit_cold will be freed by shm teardown */
852c2936 224free_commit:
a6352fd4 225 /* commit_hot will be freed by shm teardown */
852c2936 226free_chanbuf:
852c2936
MD
227 return ret;
228}
229
1d498196 230#if 0
852c2936
MD
231static void switch_buffer_timer(unsigned long data)
232{
4cfec15c 233 struct lttng_ust_lib_ring_buffer *buf = (struct lttng_ust_lib_ring_buffer *)data;
1d498196 234 struct channel *chan = shmp(handle, buf->backend.chan);
4cfec15c 235 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
236
237 /*
238 * Only flush buffers periodically if readers are active.
239 */
824f40b8 240 if (uatomic_read(&buf->active_readers) || uatomic_read(&buf->active_shadow_readers))
1d498196 241 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, handle);
852c2936 242
a6352fd4
MD
243 //TODO timers
244 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
245 // mod_timer_pinned(&buf->switch_timer,
246 // jiffies + chan->switch_timer_interval);
247 //else
248 // mod_timer(&buf->switch_timer,
249 // jiffies + chan->switch_timer_interval);
852c2936 250}
1d498196 251#endif //0
852c2936 252
4cfec15c 253static void lib_ring_buffer_start_switch_timer(struct lttng_ust_lib_ring_buffer *buf,
38fae1d3 254 struct lttng_ust_shm_handle *handle)
852c2936 255{
1d498196 256 struct channel *chan = shmp(handle, buf->backend.chan);
4cfec15c 257 //const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
258
259 if (!chan->switch_timer_interval || buf->switch_timer_enabled)
260 return;
a6352fd4
MD
261 //TODO
262 //init_timer(&buf->switch_timer);
263 //buf->switch_timer.function = switch_buffer_timer;
264 //buf->switch_timer.expires = jiffies + chan->switch_timer_interval;
265 //buf->switch_timer.data = (unsigned long)buf;
266 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
267 // add_timer_on(&buf->switch_timer, buf->backend.cpu);
268 //else
269 // add_timer(&buf->switch_timer);
852c2936
MD
270 buf->switch_timer_enabled = 1;
271}
272
4cfec15c 273static void lib_ring_buffer_stop_switch_timer(struct lttng_ust_lib_ring_buffer *buf,
38fae1d3 274 struct lttng_ust_shm_handle *handle)
852c2936 275{
1d498196 276 struct channel *chan = shmp(handle, buf->backend.chan);
852c2936
MD
277
278 if (!chan->switch_timer_interval || !buf->switch_timer_enabled)
279 return;
280
a6352fd4
MD
281 //TODO
282 //del_timer_sync(&buf->switch_timer);
852c2936
MD
283 buf->switch_timer_enabled = 0;
284}
285
1d498196 286#if 0
852c2936
MD
287/*
288 * Polling timer to check the channels for data.
289 */
290static void read_buffer_timer(unsigned long data)
291{
4cfec15c 292 struct lttng_ust_lib_ring_buffer *buf = (struct lttng_ust_lib_ring_buffer *)data;
1d498196 293 struct channel *chan = shmp(handle, buf->backend.chan);
4cfec15c 294 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
295
296 CHAN_WARN_ON(chan, !buf->backend.allocated);
297
824f40b8 298 if ((uatomic_read(&buf->active_readers) || uatomic_read(&buf->active_shadow_readers))
852c2936 299 && lib_ring_buffer_poll_deliver(config, buf, chan)) {
a6352fd4
MD
300 //TODO
301 //wake_up_interruptible(&buf->read_wait);
302 //wake_up_interruptible(&chan->read_wait);
852c2936
MD
303 }
304
a6352fd4
MD
305 //TODO
306 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
307 // mod_timer_pinned(&buf->read_timer,
308 // jiffies + chan->read_timer_interval);
309 //else
310 // mod_timer(&buf->read_timer,
311 // jiffies + chan->read_timer_interval);
852c2936 312}
1d498196 313#endif //0
852c2936 314
4cfec15c 315static void lib_ring_buffer_start_read_timer(struct lttng_ust_lib_ring_buffer *buf,
38fae1d3 316 struct lttng_ust_shm_handle *handle)
852c2936 317{
1d498196 318 struct channel *chan = shmp(handle, buf->backend.chan);
4cfec15c 319 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
320
321 if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
322 || !chan->read_timer_interval
323 || buf->read_timer_enabled)
324 return;
325
a6352fd4
MD
326 //TODO
327 //init_timer(&buf->read_timer);
328 //buf->read_timer.function = read_buffer_timer;
329 //buf->read_timer.expires = jiffies + chan->read_timer_interval;
330 //buf->read_timer.data = (unsigned long)buf;
852c2936 331
a6352fd4
MD
332 //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
333 // add_timer_on(&buf->read_timer, buf->backend.cpu);
334 //else
335 // add_timer(&buf->read_timer);
852c2936
MD
336 buf->read_timer_enabled = 1;
337}
338
4cfec15c 339static void lib_ring_buffer_stop_read_timer(struct lttng_ust_lib_ring_buffer *buf,
38fae1d3 340 struct lttng_ust_shm_handle *handle)
852c2936 341{
1d498196 342 struct channel *chan = shmp(handle, buf->backend.chan);
4cfec15c 343 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
344
345 if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
346 || !chan->read_timer_interval
347 || !buf->read_timer_enabled)
348 return;
349
a6352fd4
MD
350 //TODO
351 //del_timer_sync(&buf->read_timer);
852c2936
MD
352 /*
353 * do one more check to catch data that has been written in the last
354 * timer period.
355 */
1d498196 356 if (lib_ring_buffer_poll_deliver(config, buf, chan, handle)) {
a6352fd4
MD
357 //TODO
358 //wake_up_interruptible(&buf->read_wait);
359 //wake_up_interruptible(&chan->read_wait);
852c2936
MD
360 }
361 buf->read_timer_enabled = 0;
362}
363
1d498196 364static void channel_unregister_notifiers(struct channel *chan,
38fae1d3 365 struct lttng_ust_shm_handle *handle)
852c2936 366{
4cfec15c 367 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
368 int cpu;
369
852c2936 370 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
852c2936 371 for_each_possible_cpu(cpu) {
4cfec15c 372 struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
a6352fd4 373
1d498196
MD
374 lib_ring_buffer_stop_switch_timer(buf, handle);
375 lib_ring_buffer_stop_read_timer(buf, handle);
852c2936 376 }
852c2936 377 } else {
4cfec15c 378 struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
852c2936 379
1d498196
MD
380 lib_ring_buffer_stop_switch_timer(buf, handle);
381 lib_ring_buffer_stop_read_timer(buf, handle);
852c2936 382 }
8d8a24c8 383 //channel_backend_unregister_notifiers(&chan->backend);
852c2936
MD
384}
385
38fae1d3 386static void channel_free(struct channel *chan, struct lttng_ust_shm_handle *handle,
824f40b8 387 int shadow)
852c2936 388{
824f40b8
MD
389 if (!shadow)
390 channel_backend_free(&chan->backend, handle);
431d5cf0 391 /* chan is freed by shm teardown */
1d498196
MD
392 shm_object_table_destroy(handle->table);
393 free(handle);
852c2936
MD
394}
395
396/**
397 * channel_create - Create channel.
398 * @config: ring buffer instance configuration
399 * @name: name of the channel
a3f61e7f
MD
400 * @priv_data: ring buffer client private data area pointer (output)
401 * @priv_data_size: length, in bytes, of the private data area.
d028eddb 402 * @priv_data_init: initialization data for private data.
852c2936
MD
 403 * @buf_addr: pointer to the beginning of the preallocated buffer contiguous
404 * address mapping. It is used only by RING_BUFFER_STATIC
405 * configuration. It can be set to NULL for other backends.
406 * @subbuf_size: subbuffer size
407 * @num_subbuf: number of subbuffers
408 * @switch_timer_interval: Time interval (in us) to fill sub-buffers with
409 * padding to let readers get those sub-buffers.
410 * Used for live streaming.
411 * @read_timer_interval: Time interval (in us) to wake up pending readers.
412 *
413 * Holds cpu hotplug.
414 * Returns NULL on failure.
415 */
4cfec15c 416struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buffer_config *config,
a3f61e7f
MD
417 const char *name,
418 void **priv_data,
419 size_t priv_data_align,
420 size_t priv_data_size,
d028eddb 421 void *priv_data_init,
a3f61e7f 422 void *buf_addr, size_t subbuf_size,
852c2936 423 size_t num_subbuf, unsigned int switch_timer_interval,
193183fb 424 unsigned int read_timer_interval,
ef9ff354 425 int **shm_fd, int **wait_fd, uint64_t **memory_map_size)
852c2936 426{
1d498196 427 int ret, cpu;
a3f61e7f 428 size_t shmsize, chansize;
852c2936 429 struct channel *chan;
38fae1d3 430 struct lttng_ust_shm_handle *handle;
1d498196 431 struct shm_object *shmobj;
193183fb 432 struct shm_ref *ref;
852c2936
MD
433
434 if (lib_ring_buffer_check_config(config, switch_timer_interval,
435 read_timer_interval))
436 return NULL;
437
38fae1d3 438 handle = zmalloc(sizeof(struct lttng_ust_shm_handle));
431d5cf0
MD
439 if (!handle)
440 return NULL;
441
1d498196
MD
442 /* Allocate table for channel + per-cpu buffers */
443 handle->table = shm_object_table_create(1 + num_possible_cpus());
444 if (!handle->table)
445 goto error_table_alloc;
852c2936 446
1d498196
MD
447 /* Calculate the shm allocation layout */
448 shmsize = sizeof(struct channel);
c1fca457 449 shmsize += offset_align(shmsize, __alignof__(struct lttng_ust_lib_ring_buffer_shmp));
1d498196 450 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
4cfec15c 451 shmsize += sizeof(struct lttng_ust_lib_ring_buffer_shmp) * num_possible_cpus();
1d498196 452 else
4cfec15c 453 shmsize += sizeof(struct lttng_ust_lib_ring_buffer_shmp);
a3f61e7f
MD
454 chansize = shmsize;
455 shmsize += offset_align(shmsize, priv_data_align);
456 shmsize += priv_data_size;
a6352fd4 457
1d498196 458 shmobj = shm_object_table_append(handle->table, shmsize);
b5a14697
MD
459 if (!shmobj)
460 goto error_append;
57773204 461 /* struct channel is at object 0, offset 0 (hardcoded) */
a3f61e7f 462 set_shmp(handle->chan, zalloc_shm(shmobj, chansize));
57773204
MD
463 assert(handle->chan._ref.index == 0);
464 assert(handle->chan._ref.offset == 0);
1d498196 465 chan = shmp(handle, handle->chan);
a6352fd4 466 if (!chan)
1d498196 467 goto error_append;
a6352fd4 468
a3f61e7f
MD
469 /* space for private data */
470 if (priv_data_size) {
471 DECLARE_SHMP(void, priv_data_alloc);
472
473 align_shm(shmobj, priv_data_align);
474 chan->priv_data_offset = shmobj->allocated_len;
475 set_shmp(priv_data_alloc, zalloc_shm(shmobj, priv_data_size));
476 if (!shmp(handle, priv_data_alloc))
477 goto error_append;
478 *priv_data = channel_get_private(chan);
d028eddb 479 memcpy(*priv_data, priv_data_init, priv_data_size);
a3f61e7f
MD
480 } else {
481 chan->priv_data_offset = -1;
482 *priv_data = NULL;
483 }
484
485 ret = channel_backend_init(&chan->backend, name, config,
1d498196 486 subbuf_size, num_subbuf, handle);
852c2936 487 if (ret)
1d498196 488 goto error_backend_init;
852c2936
MD
489
490 chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order);
a6352fd4
MD
491 //TODO
492 //chan->switch_timer_interval = usecs_to_jiffies(switch_timer_interval);
493 //chan->read_timer_interval = usecs_to_jiffies(read_timer_interval);
a6352fd4
MD
494 //TODO
495 //init_waitqueue_head(&chan->read_wait);
496 //init_waitqueue_head(&chan->hp_wait);
852c2936
MD
497
498 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
852c2936
MD
499 /*
500 * In case of non-hotplug cpu, if the ring-buffer is allocated
501 * in early initcall, it will not be notified of secondary cpus.
 502 * In that case, we need to allocate for all possible cpus.
503 */
852c2936 504 for_each_possible_cpu(cpu) {
4cfec15c 505 struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
1d498196
MD
506 lib_ring_buffer_start_switch_timer(buf, handle);
507 lib_ring_buffer_start_read_timer(buf, handle);
852c2936 508 }
852c2936 509 } else {
4cfec15c 510 struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
852c2936 511
1d498196
MD
512 lib_ring_buffer_start_switch_timer(buf, handle);
513 lib_ring_buffer_start_read_timer(buf, handle);
852c2936 514 }
193183fb
MD
515 ref = &handle->chan._ref;
516 shm_get_object_data(handle, ref, shm_fd, wait_fd, memory_map_size);
431d5cf0 517 return handle;
852c2936 518
1d498196
MD
519error_backend_init:
520error_append:
521 shm_object_table_destroy(handle->table);
522error_table_alloc:
431d5cf0 523 free(handle);
852c2936
MD
524 return NULL;
525}
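/*
 * Example call (sketch only; "client_config", "my_priv", "my_priv_init"
 * and the sizes are placeholders, not values used by lttng-ust):
 *
 *	struct lttng_ust_shm_handle *handle;
 *	void *priv;
 *	int *shm_fd, *wait_fd;
 *	uint64_t *memory_map_size;
 *
 *	handle = channel_create(&client_config, "my_chan", &priv,
 *			__alignof__(struct my_priv), sizeof(struct my_priv),
 *			&my_priv_init, NULL, 4096, 4, 0, 0,
 *			&shm_fd, &wait_fd, &memory_map_size);
 */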
852c2936 526
38fae1d3 527struct lttng_ust_shm_handle *channel_handle_create(int shm_fd, int wait_fd,
193183fb
MD
528 uint64_t memory_map_size)
529{
38fae1d3 530 struct lttng_ust_shm_handle *handle;
193183fb
MD
531 struct shm_object *object;
532
38fae1d3 533 handle = zmalloc(sizeof(struct lttng_ust_shm_handle));
193183fb
MD
534 if (!handle)
535 return NULL;
536
537 /* Allocate table for channel + per-cpu buffers */
538 handle->table = shm_object_table_create(1 + num_possible_cpus());
539 if (!handle->table)
540 goto error_table_alloc;
541 /* Add channel object */
542 object = shm_object_table_append_shadow(handle->table,
543 shm_fd, wait_fd, memory_map_size);
544 if (!object)
545 goto error_table_object;
57773204
MD
546 /* struct channel is at object 0, offset 0 (hardcoded) */
547 handle->chan._ref.index = 0;
548 handle->chan._ref.offset = 0;
193183fb
MD
549 return handle;
550
551error_table_object:
552 shm_object_table_destroy(handle->table);
553error_table_alloc:
554 free(handle);
555 return NULL;
556}
557
38fae1d3 558int channel_handle_add_stream(struct lttng_ust_shm_handle *handle,
193183fb
MD
559 int shm_fd, int wait_fd, uint64_t memory_map_size)
560{
561 struct shm_object *object;
562
563 /* Add stream object */
564 object = shm_object_table_append_shadow(handle->table,
565 shm_fd, wait_fd, memory_map_size);
566 if (!object)
567 return -1;
568 return 0;
569}
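/*
 * Consumer-side mapping sketch (the fd and size variables are assumed to
 * have been received over the UNIX socket; they are placeholders of the
 * example):
 *
 *	struct lttng_ust_shm_handle *handle;
 *	int i;
 *
 *	handle = channel_handle_create(chan_shm_fd, chan_wait_fd, chan_map_size);
 *	for (i = 0; i < nr_streams; i++)
 *		channel_handle_add_stream(handle, stream_shm_fd[i],
 *				stream_wait_fd[i], stream_map_size[i]);
 */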
570
852c2936 571static
38fae1d3 572void channel_release(struct channel *chan, struct lttng_ust_shm_handle *handle,
824f40b8 573 int shadow)
852c2936 574{
824f40b8 575 channel_free(chan, handle, shadow);
852c2936
MD
576}
577
578/**
579 * channel_destroy - Finalize, wait for q.s. and destroy channel.
580 * @chan: channel to destroy
581 *
582 * Holds cpu hotplug.
431d5cf0
MD
583 * Call "destroy" callback, finalize channels, decrement the channel
584 * reference count. Note that when readers have completed data
585 * consumption of finalized channels, get_subbuf() will return -ENODATA.
a3f61e7f 586 * They should release their handle at that point.
852c2936 587 */
a3f61e7f 588void channel_destroy(struct channel *chan, struct lttng_ust_shm_handle *handle,
824f40b8 589 int shadow)
852c2936 590{
824f40b8
MD
591 if (shadow) {
592 channel_release(chan, handle, shadow);
a3f61e7f 593 return;
824f40b8
MD
594 }
595
1d498196 596 channel_unregister_notifiers(chan, handle);
852c2936 597
45e9e699
MD
598 /*
599 * Note: the consumer takes care of finalizing and switching the
600 * buffers.
601 */
852c2936 602
431d5cf0
MD
603 /*
604 * sessiond/consumer are keeping a reference on the shm file
605 * descriptor directly. No need to refcount.
606 */
824f40b8 607 channel_release(chan, handle, shadow);
a3f61e7f 608 return;
852c2936 609}
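/*
 * Usage note (sketch): the channel owner tears a channel down with
 * channel_destroy(chan, handle, 0); a process holding only a shadow
 * handle (see channel_handle_create() above) passes shadow = 1, which
 * releases the shadow mapping without freeing the channel backend.
 */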
852c2936 610
4cfec15c
MD
611struct lttng_ust_lib_ring_buffer *channel_get_ring_buffer(
612 const struct lttng_ust_lib_ring_buffer_config *config,
1d498196 613 struct channel *chan, int cpu,
38fae1d3 614 struct lttng_ust_shm_handle *handle,
ef9ff354
MD
615 int **shm_fd, int **wait_fd,
616 uint64_t **memory_map_size)
852c2936 617{
381c0f1e
MD
618 struct shm_ref *ref;
619
620 if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
621 ref = &chan->backend.buf[0].shmp._ref;
622 shm_get_object_data(handle, ref, shm_fd, wait_fd,
623 memory_map_size);
1d498196 624 return shmp(handle, chan->backend.buf[0].shmp);
381c0f1e 625 } else {
e095d803
MD
626 if (cpu >= num_possible_cpus())
627 return NULL;
381c0f1e
MD
628 ref = &chan->backend.buf[cpu].shmp._ref;
629 shm_get_object_data(handle, ref, shm_fd, wait_fd,
630 memory_map_size);
1d498196 631 return shmp(handle, chan->backend.buf[cpu].shmp);
381c0f1e 632 }
852c2936 633}
852c2936 634
4cfec15c 635int lib_ring_buffer_open_read(struct lttng_ust_lib_ring_buffer *buf,
38fae1d3 636 struct lttng_ust_shm_handle *handle,
824f40b8 637 int shadow)
852c2936 638{
824f40b8
MD
639 if (shadow) {
640 if (uatomic_cmpxchg(&buf->active_shadow_readers, 0, 1) != 0)
641 return -EBUSY;
642 cmm_smp_mb();
643 return 0;
644 }
a6352fd4 645 if (uatomic_cmpxchg(&buf->active_readers, 0, 1) != 0)
852c2936 646 return -EBUSY;
a6352fd4 647 cmm_smp_mb();
852c2936
MD
648 return 0;
649}
852c2936 650
4cfec15c 651void lib_ring_buffer_release_read(struct lttng_ust_lib_ring_buffer *buf,
38fae1d3 652 struct lttng_ust_shm_handle *handle,
824f40b8 653 int shadow)
852c2936 654{
1d498196 655 struct channel *chan = shmp(handle, buf->backend.chan);
852c2936 656
824f40b8
MD
657 if (shadow) {
658 CHAN_WARN_ON(chan, uatomic_read(&buf->active_shadow_readers) != 1);
659 cmm_smp_mb();
660 uatomic_dec(&buf->active_shadow_readers);
661 return;
662 }
a6352fd4
MD
663 CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
664 cmm_smp_mb();
665 uatomic_dec(&buf->active_readers);
852c2936
MD
666}
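/*
 * Typical read-side bracketing (sketch; shadow = 0 shown, a shadow reader
 * would pass shadow = 1):
 *
 *	if (lib_ring_buffer_open_read(buf, handle, 0))
 *		return;		// -EBUSY: another reader is active
 *	// ... snapshot / get_subbuf / put_subbuf ...
 *	lib_ring_buffer_release_read(buf, handle, 0);
 */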
667
668/**
669 * lib_ring_buffer_snapshot - save subbuffer position snapshot (for read)
670 * @buf: ring buffer
671 * @consumed: consumed count indicating the position where to read
 672 * @produced: produced count, indicates the position where to stop reading
673 *
674 * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no
675 * data to read at consumed position, or 0 if the get operation succeeds.
852c2936
MD
676 */
677
4cfec15c 678int lib_ring_buffer_snapshot(struct lttng_ust_lib_ring_buffer *buf,
1d498196 679 unsigned long *consumed, unsigned long *produced,
38fae1d3 680 struct lttng_ust_shm_handle *handle)
852c2936 681{
1d498196 682 struct channel *chan = shmp(handle, buf->backend.chan);
4cfec15c 683 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
684 unsigned long consumed_cur, write_offset;
685 int finalized;
686
14641deb 687 finalized = CMM_ACCESS_ONCE(buf->finalized);
852c2936
MD
688 /*
689 * Read finalized before counters.
690 */
a6352fd4
MD
691 cmm_smp_rmb();
692 consumed_cur = uatomic_read(&buf->consumed);
852c2936
MD
693 /*
694 * No need to issue a memory barrier between consumed count read and
695 * write offset read, because consumed count can only change
696 * concurrently in overwrite mode, and we keep a sequence counter
697 * identifier derived from the write offset to check we are getting
698 * the same sub-buffer we are expecting (the sub-buffers are atomically
699 * "tagged" upon writes, tags are checked upon read).
700 */
701 write_offset = v_read(config, &buf->offset);
702
703 /*
704 * Check that we are not about to read the same subbuffer in
705 * which the writer head is.
706 */
707 if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan)
708 == 0)
709 goto nodata;
710
711 *consumed = consumed_cur;
712 *produced = subbuf_trunc(write_offset, chan);
713
714 return 0;
715
716nodata:
717 /*
718 * The memory barriers __wait_event()/wake_up_interruptible() take care
719 * of "raw_spin_is_locked" memory ordering.
720 */
721 if (finalized)
722 return -ENODATA;
852c2936
MD
723 else
724 return -EAGAIN;
725}
852c2936
MD
726
727/**
 728 * lib_ring_buffer_move_consumer - move consumed counter forward
 729 * @buf: ring buffer
 730 * @consumed_new: new consumed count value
 731 */
731 */
4cfec15c 732void lib_ring_buffer_move_consumer(struct lttng_ust_lib_ring_buffer *buf,
1d498196 733 unsigned long consumed_new,
38fae1d3 734 struct lttng_ust_shm_handle *handle)
852c2936 735{
4cfec15c 736 struct lttng_ust_lib_ring_buffer_backend *bufb = &buf->backend;
1d498196 737 struct channel *chan = shmp(handle, bufb->chan);
852c2936
MD
738 unsigned long consumed;
739
824f40b8
MD
740 CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1
741 && uatomic_read(&buf->active_shadow_readers) != 1);
852c2936
MD
742
743 /*
744 * Only push the consumed value forward.
745 * If the consumed cmpxchg fails, this is because we have been pushed by
746 * the writer in flight recorder mode.
747 */
a6352fd4 748 consumed = uatomic_read(&buf->consumed);
852c2936 749 while ((long) consumed - (long) consumed_new < 0)
a6352fd4
MD
750 consumed = uatomic_cmpxchg(&buf->consumed, consumed,
751 consumed_new);
852c2936 752}
852c2936
MD
753
754/**
755 * lib_ring_buffer_get_subbuf - get exclusive access to subbuffer for reading
756 * @buf: ring buffer
757 * @consumed: consumed count indicating the position where to read
758 *
759 * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no
760 * data to read at consumed position, or 0 if the get operation succeeds.
852c2936 761 */
4cfec15c 762int lib_ring_buffer_get_subbuf(struct lttng_ust_lib_ring_buffer *buf,
1d498196 763 unsigned long consumed,
38fae1d3 764 struct lttng_ust_shm_handle *handle)
852c2936 765{
1d498196 766 struct channel *chan = shmp(handle, buf->backend.chan);
4cfec15c 767 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
768 unsigned long consumed_cur, consumed_idx, commit_count, write_offset;
769 int ret;
770 int finalized;
771
772retry:
14641deb 773 finalized = CMM_ACCESS_ONCE(buf->finalized);
852c2936
MD
774 /*
775 * Read finalized before counters.
776 */
a6352fd4
MD
777 cmm_smp_rmb();
778 consumed_cur = uatomic_read(&buf->consumed);
852c2936 779 consumed_idx = subbuf_index(consumed, chan);
4746ae29 780 commit_count = v_read(config, &shmp_index(handle, buf->commit_cold, consumed_idx)->cc_sb);
852c2936
MD
781 /*
782 * Make sure we read the commit count before reading the buffer
783 * data and the write offset. Correct consumed offset ordering
 784 * wrt commit count is ensured by the use of cmpxchg to update
785 * the consumed offset.
852c2936 786 */
a6352fd4
MD
787 /*
788 * Local rmb to match the remote wmb to read the commit count
789 * before the buffer data and the write offset.
790 */
791 cmm_smp_rmb();
852c2936
MD
792
793 write_offset = v_read(config, &buf->offset);
794
795 /*
796 * Check that the buffer we are getting is after or at consumed_cur
797 * position.
798 */
799 if ((long) subbuf_trunc(consumed, chan)
800 - (long) subbuf_trunc(consumed_cur, chan) < 0)
801 goto nodata;
802
803 /*
804 * Check that the subbuffer we are trying to consume has been
805 * already fully committed.
806 */
807 if (((commit_count - chan->backend.subbuf_size)
808 & chan->commit_count_mask)
809 - (buf_trunc(consumed_cur, chan)
810 >> chan->backend.num_subbuf_order)
811 != 0)
812 goto nodata;
813
814 /*
815 * Check that we are not about to read the same subbuffer in
816 * which the writer head is.
817 */
818 if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan)
819 == 0)
820 goto nodata;
821
822 /*
823 * Failure to get the subbuffer causes a busy-loop retry without going
824 * to a wait queue. These are caused by short-lived race windows where
825 * the writer is getting access to a subbuffer we were trying to get
826 * access to. Also checks that the "consumed" buffer count we are
827 * looking for matches the one contained in the subbuffer id.
828 */
829 ret = update_read_sb_index(config, &buf->backend, &chan->backend,
1d498196
MD
830 consumed_idx, buf_trunc_val(consumed, chan),
831 handle);
852c2936
MD
832 if (ret)
833 goto retry;
834 subbuffer_id_clear_noref(config, &buf->backend.buf_rsb.id);
835
836 buf->get_subbuf_consumed = consumed;
837 buf->get_subbuf = 1;
838
839 return 0;
840
841nodata:
842 /*
843 * The memory barriers __wait_event()/wake_up_interruptible() take care
844 * of "raw_spin_is_locked" memory ordering.
845 */
846 if (finalized)
847 return -ENODATA;
852c2936
MD
848 else
849 return -EAGAIN;
850}
852c2936
MD
851
852/**
853 * lib_ring_buffer_put_subbuf - release exclusive subbuffer access
854 * @buf: ring buffer
855 */
4cfec15c 856void lib_ring_buffer_put_subbuf(struct lttng_ust_lib_ring_buffer *buf,
38fae1d3 857 struct lttng_ust_shm_handle *handle)
852c2936 858{
4cfec15c 859 struct lttng_ust_lib_ring_buffer_backend *bufb = &buf->backend;
1d498196 860 struct channel *chan = shmp(handle, bufb->chan);
4cfec15c 861 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
862 unsigned long read_sb_bindex, consumed_idx, consumed;
863
824f40b8
MD
864 CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1
865 && uatomic_read(&buf->active_shadow_readers) != 1);
852c2936
MD
866
867 if (!buf->get_subbuf) {
868 /*
869 * Reader puts a subbuffer it did not get.
870 */
871 CHAN_WARN_ON(chan, 1);
872 return;
873 }
874 consumed = buf->get_subbuf_consumed;
875 buf->get_subbuf = 0;
876
877 /*
878 * Clear the records_unread counter. (overruns counter)
879 * Can still be non-zero if a file reader simply grabbed the data
880 * without using iterators.
881 * Can be below zero if an iterator is used on a snapshot more than
882 * once.
883 */
884 read_sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
885 v_add(config, v_read(config,
4746ae29 886 &shmp(handle, shmp_index(handle, bufb->array, read_sb_bindex)->shmp)->records_unread),
852c2936 887 &bufb->records_read);
4746ae29 888 v_set(config, &shmp(handle, shmp_index(handle, bufb->array, read_sb_bindex)->shmp)->records_unread, 0);
852c2936
MD
889 CHAN_WARN_ON(chan, config->mode == RING_BUFFER_OVERWRITE
890 && subbuffer_id_is_noref(config, bufb->buf_rsb.id));
891 subbuffer_id_set_noref(config, &bufb->buf_rsb.id);
892
893 /*
894 * Exchange the reader subbuffer with the one we put in its place in the
895 * writer subbuffer table. Expect the original consumed count. If
896 * update_read_sb_index fails, this is because the writer updated the
897 * subbuffer concurrently. We should therefore keep the subbuffer we
898 * currently have: it has become invalid to try reading this sub-buffer
899 * consumed count value anyway.
900 */
901 consumed_idx = subbuf_index(consumed, chan);
902 update_read_sb_index(config, &buf->backend, &chan->backend,
1d498196
MD
903 consumed_idx, buf_trunc_val(consumed, chan),
904 handle);
852c2936
MD
905 /*
906 * update_read_sb_index return value ignored. Don't exchange sub-buffer
907 * if the writer concurrently updated it.
908 */
909}
852c2936
MD
910
911/*
912 * cons_offset is an iterator on all subbuffer offsets between the reader
913 * position and the writer position. (inclusive)
914 */
915static
4cfec15c 916void lib_ring_buffer_print_subbuffer_errors(struct lttng_ust_lib_ring_buffer *buf,
852c2936
MD
917 struct channel *chan,
918 unsigned long cons_offset,
1d498196 919 int cpu,
38fae1d3 920 struct lttng_ust_shm_handle *handle)
852c2936 921{
4cfec15c 922 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
923 unsigned long cons_idx, commit_count, commit_count_sb;
924
925 cons_idx = subbuf_index(cons_offset, chan);
4746ae29
MD
926 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, cons_idx)->cc);
927 commit_count_sb = v_read(config, &shmp_index(handle, buf->commit_cold, cons_idx)->cc_sb);
852c2936
MD
928
929 if (subbuf_offset(commit_count, chan) != 0)
4d3c9523 930 DBG("ring buffer %s, cpu %d: "
852c2936
MD
931 "commit count in subbuffer %lu,\n"
932 "expecting multiples of %lu bytes\n"
933 " [ %lu bytes committed, %lu bytes reader-visible ]\n",
934 chan->backend.name, cpu, cons_idx,
935 chan->backend.subbuf_size,
936 commit_count, commit_count_sb);
937
4d3c9523 938 DBG("ring buffer: %s, cpu %d: %lu bytes committed\n",
852c2936
MD
939 chan->backend.name, cpu, commit_count);
940}
941
942static
4cfec15c 943void lib_ring_buffer_print_buffer_errors(struct lttng_ust_lib_ring_buffer *buf,
852c2936 944 struct channel *chan,
1d498196 945 void *priv, int cpu,
38fae1d3 946 struct lttng_ust_shm_handle *handle)
852c2936 947{
4cfec15c 948 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
949 unsigned long write_offset, cons_offset;
950
852c2936
MD
951 /*
952 * No need to order commit_count, write_offset and cons_offset reads
953 * because we execute at teardown when no more writer nor reader
954 * references are left.
955 */
956 write_offset = v_read(config, &buf->offset);
a6352fd4 957 cons_offset = uatomic_read(&buf->consumed);
852c2936 958 if (write_offset != cons_offset)
4d3c9523 959 DBG("ring buffer %s, cpu %d: "
852c2936
MD
960 "non-consumed data\n"
961 " [ %lu bytes written, %lu bytes read ]\n",
962 chan->backend.name, cpu, write_offset, cons_offset);
963
a6352fd4 964 for (cons_offset = uatomic_read(&buf->consumed);
852c2936
MD
965 (long) (subbuf_trunc((unsigned long) v_read(config, &buf->offset),
966 chan)
967 - cons_offset) > 0;
968 cons_offset = subbuf_align(cons_offset, chan))
969 lib_ring_buffer_print_subbuffer_errors(buf, chan, cons_offset,
1d498196 970 cpu, handle);
852c2936
MD
971}
972
973static
974void lib_ring_buffer_print_errors(struct channel *chan,
4cfec15c 975 struct lttng_ust_lib_ring_buffer *buf, int cpu,
38fae1d3 976 struct lttng_ust_shm_handle *handle)
852c2936 977{
4cfec15c 978 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
a3f61e7f 979 void *priv = channel_get_private(chan);
852c2936 980
4d3c9523 981 DBG("ring buffer %s, cpu %d: %lu records written, "
852c2936
MD
982 "%lu records overrun\n",
983 chan->backend.name, cpu,
984 v_read(config, &buf->records_count),
985 v_read(config, &buf->records_overrun));
986
987 if (v_read(config, &buf->records_lost_full)
988 || v_read(config, &buf->records_lost_wrap)
989 || v_read(config, &buf->records_lost_big))
4d3c9523 990 DBG("ring buffer %s, cpu %d: records were lost. Caused by:\n"
852c2936
MD
991 " [ %lu buffer full, %lu nest buffer wrap-around, "
992 "%lu event too big ]\n",
993 chan->backend.name, cpu,
994 v_read(config, &buf->records_lost_full),
995 v_read(config, &buf->records_lost_wrap),
996 v_read(config, &buf->records_lost_big));
997
1d498196 998 lib_ring_buffer_print_buffer_errors(buf, chan, priv, cpu, handle);
852c2936
MD
999}
1000
1001/*
1002 * lib_ring_buffer_switch_old_start: Populate old subbuffer header.
1003 *
1004 * Only executed when the buffer is finalized, in SWITCH_FLUSH.
1005 */
1006static
4cfec15c 1007void lib_ring_buffer_switch_old_start(struct lttng_ust_lib_ring_buffer *buf,
852c2936
MD
1008 struct channel *chan,
1009 struct switch_offsets *offsets,
2fed87ae 1010 uint64_t tsc,
38fae1d3 1011 struct lttng_ust_shm_handle *handle)
852c2936 1012{
4cfec15c 1013 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
1014 unsigned long oldidx = subbuf_index(offsets->old, chan);
1015 unsigned long commit_count;
1016
1d498196 1017 config->cb.buffer_begin(buf, tsc, oldidx, handle);
852c2936
MD
1018
1019 /*
1020 * Order all writes to buffer before the commit count update that will
1021 * determine that the subbuffer is full.
1022 */
a6352fd4 1023 cmm_smp_wmb();
852c2936 1024 v_add(config, config->cb.subbuffer_header_size(),
4746ae29
MD
1025 &shmp_index(handle, buf->commit_hot, oldidx)->cc);
1026 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, oldidx)->cc);
852c2936
MD
1027 /* Check if the written buffer has to be delivered */
1028 lib_ring_buffer_check_deliver(config, buf, chan, offsets->old,
1d498196 1029 commit_count, oldidx, handle);
852c2936
MD
1030 lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
1031 offsets->old, commit_count,
1d498196
MD
1032 config->cb.subbuffer_header_size(),
1033 handle);
852c2936
MD
1034}
1035
1036/*
1037 * lib_ring_buffer_switch_old_end: switch old subbuffer
1038 *
1039 * Note : offset_old should never be 0 here. It is ok, because we never perform
1040 * buffer switch on an empty subbuffer in SWITCH_ACTIVE mode. The caller
1041 * increments the offset_old value when doing a SWITCH_FLUSH on an empty
1042 * subbuffer.
1043 */
1044static
4cfec15c 1045void lib_ring_buffer_switch_old_end(struct lttng_ust_lib_ring_buffer *buf,
852c2936
MD
1046 struct channel *chan,
1047 struct switch_offsets *offsets,
2fed87ae 1048 uint64_t tsc,
38fae1d3 1049 struct lttng_ust_shm_handle *handle)
852c2936 1050{
4cfec15c 1051 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
1052 unsigned long oldidx = subbuf_index(offsets->old - 1, chan);
1053 unsigned long commit_count, padding_size, data_size;
1054
1055 data_size = subbuf_offset(offsets->old - 1, chan) + 1;
1056 padding_size = chan->backend.subbuf_size - data_size;
1d498196
MD
1057 subbuffer_set_data_size(config, &buf->backend, oldidx, data_size,
1058 handle);
852c2936
MD
1059
1060 /*
1061 * Order all writes to buffer before the commit count update that will
1062 * determine that the subbuffer is full.
1063 */
a6352fd4 1064 cmm_smp_wmb();
4746ae29
MD
1065 v_add(config, padding_size, &shmp_index(handle, buf->commit_hot, oldidx)->cc);
1066 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, oldidx)->cc);
852c2936 1067 lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1,
1d498196 1068 commit_count, oldidx, handle);
852c2936
MD
1069 lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
1070 offsets->old, commit_count,
1d498196 1071 padding_size, handle);
852c2936
MD
1072}
1073
1074/*
1075 * lib_ring_buffer_switch_new_start: Populate new subbuffer.
1076 *
1077 * This code can be executed unordered : writers may already have written to the
1078 * sub-buffer before this code gets executed, caution. The commit makes sure
1079 * that this code is executed before the deliver of this sub-buffer.
1080 */
1081static
4cfec15c 1082void lib_ring_buffer_switch_new_start(struct lttng_ust_lib_ring_buffer *buf,
852c2936
MD
1083 struct channel *chan,
1084 struct switch_offsets *offsets,
2fed87ae 1085 uint64_t tsc,
38fae1d3 1086 struct lttng_ust_shm_handle *handle)
852c2936 1087{
4cfec15c 1088 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
1089 unsigned long beginidx = subbuf_index(offsets->begin, chan);
1090 unsigned long commit_count;
1091
1d498196 1092 config->cb.buffer_begin(buf, tsc, beginidx, handle);
852c2936
MD
1093
1094 /*
1095 * Order all writes to buffer before the commit count update that will
1096 * determine that the subbuffer is full.
1097 */
a6352fd4 1098 cmm_smp_wmb();
852c2936 1099 v_add(config, config->cb.subbuffer_header_size(),
4746ae29
MD
1100 &shmp_index(handle, buf->commit_hot, beginidx)->cc);
1101 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, beginidx)->cc);
852c2936
MD
1102 /* Check if the written buffer has to be delivered */
1103 lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin,
1d498196 1104 commit_count, beginidx, handle);
852c2936
MD
1105 lib_ring_buffer_write_commit_counter(config, buf, chan, beginidx,
1106 offsets->begin, commit_count,
1d498196
MD
1107 config->cb.subbuffer_header_size(),
1108 handle);
852c2936
MD
1109}
1110
1111/*
1112 * lib_ring_buffer_switch_new_end: finish switching current subbuffer
1113 *
1114 * The only remaining threads could be the ones with pending commits. They will
1115 * have to do the deliver themselves.
1116 */
1117static
4cfec15c 1118void lib_ring_buffer_switch_new_end(struct lttng_ust_lib_ring_buffer *buf,
1d498196
MD
1119 struct channel *chan,
1120 struct switch_offsets *offsets,
2fed87ae 1121 uint64_t tsc,
38fae1d3 1122 struct lttng_ust_shm_handle *handle)
852c2936 1123{
4cfec15c 1124 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
1125 unsigned long endidx = subbuf_index(offsets->end - 1, chan);
1126 unsigned long commit_count, padding_size, data_size;
1127
1128 data_size = subbuf_offset(offsets->end - 1, chan) + 1;
1129 padding_size = chan->backend.subbuf_size - data_size;
1d498196
MD
1130 subbuffer_set_data_size(config, &buf->backend, endidx, data_size,
1131 handle);
852c2936
MD
1132
1133 /*
1134 * Order all writes to buffer before the commit count update that will
1135 * determine that the subbuffer is full.
1136 */
a6352fd4 1137 cmm_smp_wmb();
4746ae29
MD
1138 v_add(config, padding_size, &shmp_index(handle, buf->commit_hot, endidx)->cc);
1139 commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, endidx)->cc);
852c2936 1140 lib_ring_buffer_check_deliver(config, buf, chan, offsets->end - 1,
1d498196 1141 commit_count, endidx, handle);
852c2936
MD
1142 lib_ring_buffer_write_commit_counter(config, buf, chan, endidx,
1143 offsets->end, commit_count,
1d498196 1144 padding_size, handle);
852c2936
MD
1145}
1146
1147/*
1148 * Returns :
1149 * 0 if ok
1150 * !0 if execution must be aborted.
1151 */
1152static
1153int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
4cfec15c 1154 struct lttng_ust_lib_ring_buffer *buf,
852c2936
MD
1155 struct channel *chan,
1156 struct switch_offsets *offsets,
2fed87ae 1157 uint64_t *tsc)
852c2936 1158{
4cfec15c 1159 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
1160 unsigned long off;
1161
1162 offsets->begin = v_read(config, &buf->offset);
1163 offsets->old = offsets->begin;
1164 offsets->switch_old_start = 0;
1165 off = subbuf_offset(offsets->begin, chan);
1166
1167 *tsc = config->cb.ring_buffer_clock_read(chan);
1168
1169 /*
1170 * Ensure we flush the header of an empty subbuffer when doing the
1171 * finalize (SWITCH_FLUSH). This ensures that we end up knowing the
1172 * total data gathering duration even if there were no records saved
1173 * after the last buffer switch.
1174 * In SWITCH_ACTIVE mode, switch the buffer when it contains events.
1175 * SWITCH_ACTIVE only flushes the current subbuffer, dealing with end of
1176 * subbuffer header as appropriate.
1177 * The next record that reserves space will be responsible for
1178 * populating the following subbuffer header. We choose not to populate
1179 * the next subbuffer header here because we want to be able to use
a6352fd4
MD
 1180 * SWITCH_ACTIVE for periodic buffer flush, which must
1181 * guarantee that all the buffer content (records and header
1182 * timestamps) are visible to the reader. This is required for
1183 * quiescence guarantees for the fusion merge.
852c2936
MD
1184 */
1185 if (mode == SWITCH_FLUSH || off > 0) {
b5a3dfa5 1186 if (caa_unlikely(off == 0)) {
852c2936
MD
1187 /*
1188 * The client does not save any header information.
1189 * Don't switch empty subbuffer on finalize, because it
1190 * is invalid to deliver a completely empty subbuffer.
1191 */
1192 if (!config->cb.subbuffer_header_size())
1193 return -1;
1194 /*
1195 * Need to write the subbuffer start header on finalize.
1196 */
1197 offsets->switch_old_start = 1;
1198 }
1199 offsets->begin = subbuf_align(offsets->begin, chan);
1200 } else
1201 return -1; /* we do not have to switch : buffer is empty */
1202 /* Note: old points to the next subbuf at offset 0 */
1203 offsets->end = offsets->begin;
1204 return 0;
1205}
1206
1207/*
1208 * Force a sub-buffer switch. This operation is completely reentrant : can be
1209 * called while tracing is active with absolutely no lock held.
1210 *
1211 * Note, however, that as a v_cmpxchg is used for some atomic
1212 * operations, this function must be called from the CPU which owns the buffer
 1213 * for an ACTIVE flush.
1214 */
4cfec15c 1215void lib_ring_buffer_switch_slow(struct lttng_ust_lib_ring_buffer *buf, enum switch_mode mode,
38fae1d3 1216 struct lttng_ust_shm_handle *handle)
852c2936 1217{
1d498196 1218 struct channel *chan = shmp(handle, buf->backend.chan);
4cfec15c 1219 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
852c2936
MD
1220 struct switch_offsets offsets;
1221 unsigned long oldidx;
2fed87ae 1222 uint64_t tsc;
852c2936
MD
1223
1224 offsets.size = 0;
1225
1226 /*
1227 * Perform retryable operations.
1228 */
1229 do {
1230 if (lib_ring_buffer_try_switch_slow(mode, buf, chan, &offsets,
1231 &tsc))
1232 return; /* Switch not needed */
1233 } while (v_cmpxchg(config, &buf->offset, offsets.old, offsets.end)
1234 != offsets.old);
1235
1236 /*
1237 * Atomically update last_tsc. This update races against concurrent
1238 * atomic updates, but the race will always cause supplementary full TSC
1239 * records, never the opposite (missing a full TSC record when it would
1240 * be needed).
1241 */
1242 save_last_tsc(config, buf, tsc);
1243
1244 /*
1245 * Push the reader if necessary
1246 */
1247 lib_ring_buffer_reserve_push_reader(buf, chan, offsets.old);
1248
1249 oldidx = subbuf_index(offsets.old, chan);
1d498196 1250 lib_ring_buffer_clear_noref(config, &buf->backend, oldidx, handle);
852c2936
MD
1251
1252 /*
1253 * May need to populate header start on SWITCH_FLUSH.
1254 */
1255 if (offsets.switch_old_start) {
1d498196 1256 lib_ring_buffer_switch_old_start(buf, chan, &offsets, tsc, handle);
852c2936
MD
1257 offsets.old += config->cb.subbuffer_header_size();
1258 }
1259
1260 /*
1261 * Switch old subbuffer.
1262 */
1d498196 1263 lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc, handle);
852c2936 1264}
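/*
 * Example (sketch): a periodic flush from the CPU owning the buffer would
 * simply do:
 *
 *	lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, handle);
 *
 * whereas finalization uses SWITCH_FLUSH so that even an empty sub-buffer
 * gets its header written (see lib_ring_buffer_switch_old_start above).
 */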
852c2936
MD
1265
1266/*
1267 * Returns :
1268 * 0 if ok
1269 * -ENOSPC if event size is too large for packet.
1270 * -ENOBUFS if there is currently not enough space in buffer for the event.
1271 * -EIO if data cannot be written into the buffer for any other reason.
1272 */
1273static
4cfec15c 1274int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf,
852c2936
MD
1275 struct channel *chan,
1276 struct switch_offsets *offsets,
4cfec15c 1277 struct lttng_ust_lib_ring_buffer_ctx *ctx)
852c2936 1278{
4cfec15c 1279 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
38fae1d3 1280 struct lttng_ust_shm_handle *handle = ctx->handle;
852c2936
MD
1281 unsigned long reserve_commit_diff;
1282
1283 offsets->begin = v_read(config, &buf->offset);
1284 offsets->old = offsets->begin;
1285 offsets->switch_new_start = 0;
1286 offsets->switch_new_end = 0;
1287 offsets->switch_old_end = 0;
1288 offsets->pre_header_padding = 0;
1289
1290 ctx->tsc = config->cb.ring_buffer_clock_read(chan);
1291 if ((int64_t) ctx->tsc == -EIO)
1292 return -EIO;
1293
1294 if (last_tsc_overflow(config, buf, ctx->tsc))
1295 ctx->rflags |= RING_BUFFER_RFLAG_FULL_TSC;
1296
b5a3dfa5 1297 if (caa_unlikely(subbuf_offset(offsets->begin, ctx->chan) == 0)) {
852c2936
MD
1298 offsets->switch_new_start = 1; /* For offsets->begin */
1299 } else {
1300 offsets->size = config->cb.record_header_size(config, chan,
1301 offsets->begin,
1302 &offsets->pre_header_padding,
1303 ctx);
1304 offsets->size +=
1305 lib_ring_buffer_align(offsets->begin + offsets->size,
1306 ctx->largest_align)
1307 + ctx->data_size;
b5a3dfa5 1308 if (caa_unlikely(subbuf_offset(offsets->begin, chan) +
852c2936
MD
1309 offsets->size > chan->backend.subbuf_size)) {
1310 offsets->switch_old_end = 1; /* For offsets->old */
1311 offsets->switch_new_start = 1; /* For offsets->begin */
1312 }
1313 }
b5a3dfa5 1314 if (caa_unlikely(offsets->switch_new_start)) {
852c2936
MD
1315 unsigned long sb_index;
1316
1317 /*
1318 * We are typically not filling the previous buffer completely.
1319 */
b5a3dfa5 1320 if (caa_likely(offsets->switch_old_end))
852c2936
MD
1321 offsets->begin = subbuf_align(offsets->begin, chan);
1322 offsets->begin = offsets->begin
1323 + config->cb.subbuffer_header_size();
1324 /* Test new buffer integrity */
1325 sb_index = subbuf_index(offsets->begin, chan);
1326 reserve_commit_diff =
1327 (buf_trunc(offsets->begin, chan)
1328 >> chan->backend.num_subbuf_order)
1329 - ((unsigned long) v_read(config,
4746ae29 1330 &shmp_index(handle, buf->commit_cold, sb_index)->cc_sb)
852c2936 1331 & chan->commit_count_mask);
b5a3dfa5 1332 if (caa_likely(reserve_commit_diff == 0)) {
852c2936 1333 /* Next subbuffer not being written to. */
b5a3dfa5 1334 if (caa_unlikely(config->mode != RING_BUFFER_OVERWRITE &&
852c2936
MD
1335 subbuf_trunc(offsets->begin, chan)
1336 - subbuf_trunc((unsigned long)
a6352fd4 1337 uatomic_read(&buf->consumed), chan)
852c2936
MD
1338 >= chan->backend.buf_size)) {
1339 /*
1340 * We do not overwrite non consumed buffers
1341 * and we are full : record is lost.
1342 */
1343 v_inc(config, &buf->records_lost_full);
c0226302 1344 DBG("Record lost: buffer is full\n");
852c2936
MD
1345 return -ENOBUFS;
1346 } else {
1347 /*
1348 * Next subbuffer not being written to, and we
1349 * are either in overwrite mode or the buffer is
1350 * not full. It's safe to write in this new
1351 * subbuffer.
1352 */
1353 }
1354 } else {
1355 /*
1356 * Next subbuffer reserve offset does not match the
1357 * commit offset. Drop record in producer-consumer and
1358 * overwrite mode. Caused by either a writer OOPS or too
1359 * many nested writes over a reserve/commit pair.
1360 */
1361 v_inc(config, &buf->records_lost_wrap);
c0226302 1362 DBG("Record lost: buffer wrap-around\n");
852c2936
MD
1363 return -EIO;
1364 }
1365 offsets->size =
1366 config->cb.record_header_size(config, chan,
1367 offsets->begin,
1368 &offsets->pre_header_padding,
1369 ctx);
1370 offsets->size +=
1371 lib_ring_buffer_align(offsets->begin + offsets->size,
1372 ctx->largest_align)
1373 + ctx->data_size;
b5a3dfa5 1374 if (caa_unlikely(subbuf_offset(offsets->begin, chan)
852c2936
MD
1375 + offsets->size > chan->backend.subbuf_size)) {
1376 /*
1377 * Record too big for subbuffers, report error, don't
1378 * complete the sub-buffer switch.
1379 */
1380 v_inc(config, &buf->records_lost_big);
c0226302 1381 DBG("Record lost: record size (%zu bytes) is too large for buffer\n", offsets->size);
852c2936
MD
1382 return -ENOSPC;
1383 } else {
1384 /*
1385 * We just made a successful buffer switch and the
1386 * record fits in the new subbuffer. Let's write.
1387 */
1388 }
1389 } else {
1390 /*
1391 * Record fits in the current buffer and we are not on a switch
1392 * boundary. It's safe to write.
1393 */
1394 }
1395 offsets->end = offsets->begin + offsets->size;
1396
b5a3dfa5 1397 if (caa_unlikely(subbuf_offset(offsets->end, chan) == 0)) {
852c2936
MD
1398 /*
1399 * The offset_end will fall at the very beginning of the next
1400 * subbuffer.
1401 */
1402 offsets->switch_new_end = 1; /* For offsets->begin */
1403 }
1404 return 0;
1405}
1406
1407/**
1408 * lib_ring_buffer_reserve_slow - Atomic slot reservation in a buffer.
1409 * @ctx: ring buffer context.
1410 *
 1411 * Return : -ENOBUFS if not enough space, -ENOSPC if event size too large,
1412 * -EIO for other errors, else returns 0.
1413 * It will take care of sub-buffer switching.
1414 */
4cfec15c 1415int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx)
852c2936
MD
1416{
1417 struct channel *chan = ctx->chan;
38fae1d3 1418 struct lttng_ust_shm_handle *handle = ctx->handle;
4cfec15c
MD
1419 const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
1420 struct lttng_ust_lib_ring_buffer *buf;
852c2936
MD
1421 struct switch_offsets offsets;
1422 int ret;
1423
1424 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
1d498196 1425 buf = shmp(handle, chan->backend.buf[ctx->cpu].shmp);
852c2936 1426 else
1d498196 1427 buf = shmp(handle, chan->backend.buf[0].shmp);
852c2936
MD
1428 ctx->buf = buf;
1429
1430 offsets.size = 0;
1431
1432 do {
1433 ret = lib_ring_buffer_try_reserve_slow(buf, chan, &offsets,
1434 ctx);
b5a3dfa5 1435 if (caa_unlikely(ret))
852c2936 1436 return ret;
b5a3dfa5 1437 } while (caa_unlikely(v_cmpxchg(config, &buf->offset, offsets.old,
852c2936
MD
1438 offsets.end)
1439 != offsets.old));
1440
1441 /*
1442 * Atomically update last_tsc. This update races against concurrent
1443 * atomic updates, but the race will always cause supplementary full TSC
1444 * records, never the opposite (missing a full TSC record when it would
1445 * be needed).
1446 */
1447 save_last_tsc(config, buf, ctx->tsc);
1448
1449 /*
1450 * Push the reader if necessary
1451 */
1452 lib_ring_buffer_reserve_push_reader(buf, chan, offsets.end - 1);
1453
1454 /*
1455 * Clear noref flag for this subbuffer.
1456 */
1457 lib_ring_buffer_clear_noref(config, &buf->backend,
1d498196
MD
1458 subbuf_index(offsets.end - 1, chan),
1459 handle);
852c2936
MD
1460
1461 /*
1462 * Switch old subbuffer if needed.
1463 */
b5a3dfa5 1464 if (caa_unlikely(offsets.switch_old_end)) {
852c2936 1465 lib_ring_buffer_clear_noref(config, &buf->backend,
1d498196
MD
1466 subbuf_index(offsets.old - 1, chan),
1467 handle);
1468 lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->tsc, handle);
852c2936
MD
1469 }
1470
1471 /*
1472 * Populate new subbuffer.
1473 */
b5a3dfa5 1474 if (caa_unlikely(offsets.switch_new_start))
1d498196 1475 lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->tsc, handle);
852c2936 1476
b5a3dfa5 1477 if (caa_unlikely(offsets.switch_new_end))
1d498196 1478 lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->tsc, handle);
852c2936
MD
1479
1480 ctx->slot_size = offsets.size;
1481 ctx->pre_offset = offsets.begin;
1482 ctx->buf_offset = offsets.begin + offsets.pre_header_padding;
1483 return 0;
1484}