Fix: ACCESS_ONCE() removed in kernel 4.15
[lttng-modules.git] / lib / ringbuffer / ring_buffer_iterator.c
1 /*
2 * ring_buffer_iterator.c
3 *
4 * Ring buffer and channel iterators. Get each event of a channel in order. Uses
5 * a prio heap for per-cpu buffers, giving a O(log(NR_CPUS)) algorithmic
6 * complexity for the "get next event" operation.
7 *
8 * Copyright (C) 2010-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
9 *
10 * This library is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License as published by the Free Software Foundation; only
13 * version 2.1 of the License.
14 *
15 * This library is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * Lesser General Public License for more details.
19 *
20 * You should have received a copy of the GNU Lesser General Public
21 * License along with this library; if not, write to the Free Software
22 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
23 *
24 * Author:
25 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
26 */
27
28 #include <wrapper/ringbuffer/iterator.h>
29 #include <wrapper/file.h>
30 #include <linux/jiffies.h>
31 #include <linux/delay.h>
32 #include <linux/module.h>
33
34 /*
35 * Safety factor taking into account internal kernel interrupt latency.
36 * Assuming 250ms worst-case latency.
37 */
38 #define MAX_SYSTEM_LATENCY 250
39
40 /*
41 * Maximum delta expected between trace clocks. At most 1 jiffy delta.
42 */
43 #define MAX_CLOCK_DELTA (jiffies_to_usecs(1) * 1000)
44
45 /**
46 * lib_ring_buffer_get_next_record - Get the next record in a buffer.
47 * @chan: channel
48 * @buf: buffer
49 *
50 * Returns the size of the event read, -EAGAIN if buffer is empty, -ENODATA if
51 * buffer is empty and finalized. The buffer must already be opened for reading.
52 */
53 ssize_t lib_ring_buffer_get_next_record(struct channel *chan,
54 struct lib_ring_buffer *buf)
55 {
56 const struct lib_ring_buffer_config *config = &chan->backend.config;
57 struct lib_ring_buffer_iter *iter = &buf->iter;
58 int ret;
59
60 restart:
61 switch (iter->state) {
62 case ITER_GET_SUBBUF:
63 ret = lib_ring_buffer_get_next_subbuf(buf);
64 if (ret && !READ_ONCE(buf->finalized)
65 && config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
66 /*
67 * Use "pull" scheme for global buffers. The reader
68 * itself flushes the buffer to "pull" data not visible
69 * to readers yet. Flush current subbuffer and re-try.
70 *
71 * Per-CPU buffers rather use a "push" scheme because
72 * the IPI needed to flush all CPU's buffers is too
73 * costly. In the "push" scheme, the reader waits for
74 * the writer periodic timer to flush the
75 * buffers (keeping track of a quiescent state
76 * timestamp). Therefore, the writer "pushes" data out
77 * of the buffers rather than letting the reader "pull"
78 * data from the buffer.
79 */
80 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
81 ret = lib_ring_buffer_get_next_subbuf(buf);
82 }
83 if (ret)
84 return ret;
85 iter->consumed = buf->cons_snapshot;
86 iter->data_size = lib_ring_buffer_get_read_data_size(config, buf);
87 iter->read_offset = iter->consumed;
88 /* skip header */
89 iter->read_offset += config->cb.subbuffer_header_size();
90 iter->state = ITER_TEST_RECORD;
91 goto restart;
92 case ITER_TEST_RECORD:
93 if (iter->read_offset - iter->consumed >= iter->data_size) {
94 iter->state = ITER_PUT_SUBBUF;
95 } else {
96 CHAN_WARN_ON(chan, !config->cb.record_get);
97 config->cb.record_get(config, chan, buf,
98 iter->read_offset,
99 &iter->header_len,
100 &iter->payload_len,
101 &iter->timestamp);
102 iter->read_offset += iter->header_len;
103 subbuffer_consume_record(config, &buf->backend);
104 iter->state = ITER_NEXT_RECORD;
105 return iter->payload_len;
106 }
107 goto restart;
108 case ITER_NEXT_RECORD:
109 iter->read_offset += iter->payload_len;
110 iter->state = ITER_TEST_RECORD;
111 goto restart;
112 case ITER_PUT_SUBBUF:
113 lib_ring_buffer_put_next_subbuf(buf);
114 iter->state = ITER_GET_SUBBUF;
115 goto restart;
116 default:
117 CHAN_WARN_ON(chan, 1); /* Should not happen */
118 return -EPERM;
119 }
120 }
121 EXPORT_SYMBOL_GPL(lib_ring_buffer_get_next_record);
122
123 static int buf_is_higher(void *a, void *b)
124 {
125 struct lib_ring_buffer *bufa = a;
126 struct lib_ring_buffer *bufb = b;
127
128 /* Consider lowest timestamps to be at the top of the heap */
129 return (bufa->iter.timestamp < bufb->iter.timestamp);
130 }
131
132 static
133 void lib_ring_buffer_get_empty_buf_records(const struct lib_ring_buffer_config *config,
134 struct channel *chan)
135 {
136 struct lttng_ptr_heap *heap = &chan->iter.heap;
137 struct lib_ring_buffer *buf, *tmp;
138 ssize_t len;
139
140 list_for_each_entry_safe(buf, tmp, &chan->iter.empty_head,
141 iter.empty_node) {
142 len = lib_ring_buffer_get_next_record(chan, buf);
143
144 /*
145 * Deal with -EAGAIN and -ENODATA.
146 * len >= 0 means record contains data.
147 * -EBUSY should never happen, because we support only one
148 * reader.
149 */
150 switch (len) {
151 case -EAGAIN:
152 /* Keep node in empty list */
153 break;
154 case -ENODATA:
155 /*
156 * Buffer is finalized. Don't add to list of empty
157 * buffer, because it has no more data to provide, ever.
158 */
159 list_del(&buf->iter.empty_node);
160 break;
161 case -EBUSY:
162 CHAN_WARN_ON(chan, 1);
163 break;
164 default:
165 /*
166 * Insert buffer into the heap, remove from empty buffer
167 * list.
168 */
169 CHAN_WARN_ON(chan, len < 0);
170 list_del(&buf->iter.empty_node);
171 CHAN_WARN_ON(chan, lttng_heap_insert(heap, buf));
172 }
173 }
174 }
175
176 static
177 void lib_ring_buffer_wait_for_qs(const struct lib_ring_buffer_config *config,
178 struct channel *chan)
179 {
180 u64 timestamp_qs;
181 unsigned long wait_msecs;
182
183 /*
184 * No need to wait if no empty buffers are present.
185 */
186 if (list_empty(&chan->iter.empty_head))
187 return;
188
189 timestamp_qs = config->cb.ring_buffer_clock_read(chan);
190 /*
191 * We need to consider previously empty buffers.
192 * Do a get next buf record on each of them. Add them to
193 * the heap if they have data. If at least one of them
194 * don't have data, we need to wait for
195 * switch_timer_interval + MAX_SYSTEM_LATENCY (so we are sure the
196 * buffers have been switched either by the timer or idle entry) and
197 * check them again, adding them if they have data.
198 */
199 lib_ring_buffer_get_empty_buf_records(config, chan);
200
201 /*
202 * No need to wait if no empty buffers are present.
203 */
204 if (list_empty(&chan->iter.empty_head))
205 return;
206
207 /*
208 * We need to wait for the buffer switch timer to run. If the
209 * CPU is idle, idle entry performed the switch.
210 * TODO: we could optimize further by skipping the sleep if all
211 * empty buffers belong to idle or offline cpus.
212 */
213 wait_msecs = jiffies_to_msecs(chan->switch_timer_interval);
214 wait_msecs += MAX_SYSTEM_LATENCY;
215 msleep(wait_msecs);
216 lib_ring_buffer_get_empty_buf_records(config, chan);
217 /*
218 * Any buffer still in the empty list here cannot possibly
219 * contain an event with a timestamp prior to "timestamp_qs".
220 * The new quiescent state timestamp is the one we grabbed
221 * before waiting for buffer data. It is therefore safe to
222 * ignore empty buffers up to last_qs timestamp for fusion
223 * merge.
224 */
225 chan->iter.last_qs = timestamp_qs;
226 }
227
228 /**
229 * channel_get_next_record - Get the next record in a channel.
230 * @chan: channel
231 * @ret_buf: the buffer in which the event is located (output)
232 *
233 * Returns the size of new current event, -EAGAIN if all buffers are empty,
234 * -ENODATA if all buffers are empty and finalized. The channel must already be
235 * opened for reading.
236 */
237
238 ssize_t channel_get_next_record(struct channel *chan,
239 struct lib_ring_buffer **ret_buf)
240 {
241 const struct lib_ring_buffer_config *config = &chan->backend.config;
242 struct lib_ring_buffer *buf;
243 struct lttng_ptr_heap *heap;
244 ssize_t len;
245
246 if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
247 *ret_buf = channel_get_ring_buffer(config, chan, 0);
248 return lib_ring_buffer_get_next_record(chan, *ret_buf);
249 }
250
251 heap = &chan->iter.heap;
252
253 /*
254 * get next record for topmost buffer.
255 */
256 buf = lttng_heap_maximum(heap);
257 if (buf) {
258 len = lib_ring_buffer_get_next_record(chan, buf);
259 /*
260 * Deal with -EAGAIN and -ENODATA.
261 * len >= 0 means record contains data.
262 */
263 switch (len) {
264 case -EAGAIN:
265 buf->iter.timestamp = 0;
266 list_add(&buf->iter.empty_node, &chan->iter.empty_head);
267 /* Remove topmost buffer from the heap */
268 CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf);
269 break;
270 case -ENODATA:
271 /*
272 * Buffer is finalized. Remove buffer from heap and
273 * don't add to list of empty buffer, because it has no
274 * more data to provide, ever.
275 */
276 CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf);
277 break;
278 case -EBUSY:
279 CHAN_WARN_ON(chan, 1);
280 break;
281 default:
282 /*
283 * Reinsert buffer into the heap. Note that heap can be
284 * partially empty, so we need to use
285 * lttng_heap_replace_max().
286 */
287 CHAN_WARN_ON(chan, len < 0);
288 CHAN_WARN_ON(chan, lttng_heap_replace_max(heap, buf) != buf);
289 break;
290 }
291 }
292
293 buf = lttng_heap_maximum(heap);
294 if (!buf || buf->iter.timestamp > chan->iter.last_qs) {
295 /*
296 * Deal with buffers previously showing no data.
297 * Add buffers containing data to the heap, update
298 * last_qs.
299 */
300 lib_ring_buffer_wait_for_qs(config, chan);
301 }
302
303 *ret_buf = buf = lttng_heap_maximum(heap);
304 if (buf) {
305 /*
306 * If this warning triggers, you probably need to check your
307 * system interrupt latency. Typical causes: too many printk()
308 * output going to a serial console with interrupts off.
309 * Allow for MAX_CLOCK_DELTA ns timestamp delta going backward.
310 * Observed on SMP KVM setups with trace_clock().
311 */
312 if (chan->iter.last_timestamp
313 > (buf->iter.timestamp + MAX_CLOCK_DELTA)) {
314 printk(KERN_WARNING "ring_buffer: timestamps going "
315 "backward. Last time %llu ns, cpu %d, "
316 "current time %llu ns, cpu %d, "
317 "delta %llu ns.\n",
318 chan->iter.last_timestamp, chan->iter.last_cpu,
319 buf->iter.timestamp, buf->backend.cpu,
320 chan->iter.last_timestamp - buf->iter.timestamp);
321 CHAN_WARN_ON(chan, 1);
322 }
323 chan->iter.last_timestamp = buf->iter.timestamp;
324 chan->iter.last_cpu = buf->backend.cpu;
325 return buf->iter.payload_len;
326 } else {
327 /* Heap is empty */
328 if (list_empty(&chan->iter.empty_head))
329 return -ENODATA; /* All buffers finalized */
330 else
331 return -EAGAIN; /* Temporarily empty */
332 }
333 }
334 EXPORT_SYMBOL_GPL(channel_get_next_record);
335
336 static
337 void lib_ring_buffer_iterator_init(struct channel *chan, struct lib_ring_buffer *buf)
338 {
339 if (buf->iter.allocated)
340 return;
341
342 buf->iter.allocated = 1;
343 if (chan->iter.read_open && !buf->iter.read_open) {
344 CHAN_WARN_ON(chan, lib_ring_buffer_open_read(buf) != 0);
345 buf->iter.read_open = 1;
346 }
347
348 /* Add to list of buffers without any current record */
349 if (chan->backend.config.alloc == RING_BUFFER_ALLOC_PER_CPU)
350 list_add(&buf->iter.empty_node, &chan->iter.empty_head);
351 }
352
353 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
354
355 int lttng_cpuhp_rb_iter_online(unsigned int cpu,
356 struct lttng_cpuhp_node *node)
357 {
358 struct channel *chan = container_of(node, struct channel,
359 cpuhp_iter_online);
360 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
361 const struct lib_ring_buffer_config *config = &chan->backend.config;
362
363 CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
364
365 lib_ring_buffer_iterator_init(chan, buf);
366 return 0;
367 }
368 EXPORT_SYMBOL_GPL(lttng_cpuhp_rb_iter_online);
369
370 #else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
371
372 #ifdef CONFIG_HOTPLUG_CPU
373 static
374 int channel_iterator_cpu_hotplug(struct notifier_block *nb,
375 unsigned long action,
376 void *hcpu)
377 {
378 unsigned int cpu = (unsigned long)hcpu;
379 struct channel *chan = container_of(nb, struct channel,
380 hp_iter_notifier);
381 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
382 const struct lib_ring_buffer_config *config = &chan->backend.config;
383
384 if (!chan->hp_iter_enable)
385 return NOTIFY_DONE;
386
387 CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
388
389 switch (action) {
390 case CPU_DOWN_FAILED:
391 case CPU_DOWN_FAILED_FROZEN:
392 case CPU_ONLINE:
393 case CPU_ONLINE_FROZEN:
394 lib_ring_buffer_iterator_init(chan, buf);
395 return NOTIFY_OK;
396 default:
397 return NOTIFY_DONE;
398 }
399 }
400 #endif
401
402 #endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
403
404 int channel_iterator_init(struct channel *chan)
405 {
406 const struct lib_ring_buffer_config *config = &chan->backend.config;
407 struct lib_ring_buffer *buf;
408
409 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
410 int ret;
411
412 INIT_LIST_HEAD(&chan->iter.empty_head);
413 ret = lttng_heap_init(&chan->iter.heap,
414 num_possible_cpus(),
415 GFP_KERNEL, buf_is_higher);
416 if (ret)
417 return ret;
418
419 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
420 chan->cpuhp_iter_online.component = LTTNG_RING_BUFFER_ITER;
421 ret = cpuhp_state_add_instance(lttng_rb_hp_online,
422 &chan->cpuhp_iter_online.node);
423 if (ret)
424 return ret;
425 #else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
426 {
427 int cpu;
428
429 /*
430 * In case of non-hotplug cpu, if the ring-buffer is allocated
431 * in early initcall, it will not be notified of secondary cpus.
432 * In that off case, we need to allocate for all possible cpus.
433 */
434 #ifdef CONFIG_HOTPLUG_CPU
435 chan->hp_iter_notifier.notifier_call =
436 channel_iterator_cpu_hotplug;
437 chan->hp_iter_notifier.priority = 10;
438 register_cpu_notifier(&chan->hp_iter_notifier);
439
440 get_online_cpus();
441 for_each_online_cpu(cpu) {
442 buf = per_cpu_ptr(chan->backend.buf, cpu);
443 lib_ring_buffer_iterator_init(chan, buf);
444 }
445 chan->hp_iter_enable = 1;
446 put_online_cpus();
447 #else
448 for_each_possible_cpu(cpu) {
449 buf = per_cpu_ptr(chan->backend.buf, cpu);
450 lib_ring_buffer_iterator_init(chan, buf);
451 }
452 #endif
453 }
454 #endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
455 } else {
456 buf = channel_get_ring_buffer(config, chan, 0);
457 lib_ring_buffer_iterator_init(chan, buf);
458 }
459 return 0;
460 }
461
462 void channel_iterator_unregister_notifiers(struct channel *chan)
463 {
464 const struct lib_ring_buffer_config *config = &chan->backend.config;
465
466 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
467 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
468 {
469 int ret;
470
471 ret = cpuhp_state_remove_instance(lttng_rb_hp_online,
472 &chan->cpuhp_iter_online.node);
473 WARN_ON(ret);
474 }
475 #else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
476 chan->hp_iter_enable = 0;
477 unregister_cpu_notifier(&chan->hp_iter_notifier);
478 #endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
479 }
480 }
481
482 void channel_iterator_free(struct channel *chan)
483 {
484 const struct lib_ring_buffer_config *config = &chan->backend.config;
485
486 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
487 lttng_heap_free(&chan->iter.heap);
488 }
489
490 int lib_ring_buffer_iterator_open(struct lib_ring_buffer *buf)
491 {
492 struct channel *chan = buf->backend.chan;
493 const struct lib_ring_buffer_config *config = &chan->backend.config;
494 CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);
495 return lib_ring_buffer_open_read(buf);
496 }
497 EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_open);
498
/*
 * Release a buffer previously opened with lib_ring_buffer_iterator_open().
 *
 * Note: iterators must not be mixed with other types of outputs (mmap,
 * splice, raw data read), because an iterator can leave the buffer in "GET"
 * state, which is not consistent with those outputs.
 */
