Fix: ACCESS_ONCE() removed in kernel 4.15
[lttng-modules.git] / lib / ringbuffer / ring_buffer_iterator.c
/*
 * ring_buffer_iterator.c
 *
 * Ring buffer and channel iterators. Get each event of a channel in order. Uses
 * a prio heap for per-cpu buffers, giving an O(log(NR_CPUS)) algorithmic
 * complexity for the "get next event" operation.
 *
 * Copyright (C) 2010-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; only
 * version 2.1 of the License.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 *
 * Author:
 *	Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 */

#include <wrapper/ringbuffer/iterator.h>
#include <wrapper/file.h>
#include <linux/jiffies.h>
#include <linux/delay.h>
#include <linux/module.h>

/*
 * Safety factor taking into account internal kernel interrupt latency.
 * Assuming 250ms worst-case latency.
 */
#define MAX_SYSTEM_LATENCY	250

/*
 * Maximum delta expected between trace clocks. At most 1 jiffy delta.
 */
#define MAX_CLOCK_DELTA		(jiffies_to_usecs(1) * 1000)
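
/*
 * Worked example: the delta is expressed in nanoseconds because
 * jiffies_to_usecs() returns microseconds. On a kernel built with HZ=250,
 * jiffies_to_usecs(1) = 4000, so MAX_CLOCK_DELTA evaluates to
 * 4000 * 1000 = 4,000,000 ns (4 ms) of tolerated backward timestamp skew
 * between per-CPU trace clocks.
 */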

/**
 * lib_ring_buffer_get_next_record - Get the next record in a buffer.
 * @chan: channel
 * @buf: buffer
 *
 * Returns the size of the event read, -EAGAIN if the buffer is empty,
 * -ENODATA if the buffer is empty and finalized. The buffer must already be
 * opened for reading.
 */
ssize_t lib_ring_buffer_get_next_record(struct channel *chan,
					struct lib_ring_buffer *buf)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	struct lib_ring_buffer_iter *iter = &buf->iter;
	int ret;

restart:
	switch (iter->state) {
	case ITER_GET_SUBBUF:
		ret = lib_ring_buffer_get_next_subbuf(buf);
		if (ret && !READ_ONCE(buf->finalized)
		    && config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
			/*
			 * Use "pull" scheme for global buffers. The reader
			 * itself flushes the buffer to "pull" data not visible
			 * to readers yet. Flush current subbuffer and re-try.
			 *
			 * Per-CPU buffers instead use a "push" scheme because
			 * the IPI needed to flush all CPUs' buffers is too
			 * costly. In the "push" scheme, the reader waits for
			 * the writer's periodic timer to flush the
			 * buffers (keeping track of a quiescent state
			 * timestamp). Therefore, the writer "pushes" data out
			 * of the buffers rather than letting the reader "pull"
			 * data from the buffer.
			 */
			lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
			ret = lib_ring_buffer_get_next_subbuf(buf);
		}
		if (ret)
			return ret;
		iter->consumed = buf->cons_snapshot;
		iter->data_size = lib_ring_buffer_get_read_data_size(config, buf);
		iter->read_offset = iter->consumed;
		/* skip header */
		iter->read_offset += config->cb.subbuffer_header_size();
		iter->state = ITER_TEST_RECORD;
		goto restart;
	case ITER_TEST_RECORD:
		if (iter->read_offset - iter->consumed >= iter->data_size) {
			iter->state = ITER_PUT_SUBBUF;
		} else {
			CHAN_WARN_ON(chan, !config->cb.record_get);
			config->cb.record_get(config, chan, buf,
					      iter->read_offset,
					      &iter->header_len,
					      &iter->payload_len,
					      &iter->timestamp);
			iter->read_offset += iter->header_len;
			subbuffer_consume_record(config, &buf->backend);
			iter->state = ITER_NEXT_RECORD;
			return iter->payload_len;
		}
		goto restart;
	case ITER_NEXT_RECORD:
		iter->read_offset += iter->payload_len;
		iter->state = ITER_TEST_RECORD;
		goto restart;
	case ITER_PUT_SUBBUF:
		lib_ring_buffer_put_next_subbuf(buf);
		iter->state = ITER_GET_SUBBUF;
		goto restart;
	default:
		CHAN_WARN_ON(chan, 1);	/* Should not happen */
		return -EPERM;
	}
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_get_next_record);
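
/*
 * Illustrative sketch: a minimal kernel-side consumer loop draining a single
 * buffer with lib_ring_buffer_get_next_record(). Assumes a buffer already
 * opened for reading; example_drain_buffer() is a hypothetical helper, not
 * part of the ring buffer API.
 */
static void example_drain_buffer(struct channel *chan,
				 struct lib_ring_buffer *buf)
{
	ssize_t len;

	for (;;) {
		len = lib_ring_buffer_get_next_record(chan, buf);
		if (len == -EAGAIN)
			break;		/* Temporarily empty; retry later. */
		if (len == -ENODATA)
			break;		/* Finalized: no more data, ever. */
		if (len < 0)
			break;		/* Unexpected error. */
		/*
		 * The record payload (len bytes) starts at
		 * buf->iter.read_offset; its timestamp is in
		 * buf->iter.timestamp.
		 */
		printk(KERN_DEBUG "record: %zd bytes at ts %llu\n",
		       len, (unsigned long long) buf->iter.timestamp);
	}
}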

static int buf_is_higher(void *a, void *b)
{
	struct lib_ring_buffer *bufa = a;
	struct lib_ring_buffer *bufb = b;

	/* Consider lowest timestamps to be at the top of the heap */
	return (bufa->iter.timestamp < bufb->iter.timestamp);
}

static
void lib_ring_buffer_get_empty_buf_records(const struct lib_ring_buffer_config *config,
					   struct channel *chan)
{
	struct lttng_ptr_heap *heap = &chan->iter.heap;
	struct lib_ring_buffer *buf, *tmp;
	ssize_t len;

	list_for_each_entry_safe(buf, tmp, &chan->iter.empty_head,
				 iter.empty_node) {
		len = lib_ring_buffer_get_next_record(chan, buf);

		/*
		 * Deal with -EAGAIN and -ENODATA.
		 * len >= 0 means record contains data.
		 * -EBUSY should never happen, because we support only one
		 * reader.
		 */
		switch (len) {
		case -EAGAIN:
			/* Keep node in empty list */
			break;
		case -ENODATA:
			/*
			 * Buffer is finalized. Don't add to list of empty
			 * buffers, because it has no more data to provide,
			 * ever.
			 */
			list_del(&buf->iter.empty_node);
			break;
		case -EBUSY:
			CHAN_WARN_ON(chan, 1);
			break;
		default:
			/*
			 * Insert buffer into the heap, remove from empty
			 * buffer list.
			 */
			CHAN_WARN_ON(chan, len < 0);
			list_del(&buf->iter.empty_node);
			CHAN_WARN_ON(chan, lttng_heap_insert(heap, buf));
		}
	}
}

static
void lib_ring_buffer_wait_for_qs(const struct lib_ring_buffer_config *config,
				 struct channel *chan)
{
	u64 timestamp_qs;
	unsigned long wait_msecs;

	/*
	 * No need to wait if no empty buffers are present.
	 */
	if (list_empty(&chan->iter.empty_head))
		return;

	timestamp_qs = config->cb.ring_buffer_clock_read(chan);
	/*
	 * We need to consider previously empty buffers.
	 * Do a get next buf record on each of them. Add them to
	 * the heap if they have data. If at least one of them
	 * doesn't have data, we need to wait for
	 * switch_timer_interval + MAX_SYSTEM_LATENCY (so we are sure the
	 * buffers have been switched either by the timer or idle entry) and
	 * check them again, adding them if they have data.
	 */
	lib_ring_buffer_get_empty_buf_records(config, chan);

	/*
	 * No need to wait if no empty buffers are present.
	 */
	if (list_empty(&chan->iter.empty_head))
		return;

	/*
	 * We need to wait for the buffer switch timer to run. If the
	 * CPU is idle, idle entry performed the switch.
	 * TODO: we could optimize further by skipping the sleep if all
	 * empty buffers belong to idle or offline cpus.
	 */
	wait_msecs = jiffies_to_msecs(chan->switch_timer_interval);
	wait_msecs += MAX_SYSTEM_LATENCY;
	msleep(wait_msecs);
	lib_ring_buffer_get_empty_buf_records(config, chan);
	/*
	 * Any buffer still in the empty list here cannot possibly
	 * contain an event with a timestamp prior to "timestamp_qs".
	 * The new quiescent state timestamp is the one we grabbed
	 * before waiting for buffer data. It is therefore safe to
	 * ignore empty buffers up to last_qs timestamp for fusion
	 * merge.
	 */
	chan->iter.last_qs = timestamp_qs;
}
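
/*
 * Worked example: with a switch timer interval of 50 jiffies on a HZ=250
 * kernel (values chosen for illustration), jiffies_to_msecs(50) is 200 ms,
 * so the reader sleeps 200 + MAX_SYSTEM_LATENCY = 450 ms before re-checking
 * the previously empty buffers.
 */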

/**
 * channel_get_next_record - Get the next record in a channel.
 * @chan: channel
 * @ret_buf: the buffer in which the event is located (output)
 *
 * Returns the size of the new current event, -EAGAIN if all buffers are
 * empty, -ENODATA if all buffers are empty and finalized. The channel must
 * already be opened for reading.
 */
ssize_t channel_get_next_record(struct channel *chan,
				struct lib_ring_buffer **ret_buf)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	struct lib_ring_buffer *buf;
	struct lttng_ptr_heap *heap;
	ssize_t len;

	if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
		*ret_buf = channel_get_ring_buffer(config, chan, 0);
		return lib_ring_buffer_get_next_record(chan, *ret_buf);
	}

	heap = &chan->iter.heap;

	/*
	 * get next record for topmost buffer.
	 */
	buf = lttng_heap_maximum(heap);
	if (buf) {
		len = lib_ring_buffer_get_next_record(chan, buf);
		/*
		 * Deal with -EAGAIN and -ENODATA.
		 * len >= 0 means record contains data.
		 */
		switch (len) {
		case -EAGAIN:
			buf->iter.timestamp = 0;
			list_add(&buf->iter.empty_node, &chan->iter.empty_head);
			/* Remove topmost buffer from the heap */
			CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf);
			break;
		case -ENODATA:
			/*
			 * Buffer is finalized. Remove buffer from heap and
			 * don't add to list of empty buffers, because it has
			 * no more data to provide, ever.
			 */
			CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf);
			break;
		case -EBUSY:
			CHAN_WARN_ON(chan, 1);
			break;
		default:
			/*
			 * Reinsert buffer into the heap. Note that heap can be
			 * partially empty, so we need to use
			 * lttng_heap_replace_max().
			 */
			CHAN_WARN_ON(chan, len < 0);
			CHAN_WARN_ON(chan, lttng_heap_replace_max(heap, buf) != buf);
			break;
		}
	}

	buf = lttng_heap_maximum(heap);
	if (!buf || buf->iter.timestamp > chan->iter.last_qs) {
		/*
		 * Deal with buffers previously showing no data.
		 * Add buffers containing data to the heap, update
		 * last_qs.
		 */
		lib_ring_buffer_wait_for_qs(config, chan);
	}

	*ret_buf = buf = lttng_heap_maximum(heap);
	if (buf) {
		/*
		 * If this warning triggers, you probably need to check your
		 * system interrupt latency. Typical causes: too much printk()
		 * output going to a serial console with interrupts off.
		 * Allow for MAX_CLOCK_DELTA ns timestamp delta going backward.
		 * Observed on SMP KVM setups with trace_clock().
		 */
		if (chan->iter.last_timestamp
		    > (buf->iter.timestamp + MAX_CLOCK_DELTA)) {
			printk(KERN_WARNING "ring_buffer: timestamps going "
			       "backward. Last time %llu ns, cpu %d, "
			       "current time %llu ns, cpu %d, "
			       "delta %llu ns.\n",
			       chan->iter.last_timestamp, chan->iter.last_cpu,
			       buf->iter.timestamp, buf->backend.cpu,
			       chan->iter.last_timestamp - buf->iter.timestamp);
			CHAN_WARN_ON(chan, 1);
		}
		chan->iter.last_timestamp = buf->iter.timestamp;
		chan->iter.last_cpu = buf->backend.cpu;
		return buf->iter.payload_len;
	} else {
		/* Heap is empty */
		if (list_empty(&chan->iter.empty_head))
			return -ENODATA;	/* All buffers finalized */
		else
			return -EAGAIN;		/* Temporarily empty */
	}
}
EXPORT_SYMBOL_GPL(channel_get_next_record);
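
/*
 * Illustrative sketch: draining a per-CPU channel in global timestamp order
 * with channel_get_next_record(), which merges records across per-CPU buffers
 * through the priority heap. example_drain_channel() and its handle_record
 * callback are hypothetical, not part of the ring buffer API.
 */
static void example_drain_channel(struct channel *chan,
				  void (*handle_record)(struct lib_ring_buffer *buf,
							ssize_t payload_len))
{
	struct lib_ring_buffer *buf;
	ssize_t len;

	for (;;) {
		len = channel_get_next_record(chan, &buf);
		if (len == -EAGAIN)
			break;		/* All buffers temporarily empty. */
		if (len == -ENODATA)
			break;		/* All buffers empty and finalized. */
		if (len < 0)
			break;		/* Unexpected error. */
		/* buf now points to the per-CPU buffer holding the record. */
		handle_record(buf, len);
	}
}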

static
void lib_ring_buffer_iterator_init(struct channel *chan, struct lib_ring_buffer *buf)
{
	if (buf->iter.allocated)
		return;

	buf->iter.allocated = 1;
	if (chan->iter.read_open && !buf->iter.read_open) {
		CHAN_WARN_ON(chan, lib_ring_buffer_open_read(buf) != 0);
		buf->iter.read_open = 1;
	}

	/* Add to list of buffers without any current record */
	if (chan->backend.config.alloc == RING_BUFFER_ALLOC_PER_CPU)
		list_add(&buf->iter.empty_node, &chan->iter.empty_head);
}

#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))

int lttng_cpuhp_rb_iter_online(unsigned int cpu,
		struct lttng_cpuhp_node *node)
{
	struct channel *chan = container_of(node, struct channel,
					    cpuhp_iter_online);
	struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
	const struct lib_ring_buffer_config *config = &chan->backend.config;

	CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);

	lib_ring_buffer_iterator_init(chan, buf);
	return 0;
}
EXPORT_SYMBOL_GPL(lttng_cpuhp_rb_iter_online);

#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */

#ifdef CONFIG_HOTPLUG_CPU
static
int channel_iterator_cpu_hotplug(struct notifier_block *nb,
				 unsigned long action,
				 void *hcpu)
{
	unsigned int cpu = (unsigned long)hcpu;
	struct channel *chan = container_of(nb, struct channel,
					    hp_iter_notifier);
	struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
	const struct lib_ring_buffer_config *config = &chan->backend.config;

	if (!chan->hp_iter_enable)
		return NOTIFY_DONE;

	CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);

	switch (action) {
	case CPU_DOWN_FAILED:
	case CPU_DOWN_FAILED_FROZEN:
	case CPU_ONLINE:
	case CPU_ONLINE_FROZEN:
		lib_ring_buffer_iterator_init(chan, buf);
		return NOTIFY_OK;
	default:
		return NOTIFY_DONE;
	}
}
#endif

#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */

int channel_iterator_init(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	struct lib_ring_buffer *buf;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		int ret;

		INIT_LIST_HEAD(&chan->iter.empty_head);
		ret = lttng_heap_init(&chan->iter.heap,
				      num_possible_cpus(),
				      GFP_KERNEL, buf_is_higher);
		if (ret)
			return ret;

#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
		chan->cpuhp_iter_online.component = LTTNG_RING_BUFFER_ITER;
		ret = cpuhp_state_add_instance(lttng_rb_hp_online,
			&chan->cpuhp_iter_online.node);
		if (ret)
			return ret;
#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
		{
			int cpu;

			/*
			 * Without CPU hotplug support, if the ring buffer is
			 * allocated in an early initcall, it will not be
			 * notified of secondary cpus. In that case, we need
			 * to initialize the iterators for all possible cpus.
			 */
#ifdef CONFIG_HOTPLUG_CPU
			chan->hp_iter_notifier.notifier_call =
				channel_iterator_cpu_hotplug;
			chan->hp_iter_notifier.priority = 10;
			register_cpu_notifier(&chan->hp_iter_notifier);

			get_online_cpus();
			for_each_online_cpu(cpu) {
				buf = per_cpu_ptr(chan->backend.buf, cpu);
				lib_ring_buffer_iterator_init(chan, buf);
			}
			chan->hp_iter_enable = 1;
			put_online_cpus();
#else
			for_each_possible_cpu(cpu) {
				buf = per_cpu_ptr(chan->backend.buf, cpu);
				lib_ring_buffer_iterator_init(chan, buf);
			}
#endif
		}
#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
	} else {
		buf = channel_get_ring_buffer(config, chan, 0);
		lib_ring_buffer_iterator_init(chan, buf);
	}
	return 0;
}

void channel_iterator_unregister_notifiers(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
		{
			int ret;

			ret = cpuhp_state_remove_instance(lttng_rb_hp_online,
				&chan->cpuhp_iter_online.node);
			WARN_ON(ret);
		}
#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
		chan->hp_iter_enable = 0;
		unregister_cpu_notifier(&chan->hp_iter_notifier);
#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
	}
}

void channel_iterator_free(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
		lttng_heap_free(&chan->iter.heap);
}

int lib_ring_buffer_iterator_open(struct lib_ring_buffer *buf)
{
	struct channel *chan = buf->backend.chan;
	const struct lib_ring_buffer_config *config = &chan->backend.config;

	CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);
	return lib_ring_buffer_open_read(buf);
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_open);

/*
 * Note: Iterators must not be mixed with other types of outputs, because an
 * iterator can leave the buffer in "GET" state, which is not consistent with
 * other types of output (mmap, splice, raw data read).
 */
void lib_ring_buffer_iterator_release(struct lib_ring_buffer *buf)
{
	lib_ring_buffer_release_read(buf);
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_release);

int channel_iterator_open(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	struct lib_ring_buffer *buf;
	int ret = 0, cpu;

	CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		get_online_cpus();
		/* Allow CPU hotplug to keep track of opened reader */
		chan->iter.read_open = 1;
		for_each_channel_cpu(cpu, chan) {
			buf = channel_get_ring_buffer(config, chan, cpu);
			ret = lib_ring_buffer_iterator_open(buf);
			if (ret)
				goto error;
			buf->iter.read_open = 1;
		}
		put_online_cpus();
	} else {
		buf = channel_get_ring_buffer(config, chan, 0);
		ret = lib_ring_buffer_iterator_open(buf);
	}
	return ret;
error:
	/* Error should always happen on CPU 0, hence no close is required. */
	CHAN_WARN_ON(chan, cpu != 0);
	put_online_cpus();
	return ret;
}
EXPORT_SYMBOL_GPL(channel_iterator_open);

void channel_iterator_release(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	struct lib_ring_buffer *buf;
	int cpu;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		get_online_cpus();
		for_each_channel_cpu(cpu, chan) {
			buf = channel_get_ring_buffer(config, chan, cpu);
			if (buf->iter.read_open) {
				lib_ring_buffer_iterator_release(buf);
				buf->iter.read_open = 0;
			}
		}
		chan->iter.read_open = 0;
		put_online_cpus();
	} else {
		buf = channel_get_ring_buffer(config, chan, 0);
		lib_ring_buffer_iterator_release(buf);
	}
}
EXPORT_SYMBOL_GPL(channel_iterator_release);

void lib_ring_buffer_iterator_reset(struct lib_ring_buffer *buf)
{
	struct channel *chan = buf->backend.chan;

	if (buf->iter.state != ITER_GET_SUBBUF)
		lib_ring_buffer_put_next_subbuf(buf);
	buf->iter.state = ITER_GET_SUBBUF;
	/* Remove from heap (if present). */
	if (lttng_heap_cherrypick(&chan->iter.heap, buf))
		list_add(&buf->iter.empty_node, &chan->iter.empty_head);
	buf->iter.timestamp = 0;
	buf->iter.header_len = 0;
	buf->iter.payload_len = 0;
	buf->iter.consumed = 0;
	buf->iter.read_offset = 0;
	buf->iter.data_size = 0;
	/* Don't reset allocated and read_open */
}

void channel_iterator_reset(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	struct lib_ring_buffer *buf;
	int cpu;

	/* Empty heap, put into empty_head */
	while ((buf = lttng_heap_remove(&chan->iter.heap)) != NULL)
		list_add(&buf->iter.empty_node, &chan->iter.empty_head);

	for_each_channel_cpu(cpu, chan) {
		buf = channel_get_ring_buffer(config, chan, cpu);
		lib_ring_buffer_iterator_reset(buf);
	}
	/* Don't reset read_open */
	chan->iter.last_qs = 0;
	chan->iter.last_timestamp = 0;
	chan->iter.last_cpu = 0;
	chan->iter.len_left = 0;
}

/*
 * Ring buffer payload extraction read() implementation.
 */
static
ssize_t channel_ring_buffer_file_read(struct file *filp,
				      char __user *user_buf,
				      size_t count,
				      loff_t *ppos,
				      struct channel *chan,
				      struct lib_ring_buffer *buf,
				      int fusionmerge)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	size_t read_count = 0, read_offset;
	ssize_t len;

	might_sleep();
	if (!access_ok(VERIFY_WRITE, user_buf, count))
		return -EFAULT;

	/* Finish copy of previous record */
	if (*ppos != 0) {
		if (read_count < count) {
			len = chan->iter.len_left;
			read_offset = *ppos;
			if (config->alloc == RING_BUFFER_ALLOC_PER_CPU
			    && fusionmerge)
				buf = lttng_heap_maximum(&chan->iter.heap);
			CHAN_WARN_ON(chan, !buf);
			goto skip_get_next;
		}
	}

	while (read_count < count) {
		size_t copy_len, space_left;

		if (fusionmerge)
			len = channel_get_next_record(chan, &buf);
		else
			len = lib_ring_buffer_get_next_record(chan, buf);
len_test:
		if (len < 0) {
			/*
			 * Check if buffer is finalized (end of file).
			 */
			if (len == -ENODATA) {
				/* A 0 read_count will tell about end of file */
				goto nodata;
			}
			if (filp->f_flags & O_NONBLOCK) {
				if (!read_count)
					read_count = -EAGAIN;
				goto nodata;
			} else {
				int error;

				/*
				 * No data available at the moment, return what
				 * we got.
				 */
				if (read_count)
					goto nodata;

				/*
				 * Wait for returned len to be >= 0 or -ENODATA.
				 */
				if (fusionmerge)
					error = wait_event_interruptible(
						chan->read_wait,
						((len = channel_get_next_record(chan,
							&buf)), len != -EAGAIN));
				else
					error = wait_event_interruptible(
						buf->read_wait,
						((len = lib_ring_buffer_get_next_record(
							chan, buf)), len != -EAGAIN));
				CHAN_WARN_ON(chan, len == -EBUSY);
				if (error) {
					read_count = error;
					goto nodata;
				}
				CHAN_WARN_ON(chan, len < 0 && len != -ENODATA);
				goto len_test;
			}
		}
		read_offset = buf->iter.read_offset;
skip_get_next:
		space_left = count - read_count;
		if (len <= space_left) {
			copy_len = len;
			chan->iter.len_left = 0;
			*ppos = 0;
		} else {
			copy_len = space_left;
			chan->iter.len_left = len - copy_len;
			*ppos = read_offset + copy_len;
		}
		if (__lib_ring_buffer_copy_to_user(&buf->backend, read_offset,
						   &user_buf[read_count],
						   copy_len)) {
			/*
			 * Leave the len_left and ppos values at their current
			 * state, as we currently have a valid event to read.
			 */
			return -EFAULT;
		}
		read_count += copy_len;
	}
	return read_count;

nodata:
	*ppos = 0;
	chan->iter.len_left = 0;
	return read_count;
}
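
/*
 * Worked example: suppose a record has a 100-byte payload and the caller
 * passes a 60-byte user buffer. The first read() copies 60 bytes, sets
 * chan->iter.len_left = 40 and *ppos to the offset of the remaining bytes.
 * The next read() takes the "*ppos != 0" path above, copies the remaining
 * 40 bytes, resets *ppos and len_left to 0, and then moves on to the next
 * record if space is left in the user buffer.
 */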

/**
 * lib_ring_buffer_file_read - Read buffer record payload.
 * @filp: file structure pointer.
 * @user_buf: user buffer to read data into.
 * @count: number of bytes to read.
 * @ppos: file read position.
 *
 * Returns a negative value on error, or the number of bytes read on success.
 * ppos is used to save the position _within the current record_ between calls
 * to read().
 */
static
ssize_t lib_ring_buffer_file_read(struct file *filp,
				  char __user *user_buf,
				  size_t count,
				  loff_t *ppos)
{
	struct inode *inode = filp->lttng_f_dentry->d_inode;
	struct lib_ring_buffer *buf = inode->i_private;
	struct channel *chan = buf->backend.chan;

	return channel_ring_buffer_file_read(filp, user_buf, count, ppos,
					     chan, buf, 0);
}

/**
 * channel_file_read - Read channel record payload.
 * @filp: file structure pointer.
 * @user_buf: user buffer to read data into.
 * @count: number of bytes to read.
 * @ppos: file read position.
 *
 * Returns a negative value on error, or the number of bytes read on success.
 * ppos is used to save the position _within the current record_ between calls
 * to read().
 */
static
ssize_t channel_file_read(struct file *filp,
			  char __user *user_buf,
			  size_t count,
			  loff_t *ppos)
{
	struct inode *inode = filp->lttng_f_dentry->d_inode;
	struct channel *chan = inode->i_private;
	const struct lib_ring_buffer_config *config = &chan->backend.config;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
		return channel_ring_buffer_file_read(filp, user_buf, count,
						     ppos, chan, NULL, 1);
	else {
		struct lib_ring_buffer *buf =
			channel_get_ring_buffer(config, chan, 0);
		return channel_ring_buffer_file_read(filp, user_buf, count,
						     ppos, chan, buf, 0);
	}
}

static
int lib_ring_buffer_file_open(struct inode *inode, struct file *file)
{
	struct lib_ring_buffer *buf = inode->i_private;
	int ret;

	ret = lib_ring_buffer_iterator_open(buf);
	if (ret)
		return ret;

	file->private_data = buf;
	ret = nonseekable_open(inode, file);
	if (ret)
		goto release_iter;
	return 0;

release_iter:
	lib_ring_buffer_iterator_release(buf);
	return ret;
}

static
int lib_ring_buffer_file_release(struct inode *inode, struct file *file)
{
	struct lib_ring_buffer *buf = inode->i_private;

	lib_ring_buffer_iterator_release(buf);
	return 0;
}

static
int channel_file_open(struct inode *inode, struct file *file)
{
	struct channel *chan = inode->i_private;
	int ret;

	ret = channel_iterator_open(chan);
	if (ret)
		return ret;

	file->private_data = chan;
	ret = nonseekable_open(inode, file);
	if (ret)
		goto release_iter;
	return 0;

release_iter:
	channel_iterator_release(chan);
	return ret;
}

static
int channel_file_release(struct inode *inode, struct file *file)
{
	struct channel *chan = inode->i_private;

	channel_iterator_release(chan);
	return 0;
}

const struct file_operations channel_payload_file_operations = {
	.owner = THIS_MODULE,
	.open = channel_file_open,
	.release = channel_file_release,
	.read = channel_file_read,
	.llseek = vfs_lib_ring_buffer_no_llseek,
};
EXPORT_SYMBOL_GPL(channel_payload_file_operations);

const struct file_operations lib_ring_buffer_payload_file_operations = {
	.owner = THIS_MODULE,
	.open = lib_ring_buffer_file_open,
	.release = lib_ring_buffer_file_release,
	.read = lib_ring_buffer_file_read,
	.llseek = vfs_lib_ring_buffer_no_llseek,
};
EXPORT_SYMBOL_GPL(lib_ring_buffer_payload_file_operations);
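
/*
 * Illustrative userspace sketch: reading record payloads through a file
 * backed by one of the payload file_operations exported above. The
 * "/path/to/channel-payload" node is hypothetical; the actual path depends
 * on how the tracer exposes these file operations. read() returns record
 * payload bytes, 0 once all buffers are finalized (end of file), or -1 with
 * errno set (e.g. EAGAIN under O_NONBLOCK when buffers are temporarily
 * empty).
 */
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
	char payload[4096];
	ssize_t len;
	int fd;

	fd = open("/path/to/channel-payload", O_RDONLY);
	if (fd < 0) {
		perror("open");
		return 1;
	}
	while ((len = read(fd, payload, sizeof(payload))) > 0)
		printf("read %zd payload bytes\n", len);
	close(fd);
	return 0;
}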