/* SPDX-License-Identifier: (GPL-2.0-only OR LGPL-2.1-only)
 *
 * ring_buffer_iterator.c
 *
 * Ring buffer and channel iterators. Get each event of a channel in order. Uses
 * a prio heap for per-cpu buffers, giving an O(log(NR_CPUS)) algorithmic
 * complexity for the "get next event" operation.
 *
 * Copyright (C) 2010-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 */

#include <ringbuffer/iterator.h>
#include <wrapper/file.h>
#include <wrapper/uaccess.h>
#include <linux/jiffies.h>
#include <linux/delay.h>
#include <linux/module.h>

/*
 * Safety factor taking into account internal kernel interrupt latency.
 * Assuming 250ms worst-case latency.
 */
#define MAX_SYSTEM_LATENCY	250

/*
 * Maximum delta expected between trace clocks. At most 1 jiffy delta.
 */
#define MAX_CLOCK_DELTA		(jiffies_to_usecs(1) * 1000)

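/*
 * Illustrative arithmetic (CONFIG_HZ=250 is an assumption here, not a
 * requirement): jiffies_to_usecs(1) = 4000 us, so
 * MAX_CLOCK_DELTA = 4000 * 1000 = 4,000,000 ns. Trace clock reads across
 * CPUs may therefore appear up to 4 ms apart before the backward-timestamp
 * warning in channel_get_next_record() triggers.
 */
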
/**
 * lib_ring_buffer_get_next_record - Get the next record in a buffer.
 * @chan: channel
 * @buf: buffer
 *
 * Returns the size of the event read, -EAGAIN if buffer is empty, -ENODATA if
 * buffer is empty and finalized. The buffer must already be opened for reading.
 */
ssize_t lib_ring_buffer_get_next_record(struct lttng_kernel_ring_buffer_channel *chan,
					struct lttng_kernel_ring_buffer *buf)
{
	const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
	struct lttng_kernel_ring_buffer_iter *iter = &buf->iter;
	int ret;

restart:
	switch (iter->state) {
	case ITER_GET_SUBBUF:
		ret = lib_ring_buffer_get_next_subbuf(buf);
		if (ret && !LTTNG_READ_ONCE(buf->finalized)
		    && config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
			/*
			 * Use "pull" scheme for global buffers. The reader
			 * itself flushes the buffer to "pull" data not visible
			 * to readers yet. Flush current subbuffer and re-try.
			 *
			 * Per-CPU buffers use a "push" scheme instead, because
			 * the IPI needed to flush all CPU's buffers is too
			 * costly. In the "push" scheme, the reader waits for
			 * the writer's periodic timer to flush the
			 * buffers (keeping track of a quiescent state
			 * timestamp). Therefore, the writer "pushes" data out
			 * of the buffers rather than letting the reader "pull"
			 * data from the buffer.
			 */
			lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
			ret = lib_ring_buffer_get_next_subbuf(buf);
		}
		if (ret)
			return ret;
		iter->consumed = buf->cons_snapshot;
		iter->data_size = lib_ring_buffer_get_read_data_size(config, buf);
		iter->read_offset = iter->consumed;
		/* skip header */
		iter->read_offset += config->cb.subbuffer_header_size();
		iter->state = ITER_TEST_RECORD;
		goto restart;
	case ITER_TEST_RECORD:
		if (iter->read_offset - iter->consumed >= iter->data_size) {
			iter->state = ITER_PUT_SUBBUF;
		} else {
			CHAN_WARN_ON(chan, !config->cb.record_get);
			config->cb.record_get(config, chan, buf,
					      iter->read_offset,
					      &iter->header_len,
					      &iter->payload_len,
					      &iter->timestamp);
			iter->read_offset += iter->header_len;
			subbuffer_consume_record(config, &buf->backend);
			iter->state = ITER_NEXT_RECORD;
			return iter->payload_len;
		}
		goto restart;
	case ITER_NEXT_RECORD:
		iter->read_offset += iter->payload_len;
		iter->state = ITER_TEST_RECORD;
		goto restart;
	case ITER_PUT_SUBBUF:
		lib_ring_buffer_put_next_subbuf(buf);
		iter->state = ITER_GET_SUBBUF;
		goto restart;
	default:
		CHAN_WARN_ON(chan, 1);	/* Should not happen */
		return -EPERM;
	}
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_get_next_record);
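/*
 * Illustrative sketch (not part of the API above): a minimal in-kernel
 * consumer loop draining one buffer with the iterator. Assumes the buffer
 * was opened with lib_ring_buffer_iterator_open(); handle_payload() is a
 * hypothetical consumer callback.
 *
 *	ssize_t len;
 *
 *	for (;;) {
 *		len = lib_ring_buffer_get_next_record(chan, buf);
 *		if (len == -EAGAIN)
 *			continue;	(busy-retry for brevity; real code
 *					 waits on buf->read_wait instead)
 *		if (len == -ENODATA)
 *			break;		(buffer finalized: end of stream)
 *		handle_payload(buf, buf->iter.read_offset, len);
 *	}
 */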

void lib_ring_buffer_put_current_record(struct lttng_kernel_ring_buffer *buf)
{
	struct lttng_kernel_ring_buffer_iter *iter;

	if (!buf)
		return;
	iter = &buf->iter;
	if (iter->state != ITER_NEXT_RECORD)
		return;
	iter->read_offset += iter->payload_len;
	iter->state = ITER_TEST_RECORD;
	if (iter->read_offset - iter->consumed >= iter->data_size) {
		lib_ring_buffer_put_next_subbuf(buf);
		iter->state = ITER_GET_SUBBUF;
	}
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_put_current_record);

static int buf_is_higher(void *a, void *b)
{
	struct lttng_kernel_ring_buffer *bufa = a;
	struct lttng_kernel_ring_buffer *bufb = b;

	/* Consider lowest timestamps to be at the top of the heap */
	return (bufa->iter.timestamp < bufb->iter.timestamp);
}
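
/*
 * Note: the ptr heap keeps its "maximum" element on top, so inverting the
 * comparison above turns it into a min-heap on timestamps:
 * lttng_heap_maximum() then returns the buffer whose current record has
 * the lowest timestamp, i.e. the next event in global order.
 */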

static
void lib_ring_buffer_get_empty_buf_records(const struct lttng_kernel_ring_buffer_config *config,
					   struct lttng_kernel_ring_buffer_channel *chan)
{
	struct lttng_ptr_heap *heap = &chan->iter.heap;
	struct lttng_kernel_ring_buffer *buf, *tmp;
	ssize_t len;

	list_for_each_entry_safe(buf, tmp, &chan->iter.empty_head,
				 iter.empty_node) {
		len = lib_ring_buffer_get_next_record(chan, buf);

		/*
		 * Deal with -EAGAIN and -ENODATA.
		 * len >= 0 means record contains data.
		 * -EBUSY should never happen, because we support only one
		 * reader.
		 */
		switch (len) {
		case -EAGAIN:
			/* Keep node in empty list */
			break;
		case -ENODATA:
			/*
			 * Buffer is finalized. Don't add to list of empty
			 * buffers, because it has no more data to provide,
			 * ever.
			 */
			list_del(&buf->iter.empty_node);
			break;
		case -EBUSY:
			CHAN_WARN_ON(chan, 1);
			break;
		default:
			/*
			 * Insert buffer into the heap, remove from empty buffer
			 * list.
			 */
			CHAN_WARN_ON(chan, len < 0);
			list_del(&buf->iter.empty_node);
			CHAN_WARN_ON(chan, lttng_heap_insert(heap, buf));
		}
	}
}

static
void lib_ring_buffer_wait_for_qs(const struct lttng_kernel_ring_buffer_config *config,
				 struct lttng_kernel_ring_buffer_channel *chan)
{
	u64 timestamp_qs;
	unsigned long wait_msecs;

	/*
	 * No need to wait if no empty buffers are present.
	 */
	if (list_empty(&chan->iter.empty_head))
		return;

	timestamp_qs = config->cb.ring_buffer_clock_read(chan);
	/*
	 * We need to consider previously empty buffers.
	 * Do a get next buf record on each of them. Add them to
	 * the heap if they have data. If at least one of them
	 * doesn't have data, we need to wait for
	 * switch_timer_interval + MAX_SYSTEM_LATENCY (so we are sure the
	 * buffers have been switched either by the timer or idle entry) and
	 * check them again, adding them if they have data.
	 */
	lib_ring_buffer_get_empty_buf_records(config, chan);

	/*
	 * No need to wait if no empty buffers are present.
	 */
	if (list_empty(&chan->iter.empty_head))
		return;

	/*
	 * We need to wait for the buffer switch timer to run. If the
	 * CPU is idle, idle entry performed the switch.
	 * TODO: we could optimize further by skipping the sleep if all
	 * empty buffers belong to idle or offline cpus.
	 */
	wait_msecs = jiffies_to_msecs(chan->switch_timer_interval);
	wait_msecs += MAX_SYSTEM_LATENCY;
	msleep(wait_msecs);
	lib_ring_buffer_get_empty_buf_records(config, chan);
	/*
	 * Any buffer still in the empty list here cannot possibly
	 * contain an event with a timestamp prior to "timestamp_qs".
	 * The new quiescent state timestamp is the one we grabbed
	 * before waiting for buffer data. It is therefore safe to
	 * ignore empty buffers up to last_qs timestamp for fusion
	 * merge.
	 */
	chan->iter.last_qs = timestamp_qs;
}
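
/*
 * Worked example of the wait above (illustrative; a 100ms
 * switch_timer_interval is an assumption): the reader sleeps
 * 100 + MAX_SYSTEM_LATENCY = 350ms, long enough for the periodic switch
 * timer to have flushed every per-CPU buffer at least once after
 * timestamp_qs was sampled, even with worst-case interrupt latency.
 * Any buffer still empty afterwards therefore cannot hold a record older
 * than timestamp_qs.
 */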

/**
 * channel_get_next_record - Get the next record in a channel.
 * @chan: channel
 * @ret_buf: the buffer in which the event is located (output)
 *
 * Returns the size of the new current event, -EAGAIN if all buffers are empty,
 * -ENODATA if all buffers are empty and finalized. The channel must already be
 * opened for reading.
 */
ssize_t channel_get_next_record(struct lttng_kernel_ring_buffer_channel *chan,
				struct lttng_kernel_ring_buffer **ret_buf)
{
	const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
	struct lttng_kernel_ring_buffer *buf;
	struct lttng_ptr_heap *heap;
	ssize_t len;

	if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
		*ret_buf = channel_get_ring_buffer(config, chan, 0);
		return lib_ring_buffer_get_next_record(chan, *ret_buf);
	}

	heap = &chan->iter.heap;

	/*
	 * Get next record for topmost buffer.
	 */
	buf = lttng_heap_maximum(heap);
	if (buf) {
		len = lib_ring_buffer_get_next_record(chan, buf);
		/*
		 * Deal with -EAGAIN and -ENODATA.
		 * len >= 0 means record contains data.
		 */
		switch (len) {
		case -EAGAIN:
			buf->iter.timestamp = 0;
			list_add(&buf->iter.empty_node, &chan->iter.empty_head);
			/* Remove topmost buffer from the heap */
			CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf);
			break;
		case -ENODATA:
			/*
			 * Buffer is finalized. Remove buffer from heap and
			 * don't add to list of empty buffers, because it has
			 * no more data to provide, ever.
			 */
			CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf);
			break;
		case -EBUSY:
			CHAN_WARN_ON(chan, 1);
			break;
		default:
			/*
			 * Reinsert buffer into the heap. Note that heap can be
			 * partially empty, so we need to use
			 * lttng_heap_replace_max().
			 */
			CHAN_WARN_ON(chan, len < 0);
			CHAN_WARN_ON(chan, lttng_heap_replace_max(heap, buf) != buf);
			break;
		}
	}

	buf = lttng_heap_maximum(heap);
	if (!buf || buf->iter.timestamp > chan->iter.last_qs) {
		/*
		 * Deal with buffers previously showing no data.
		 * Add buffers containing data to the heap, update
		 * last_qs.
		 */
		lib_ring_buffer_wait_for_qs(config, chan);
	}

	*ret_buf = buf = lttng_heap_maximum(heap);
	if (buf) {
		/*
		 * If this warning triggers, you probably need to check your
		 * system interrupt latency. Typical causes: too much printk()
		 * output going to a serial console with interrupts off.
		 * Allow for MAX_CLOCK_DELTA ns timestamp delta going backward.
		 * Observed on SMP KVM setups with trace_clock().
		 */
		if (chan->iter.last_timestamp
		    > (buf->iter.timestamp + MAX_CLOCK_DELTA)) {
			printk(KERN_WARNING "LTTng: ring_buffer: timestamps going "
			       "backward. Last time %llu ns, cpu %d, "
			       "current time %llu ns, cpu %d, "
			       "delta %llu ns.\n",
			       chan->iter.last_timestamp, chan->iter.last_cpu,
			       buf->iter.timestamp, buf->backend.cpu,
			       chan->iter.last_timestamp - buf->iter.timestamp);
			CHAN_WARN_ON(chan, 1);
		}
		chan->iter.last_timestamp = buf->iter.timestamp;
		chan->iter.last_cpu = buf->backend.cpu;
		return buf->iter.payload_len;
	} else {
		/* Heap is empty */
		if (list_empty(&chan->iter.empty_head))
			return -ENODATA;	/* All buffers finalized */
		else
			return -EAGAIN;		/* Temporarily empty */
	}
}
EXPORT_SYMBOL_GPL(channel_get_next_record);
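/*
 * Illustrative sketch (not part of the API above): draining a per-CPU
 * channel in global timestamp order through the fusion merge.
 * process_record() is a hypothetical consumer callback.
 *
 *	struct lttng_kernel_ring_buffer *buf;
 *	ssize_t len;
 *
 *	for (;;) {
 *		len = channel_get_next_record(chan, &buf);
 *		if (len == -ENODATA)
 *			break;		(all buffers finalized)
 *		if (len == -EAGAIN)
 *			continue;	(temporarily empty; real code waits
 *					 on chan->read_wait, see the read()
 *					 implementation below)
 *		process_record(buf, buf->iter.read_offset, len);
 *	}
 */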

static
void lib_ring_buffer_iterator_init(struct lttng_kernel_ring_buffer_channel *chan, struct lttng_kernel_ring_buffer *buf)
{
	if (buf->iter.allocated)
		return;

	buf->iter.allocated = 1;
	if (chan->iter.read_open && !buf->iter.read_open) {
		CHAN_WARN_ON(chan, lib_ring_buffer_open_read(buf) != 0);
		buf->iter.read_open = 1;
	}

	/* Add to list of buffers without any current record */
	if (chan->backend.config.alloc == RING_BUFFER_ALLOC_PER_CPU)
		list_add(&buf->iter.empty_node, &chan->iter.empty_head);
}

#if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0))

int lttng_cpuhp_rb_iter_online(unsigned int cpu,
		struct lttng_cpuhp_node *node)
{
	struct lttng_kernel_ring_buffer_channel *chan = container_of(node, struct lttng_kernel_ring_buffer_channel,
					    cpuhp_iter_online);
	struct lttng_kernel_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
	const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;

	CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);

	lib_ring_buffer_iterator_init(chan, buf);
	return 0;
}
EXPORT_SYMBOL_GPL(lttng_cpuhp_rb_iter_online);

#else /* #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */

#ifdef CONFIG_HOTPLUG_CPU
static
int channel_iterator_cpu_hotplug(struct notifier_block *nb,
				 unsigned long action,
				 void *hcpu)
{
	unsigned int cpu = (unsigned long)hcpu;
	struct lttng_kernel_ring_buffer_channel *chan = container_of(nb, struct lttng_kernel_ring_buffer_channel,
					    hp_iter_notifier);
	struct lttng_kernel_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
	const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;

	if (!chan->hp_iter_enable)
		return NOTIFY_DONE;

	CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);

	switch (action) {
	case CPU_DOWN_FAILED:
	case CPU_DOWN_FAILED_FROZEN:
	case CPU_ONLINE:
	case CPU_ONLINE_FROZEN:
		lib_ring_buffer_iterator_init(chan, buf);
		return NOTIFY_OK;
	default:
		return NOTIFY_DONE;
	}
}
#endif

#endif /* #else #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */

int channel_iterator_init(struct lttng_kernel_ring_buffer_channel *chan)
{
	const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
	struct lttng_kernel_ring_buffer *buf;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		int ret;

		INIT_LIST_HEAD(&chan->iter.empty_head);
		ret = lttng_heap_init(&chan->iter.heap,
				num_possible_cpus(),
				GFP_KERNEL, buf_is_higher);
		if (ret)
			return ret;

#if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0))
		chan->cpuhp_iter_online.component = LTTNG_RING_BUFFER_ITER;
		ret = cpuhp_state_add_instance(lttng_rb_hp_online,
			&chan->cpuhp_iter_online.node);
		if (ret)
			return ret;
#else /* #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
		{
			int cpu;

			/*
			 * In case of non-hotplug cpu, if the ring-buffer is allocated
			 * in early initcall, it will not be notified of secondary cpus.
			 * In that case, we need to allocate for all possible cpus.
			 */
#ifdef CONFIG_HOTPLUG_CPU
			chan->hp_iter_notifier.notifier_call =
				channel_iterator_cpu_hotplug;
			chan->hp_iter_notifier.priority = 10;
			register_cpu_notifier(&chan->hp_iter_notifier);

			get_online_cpus();
			for_each_online_cpu(cpu) {
				buf = per_cpu_ptr(chan->backend.buf, cpu);
				lib_ring_buffer_iterator_init(chan, buf);
			}
			chan->hp_iter_enable = 1;
			put_online_cpus();
#else
			for_each_possible_cpu(cpu) {
				buf = per_cpu_ptr(chan->backend.buf, cpu);
				lib_ring_buffer_iterator_init(chan, buf);
			}
#endif
		}
#endif /* #else #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
	} else {
		buf = channel_get_ring_buffer(config, chan, 0);
		lib_ring_buffer_iterator_init(chan, buf);
	}
	return 0;
}

void channel_iterator_unregister_notifiers(struct lttng_kernel_ring_buffer_channel *chan)
{
	const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
#if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0))
		{
			int ret;

			ret = cpuhp_state_remove_instance(lttng_rb_hp_online,
				&chan->cpuhp_iter_online.node);
			WARN_ON(ret);
		}
#else /* #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
		chan->hp_iter_enable = 0;
		unregister_cpu_notifier(&chan->hp_iter_notifier);
#endif /* #else #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
	}
}

void channel_iterator_free(struct lttng_kernel_ring_buffer_channel *chan)
{
	const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
		lttng_heap_free(&chan->iter.heap);
}

int lib_ring_buffer_iterator_open(struct lttng_kernel_ring_buffer *buf)
{
	struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan;
	const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;

	CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);
	return lib_ring_buffer_open_read(buf);
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_open);

/*
 * Note: Iterators must not be mixed with other types of outputs, because an
 * iterator can leave the buffer in "GET" state, which is not consistent with
 * other types of output (mmap, splice, raw data read).
 */
void lib_ring_buffer_iterator_release(struct lttng_kernel_ring_buffer *buf)
{
	lib_ring_buffer_release_read(buf);
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_release);

int channel_iterator_open(struct lttng_kernel_ring_buffer_channel *chan)
{
	const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
	struct lttng_kernel_ring_buffer *buf;
	int ret = 0, cpu;

	CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		get_online_cpus();
		/* Allow CPU hotplug to keep track of opened reader */
		chan->iter.read_open = 1;
		for_each_channel_cpu(cpu, chan) {
			buf = channel_get_ring_buffer(config, chan, cpu);
			ret = lib_ring_buffer_iterator_open(buf);
			if (ret)
				goto error;
			buf->iter.read_open = 1;
		}
		put_online_cpus();
	} else {
		buf = channel_get_ring_buffer(config, chan, 0);
		ret = lib_ring_buffer_iterator_open(buf);
	}
	return ret;
error:
	/* Error should always happen on CPU 0, hence no close is required. */
	CHAN_WARN_ON(chan, cpu != 0);
	put_online_cpus();
	return ret;
}
EXPORT_SYMBOL_GPL(channel_iterator_open);

void channel_iterator_release(struct lttng_kernel_ring_buffer_channel *chan)
{
	const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
	struct lttng_kernel_ring_buffer *buf;
	int cpu;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		get_online_cpus();
		for_each_channel_cpu(cpu, chan) {
			buf = channel_get_ring_buffer(config, chan, cpu);
			if (buf->iter.read_open) {
				lib_ring_buffer_iterator_release(buf);
				buf->iter.read_open = 0;
			}
		}
		chan->iter.read_open = 0;
		put_online_cpus();
	} else {
		buf = channel_get_ring_buffer(config, chan, 0);
		lib_ring_buffer_iterator_release(buf);
	}
}
EXPORT_SYMBOL_GPL(channel_iterator_release);

void lib_ring_buffer_iterator_reset(struct lttng_kernel_ring_buffer *buf)
{
	struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan;

	if (buf->iter.state != ITER_GET_SUBBUF)
		lib_ring_buffer_put_next_subbuf(buf);
	buf->iter.state = ITER_GET_SUBBUF;
	/* Remove from heap (if present). */
	if (lttng_heap_cherrypick(&chan->iter.heap, buf))
		list_add(&buf->iter.empty_node, &chan->iter.empty_head);
	buf->iter.timestamp = 0;
	buf->iter.header_len = 0;
	buf->iter.payload_len = 0;
	buf->iter.consumed = 0;
	buf->iter.read_offset = 0;
	buf->iter.data_size = 0;
	/* Don't reset allocated and read_open */
}

void channel_iterator_reset(struct lttng_kernel_ring_buffer_channel *chan)
{
	const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
	struct lttng_kernel_ring_buffer *buf;
	int cpu;

	/* Empty heap, put into empty_head */
	while ((buf = lttng_heap_remove(&chan->iter.heap)) != NULL)
		list_add(&buf->iter.empty_node, &chan->iter.empty_head);

	for_each_channel_cpu(cpu, chan) {
		buf = channel_get_ring_buffer(config, chan, cpu);
		lib_ring_buffer_iterator_reset(buf);
	}
	/* Don't reset read_open */
	chan->iter.last_qs = 0;
	chan->iter.last_timestamp = 0;
	chan->iter.last_cpu = 0;
	chan->iter.len_left = 0;
}

/*
 * Ring buffer payload extraction read() implementation.
 */
static
ssize_t channel_ring_buffer_file_read(struct file *filp,
				      char __user *user_buf,
				      size_t count,
				      loff_t *ppos,
				      struct lttng_kernel_ring_buffer_channel *chan,
				      struct lttng_kernel_ring_buffer *buf,
				      int fusionmerge)
{
	const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
	size_t read_count = 0, read_offset;
	ssize_t len;

	might_sleep();
	if (!lttng_access_ok(VERIFY_WRITE, user_buf, count))
		return -EFAULT;

	/* Finish copy of previous record */
	if (*ppos != 0) {
		if (read_count < count) {
			len = chan->iter.len_left;
			read_offset = *ppos;
			if (config->alloc == RING_BUFFER_ALLOC_PER_CPU
			    && fusionmerge)
				buf = lttng_heap_maximum(&chan->iter.heap);
			CHAN_WARN_ON(chan, !buf);
			goto skip_get_next;
		}
	}

	while (read_count < count) {
		size_t copy_len, space_left;

		if (fusionmerge)
			len = channel_get_next_record(chan, &buf);
		else
			len = lib_ring_buffer_get_next_record(chan, buf);
len_test:
		if (len < 0) {
			/*
			 * Check if buffer is finalized (end of file).
			 */
			if (len == -ENODATA) {
				/* A read_count of 0 signals end of file. */
				goto nodata;
			}
			if (filp->f_flags & O_NONBLOCK) {
				if (!read_count)
					read_count = -EAGAIN;
				goto nodata;
			} else {
				int error;

				/*
				 * No data available at the moment, return what
				 * we got.
				 */
				if (read_count)
					goto nodata;

				/*
				 * Wait for returned len to be >= 0 or -ENODATA.
				 */
				if (fusionmerge)
					error = wait_event_interruptible(
						  chan->read_wait,
						  ((len = channel_get_next_record(chan,
							&buf)), len != -EAGAIN));
				else
					error = wait_event_interruptible(
						  buf->read_wait,
						  ((len = lib_ring_buffer_get_next_record(
							  chan, buf)), len != -EAGAIN));
				CHAN_WARN_ON(chan, len == -EBUSY);
				if (error) {
					read_count = error;
					goto nodata;
				}
				CHAN_WARN_ON(chan, len < 0 && len != -ENODATA);
				goto len_test;
			}
		}
		read_offset = buf->iter.read_offset;
skip_get_next:
		space_left = count - read_count;
		if (len <= space_left) {
			copy_len = len;
			chan->iter.len_left = 0;
			*ppos = 0;
		} else {
			copy_len = space_left;
			chan->iter.len_left = len - copy_len;
			*ppos = read_offset + copy_len;
		}
		if (__lib_ring_buffer_copy_to_user(&buf->backend, read_offset,
						   &user_buf[read_count],
						   copy_len)) {
			/*
			 * Leave the len_left and ppos values at their current
			 * state, as we currently have a valid event to read.
			 */
			return -EFAULT;
		}
		read_count += copy_len;
	}
	goto put_record;

nodata:
	*ppos = 0;
	chan->iter.len_left = 0;
put_record:
	lib_ring_buffer_put_current_record(buf);
	return read_count;
}

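/*
 * Illustrative userspace sketch (the file path is hypothetical; actual
 * paths depend on how the transport exposes these files): the read()
 * implementation above returns event payload bytes, 0 at end of stream
 * (all buffers finalized), and -EAGAIN with O_NONBLOCK when no data is
 * ready yet.
 *
 *	int fd = open("/sys/kernel/debug/lttng/channel0", O_RDONLY);
 *	char data[4096];
 *	ssize_t ret;
 *
 *	while ((ret = read(fd, data, sizeof(data))) > 0)
 *		consume(data, ret);	(hypothetical consumer)
 *	close(fd);
 */
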
/**
 * lib_ring_buffer_file_read - Read buffer record payload.
 * @filp: file structure pointer.
 * @user_buf: user buffer to read data into.
 * @count: number of bytes to read.
 * @ppos: file read position.
 *
 * Returns a negative value on error, or the number of bytes read on success.
 * ppos is used to save the position _within the current record_ between calls
 * to read().
 */
static
ssize_t lib_ring_buffer_file_read(struct file *filp,
				  char __user *user_buf,
				  size_t count,
				  loff_t *ppos)
{
	struct inode *inode = filp->lttng_f_dentry->d_inode;
	struct lttng_kernel_ring_buffer *buf = inode->i_private;
	struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan;

	return channel_ring_buffer_file_read(filp, user_buf, count, ppos,
					     chan, buf, 0);
}

/**
 * channel_file_read - Read channel record payload.
 * @filp: file structure pointer.
 * @user_buf: user buffer to read data into.
 * @count: number of bytes to read.
 * @ppos: file read position.
 *
 * Returns a negative value on error, or the number of bytes read on success.
 * ppos is used to save the position _within the current record_ between calls
 * to read().
 */
static
ssize_t channel_file_read(struct file *filp,
			  char __user *user_buf,
			  size_t count,
			  loff_t *ppos)
{
	struct inode *inode = filp->lttng_f_dentry->d_inode;
	struct lttng_kernel_ring_buffer_channel *chan = inode->i_private;
	const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
		return channel_ring_buffer_file_read(filp, user_buf, count,
						     ppos, chan, NULL, 1);
	else {
		struct lttng_kernel_ring_buffer *buf =
			channel_get_ring_buffer(config, chan, 0);
		return channel_ring_buffer_file_read(filp, user_buf, count,
						     ppos, chan, buf, 0);
	}
}

static
int lib_ring_buffer_file_open(struct inode *inode, struct file *file)
{
	struct lttng_kernel_ring_buffer *buf = inode->i_private;
	int ret;

	ret = lib_ring_buffer_iterator_open(buf);
	if (ret)
		return ret;

	file->private_data = buf;
	ret = nonseekable_open(inode, file);
	if (ret)
		goto release_iter;
	return 0;

release_iter:
	lib_ring_buffer_iterator_release(buf);
	return ret;
}

static
int lib_ring_buffer_file_release(struct inode *inode, struct file *file)
{
	struct lttng_kernel_ring_buffer *buf = inode->i_private;

	lib_ring_buffer_iterator_release(buf);
	return 0;
}

static
int channel_file_open(struct inode *inode, struct file *file)
{
	struct lttng_kernel_ring_buffer_channel *chan = inode->i_private;
	int ret;

	ret = channel_iterator_open(chan);
	if (ret)
		return ret;

	file->private_data = chan;
	ret = nonseekable_open(inode, file);
	if (ret)
		goto release_iter;
	return 0;

release_iter:
	channel_iterator_release(chan);
	return ret;
}

static
int channel_file_release(struct inode *inode, struct file *file)
{
	struct lttng_kernel_ring_buffer_channel *chan = inode->i_private;

	channel_iterator_release(chan);
	return 0;
}

const struct file_operations channel_payload_file_operations = {
	.owner = THIS_MODULE,
	.open = channel_file_open,
	.release = channel_file_release,
	.read = channel_file_read,
	.llseek = vfs_lib_ring_buffer_no_llseek,
};
EXPORT_SYMBOL_GPL(channel_payload_file_operations);

const struct file_operations lib_ring_buffer_payload_file_operations = {
	.owner = THIS_MODULE,
	.open = lib_ring_buffer_file_open,
	.release = lib_ring_buffer_file_release,
	.read = lib_ring_buffer_file_read,
	.llseek = vfs_lib_ring_buffer_no_llseek,
};
EXPORT_SYMBOL_GPL(lib_ring_buffer_payload_file_operations);
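
/*
 * Illustrative sketch (hypothetical usage, not part of this file): the
 * transport code is expected to attach these operations to files whose
 * inode i_private points to the channel or buffer, e.g. with debugfs:
 *
 *	debugfs_create_file("channel0", 0400, parent, chan,
 *			    &channel_payload_file_operations);
 */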