void lib_ring_buffer_iterator_release(struct lib_ring_buffer *buf)
{
	lib_ring_buffer_release_read(buf);
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_release);
509
510 int channel_iterator_open(struct channel *chan)
511 {
512 const struct lib_ring_buffer_config *config = &chan->backend.config;
513 struct lib_ring_buffer *buf;
514 int ret = 0, cpu;
515
516 CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);
517
518 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
519 get_online_cpus();
520 /* Allow CPU hotplug to keep track of opened reader */
521 chan->iter.read_open = 1;
522 for_each_channel_cpu(cpu, chan) {
523 buf = channel_get_ring_buffer(config, chan, cpu);
524 ret = lib_ring_buffer_iterator_open(buf);
525 if (ret)
526 goto error;
527 buf->iter.read_open = 1;
528 }
529 put_online_cpus();
530 } else {
531 buf = channel_get_ring_buffer(config, chan, 0);
532 ret = lib_ring_buffer_iterator_open(buf);
533 }
534 return ret;
535 error:
536 /* Error should always happen on CPU 0, hence no close is required. */
537 CHAN_WARN_ON(chan, cpu != 0);
538 put_online_cpus();
539 return ret;
540 }
541 EXPORT_SYMBOL_GPL(channel_iterator_open);
542
543 void channel_iterator_release(struct channel *chan)
544 {
545 const struct lib_ring_buffer_config *config = &chan->backend.config;
546 struct lib_ring_buffer *buf;
547 int cpu;
548
549 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
550 get_online_cpus();
551 for_each_channel_cpu(cpu, chan) {
552 buf = channel_get_ring_buffer(config, chan, cpu);
553 if (buf->iter.read_open) {
554 lib_ring_buffer_iterator_release(buf);
555 buf->iter.read_open = 0;
556 }
557 }
558 chan->iter.read_open = 0;
559 put_online_cpus();
560 } else {
561 buf = channel_get_ring_buffer(config, chan, 0);
562 lib_ring_buffer_iterator_release(buf);
563 }
564 }
565 EXPORT_SYMBOL_GPL(channel_iterator_release);
566
567 void lib_ring_buffer_iterator_reset(struct lib_ring_buffer *buf)
568 {
569 struct channel *chan = buf->backend.chan;
570
571 if (buf->iter.state != ITER_GET_SUBBUF)
572 lib_ring_buffer_put_next_subbuf(buf);
573 buf->iter.state = ITER_GET_SUBBUF;
574 /* Remove from heap (if present). */
575 if (lttng_heap_cherrypick(&chan->iter.heap, buf))
576 list_add(&buf->iter.empty_node, &chan->iter.empty_head);
577 buf->iter.timestamp = 0;
578 buf->iter.header_len = 0;
579 buf->iter.payload_len = 0;
580 buf->iter.consumed = 0;
581 buf->iter.read_offset = 0;
582 buf->iter.data_size = 0;
583 /* Don't reset allocated and read_open */
584 }
585
586 void channel_iterator_reset(struct channel *chan)
587 {
588 const struct lib_ring_buffer_config *config = &chan->backend.config;
589 struct lib_ring_buffer *buf;
590 int cpu;
591
592 /* Empty heap, put into empty_head */
593 while ((buf = lttng_heap_remove(&chan->iter.heap)) != NULL)
594 list_add(&buf->iter.empty_node, &chan->iter.empty_head);
595
596 for_each_channel_cpu(cpu, chan) {
597 buf = channel_get_ring_buffer(config, chan, cpu);
598 lib_ring_buffer_iterator_reset(buf);
599 }
600 /* Don't reset read_open */
601 chan->iter.last_qs = 0;
602 chan->iter.last_timestamp = 0;
603 chan->iter.last_cpu = 0;
604 chan->iter.len_left = 0;
605 }
606
607 /*
608 * Ring buffer payload extraction read() implementation.
609 */
610 static
611 ssize_t channel_ring_buffer_file_read(struct file *filp,
612 char __user *user_buf,
613 size_t count,
614 loff_t *ppos,
615 struct channel *chan,
616 struct lib_ring_buffer *buf,
617 int fusionmerge)
618 {
619 const struct lib_ring_buffer_config *config = &chan->backend.config;
620 size_t read_count = 0, read_offset;
621 ssize_t len;
622
623 might_sleep();
624 if (!access_ok(VERIFY_WRITE, user_buf, count))
625 return -EFAULT;
626
627 /* Finish copy of previous record */
628 if (*ppos != 0) {
629 if (read_count < count) {
630 len = chan->iter.len_left;
631 read_offset = *ppos;
632 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU
633 && fusionmerge)
634 buf = lttng_heap_maximum(&chan->iter.heap);
635 CHAN_WARN_ON(chan, !buf);
636 goto skip_get_next;
637 }
638 }
639
640 while (read_count < count) {
641 size_t copy_len, space_left;
642
643 if (fusionmerge)
644 len = channel_get_next_record(chan, &buf);
645 else
646 len = lib_ring_buffer_get_next_record(chan, buf);
647 len_test:
648 if (len < 0) {
649 /*
650 * Check if buffer is finalized (end of file).
651 */
652 if (len == -ENODATA) {
653 /* A 0 read_count will tell about end of file */
654 goto nodata;
655 }
656 if (filp->f_flags & O_NONBLOCK) {
657 if (!read_count)
658 read_count = -EAGAIN;
659 goto nodata;
660 } else {
661 int error;
662
663 /*
664 * No data available at the moment, return what
665 * we got.
666 */
667 if (read_count)
668 goto nodata;
669
670 /*
671 * Wait for returned len to be >= 0 or -ENODATA.
672 */
673 if (fusionmerge)
674 error = wait_event_interruptible(
675 chan->read_wait,
676 ((len = channel_get_next_record(chan,
677 &buf)), len != -EAGAIN));
678 else
679 error = wait_event_interruptible(
680 buf->read_wait,
681 ((len = lib_ring_buffer_get_next_record(
682 chan, buf)), len != -EAGAIN));
683 CHAN_WARN_ON(chan, len == -EBUSY);
684 if (error) {
685 read_count = error;
686 goto nodata;
687 }
688 CHAN_WARN_ON(chan, len < 0 && len != -ENODATA);
689 goto len_test;
690 }
691 }
692 read_offset = buf->iter.read_offset;
693 skip_get_next:
694 space_left = count - read_count;
695 if (len <= space_left) {
696 copy_len = len;
697 chan->iter.len_left = 0;
698 *ppos = 0;
699 } else {
700 copy_len = space_left;
701 chan->iter.len_left = len - copy_len;
702 *ppos = read_offset + copy_len;
703 }
704 if (__lib_ring_buffer_copy_to_user(&buf->backend, read_offset,
705 &user_buf[read_count],
706 copy_len)) {
707 /*
708 * Leave the len_left and ppos values at their current
709 * state, as we currently have a valid event to read.
710 */
711 return -EFAULT;
712 }
713 read_count += copy_len;
714 };
715 return read_count;
716
717 nodata:
718 *ppos = 0;
719 chan->iter.len_left = 0;
720 return read_count;
721 }
722
723 /**
724 * lib_ring_buffer_file_read - Read buffer record payload.
725 * @filp: file structure pointer.
726 * @buffer: user buffer to read data into.
727 * @count: number of bytes to read.
728 * @ppos: file read position.
729 *
730 * Returns a negative value on error, or the number of bytes read on success.
731 * ppos is used to save the position _within the current record_ between calls
732 * to read().
733 */
734 static
735 ssize_t lib_ring_buffer_file_read(struct file *filp,
736 char __user *user_buf,
737 size_t count,
738 loff_t *ppos)
739 {
740 struct inode *inode = filp->lttng_f_dentry->d_inode;
741 struct lib_ring_buffer *buf = inode->i_private;
742 struct channel *chan = buf->backend.chan;
743
744 return channel_ring_buffer_file_read(filp, user_buf, count, ppos,
745 chan, buf, 0);
746 }
747
748 /**
749 * channel_file_read - Read channel record payload.
750 * @filp: file structure pointer.
751 * @buffer: user buffer to read data into.
752 * @count: number of bytes to read.
753 * @ppos: file read position.
754 *
755 * Returns a negative value on error, or the number of bytes read on success.
756 * ppos is used to save the position _within the current record_ between calls
757 * to read().
758 */
759 static
760 ssize_t channel_file_read(struct file *filp,
761 char __user *user_buf,
762 size_t count,
763 loff_t *ppos)
764 {
765 struct inode *inode = filp->lttng_f_dentry->d_inode;
766 struct channel *chan = inode->i_private;
767 const struct lib_ring_buffer_config *config = &chan->backend.config;
768
769 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
770 return channel_ring_buffer_file_read(filp, user_buf, count,
771 ppos, chan, NULL, 1);
772 else {
773 struct lib_ring_buffer *buf =
774 channel_get_ring_buffer(config, chan, 0);
775 return channel_ring_buffer_file_read(filp, user_buf, count,
776 ppos, chan, buf, 0);
777 }
778 }
779
780 static
781 int lib_ring_buffer_file_open(struct inode *inode, struct file *file)
782 {
783 struct lib_ring_buffer *buf = inode->i_private;
784 int ret;
785
786 ret = lib_ring_buffer_iterator_open(buf);
787 if (ret)
788 return ret;
789
790 file->private_data = buf;
791 ret = nonseekable_open(inode, file);
792 if (ret)
793 goto release_iter;
794 return 0;
795
796 release_iter:
797 lib_ring_buffer_iterator_release(buf);
798 return ret;
799 }
800
801 static
802 int lib_ring_buffer_file_release(struct inode *inode, struct file *file)
803 {
804 struct lib_ring_buffer *buf = inode->i_private;
805
806 lib_ring_buffer_iterator_release(buf);
807 return 0;
808 }
809
810 static
811 int channel_file_open(struct inode *inode, struct file *file)
812 {
813 struct channel *chan = inode->i_private;
814 int ret;
815
816 ret = channel_iterator_open(chan);
817 if (ret)
818 return ret;
819
820 file->private_data = chan;
821 ret = nonseekable_open(inode, file);
822 if (ret)
823 goto release_iter;
824 return 0;
825
826 release_iter:
827 channel_iterator_release(chan);
828 return ret;
829 }
830
831 static
832 int channel_file_release(struct inode *inode, struct file *file)
833 {
834 struct channel *chan = inode->i_private;
835
836 channel_iterator_release(chan);
837 return 0;
838 }
839
840 const struct file_operations channel_payload_file_operations = {
841 .owner = THIS_MODULE,
842 .open = channel_file_open,
843 .release = channel_file_release,
844 .read = channel_file_read,
845 .llseek = vfs_lib_ring_buffer_no_llseek,
846 };
847 EXPORT_SYMBOL_GPL(channel_payload_file_operations);
848
849 const struct file_operations lib_ring_buffer_payload_file_operations = {
850 .owner = THIS_MODULE,
851 .open = lib_ring_buffer_file_open,
852 .release = lib_ring_buffer_file_release,
853 .read = lib_ring_buffer_file_read,
854 .llseek = vfs_lib_ring_buffer_no_llseek,
855 };
856 EXPORT_SYMBOL_GPL(lib_ring_buffer_payload_file_operations);
This page took 0.052905 seconds and 4 git commands to generate.