Build libringbuffer, remove kernel includes
lttng-ust.git: libringbuffer/ring_buffer_iterator.c
/*
 * ring_buffer_iterator.c
 *
 * (C) Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * Ring buffer and channel iterators. Get each event of a channel in order. Uses
 * a prio heap for per-cpu buffers, giving an O(log(NR_CPUS)) algorithmic
 * complexity for the "get next event" operation.
 *
 * Author:
 *	Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * Dual LGPL v2.1/GPL v2 license.
 */

#include "iterator.h"

/*
 * Safety factor taking into account internal kernel interrupt latency.
 * Assuming 250ms worst-case latency.
 */
#define MAX_SYSTEM_LATENCY	250

/*
 * Maximum delta expected between trace clocks. At most 1 jiffy delta.
 */
#define MAX_CLOCK_DELTA		(jiffies_to_usecs(1) * 1000)
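
/*
 * For instance, on a kernel configured with HZ=250, jiffies_to_usecs(1) is
 * 4000, so MAX_CLOCK_DELTA evaluates to 4,000,000 ns (4 ms); with HZ=100 it
 * would be 10 ms. The exact bound therefore depends on the HZ configuration.
 */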

/**
 * lib_ring_buffer_get_next_record - Get the next record in a buffer.
 * @chan: channel
 * @buf: buffer
 *
 * Returns the size of the event read, -EAGAIN if buffer is empty, -ENODATA if
 * buffer is empty and finalized. The buffer must already be opened for reading.
 */
ssize_t lib_ring_buffer_get_next_record(struct channel *chan,
					struct lib_ring_buffer *buf)
{
	const struct lib_ring_buffer_config *config = chan->backend.config;
	struct lib_ring_buffer_iter *iter = &buf->iter;
	int ret;

restart:
	switch (iter->state) {
	case ITER_GET_SUBBUF:
		ret = lib_ring_buffer_get_next_subbuf(buf);
		if (ret && !ACCESS_ONCE(buf->finalized)
		    && config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
			/*
			 * Use "pull" scheme for global buffers. The reader
			 * itself flushes the buffer to "pull" data not visible
			 * to readers yet. Flush current subbuffer and re-try.
			 *
			 * Per-CPU buffers rather use a "push" scheme because
			 * the IPI needed to flush all CPU's buffers is too
			 * costly. In the "push" scheme, the reader waits for
			 * the writer periodic deferrable timer to flush the
			 * buffers (keeping track of a quiescent state
			 * timestamp). Therefore, the writer "pushes" data out
			 * of the buffers rather than letting the reader "pull"
			 * data from the buffer.
			 */
			lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
			ret = lib_ring_buffer_get_next_subbuf(buf);
		}
		if (ret)
			return ret;
		iter->consumed = buf->cons_snapshot;
		iter->data_size = lib_ring_buffer_get_read_data_size(config, buf);
		iter->read_offset = iter->consumed;
		/* skip header */
		iter->read_offset += config->cb.subbuffer_header_size();
		iter->state = ITER_TEST_RECORD;
		goto restart;
	case ITER_TEST_RECORD:
		if (iter->read_offset - iter->consumed >= iter->data_size) {
			iter->state = ITER_PUT_SUBBUF;
		} else {
			CHAN_WARN_ON(chan, !config->cb.record_get);
			config->cb.record_get(config, chan, buf,
					      iter->read_offset,
					      &iter->header_len,
					      &iter->payload_len,
					      &iter->timestamp);
			iter->read_offset += iter->header_len;
			subbuffer_consume_record(config, &buf->backend);
			iter->state = ITER_NEXT_RECORD;
			return iter->payload_len;
		}
		goto restart;
	case ITER_NEXT_RECORD:
		iter->read_offset += iter->payload_len;
		iter->state = ITER_TEST_RECORD;
		goto restart;
	case ITER_PUT_SUBBUF:
		lib_ring_buffer_put_next_subbuf(buf);
		iter->state = ITER_GET_SUBBUF;
		goto restart;
	default:
		CHAN_WARN_ON(chan, 1);	/* Should not happen */
		return -EPERM;
	}
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_get_next_record);
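
/*
 * Minimal polling-consumer sketch for a single buffer already opened for
 * reading, under the assumption that the payload can be copied out of the
 * backend with lib_ring_buffer_read() at iter.read_offset ("dest" is a
 * hypothetical destination array; oversized records are simply skipped):
 *
 *	char dest[256];
 *	ssize_t len;
 *
 *	for (;;) {
 *		len = lib_ring_buffer_get_next_record(chan, buf);
 *		if (len == -ENODATA)
 *			break;
 *		if (len == -EAGAIN)
 *			continue;
 *		if (len <= sizeof(dest))
 *			lib_ring_buffer_read(&buf->backend,
 *					     buf->iter.read_offset,
 *					     dest, len);
 *	}
 *
 * -ENODATA means the buffer is finalized (no more data, ever); -EAGAIN means
 * temporarily empty. A real consumer would block on buf->read_wait instead of
 * busy-looping, as channel_ring_buffer_file_read() does below.
 */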

static int buf_is_higher(void *a, void *b)
{
	struct lib_ring_buffer *bufa = a;
	struct lib_ring_buffer *bufb = b;

	/* Consider lowest timestamps to be at the top of the heap */
	return (bufa->iter.timestamp < bufb->iter.timestamp);
}
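
/*
 * Because the comparator reports the lower timestamp as "higher", the
 * max-heap effectively behaves as a min-heap on record timestamps:
 * lttng_heap_maximum() returns the buffer holding the oldest pending record.
 */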

static
void lib_ring_buffer_get_empty_buf_records(const struct lib_ring_buffer_config *config,
					   struct channel *chan)
{
	struct lttng_ptr_heap *heap = &chan->iter.heap;
	struct lib_ring_buffer *buf, *tmp;
	ssize_t len;

	list_for_each_entry_safe(buf, tmp, &chan->iter.empty_head,
				 iter.empty_node) {
		len = lib_ring_buffer_get_next_record(chan, buf);

		/*
		 * Deal with -EAGAIN and -ENODATA.
		 * len >= 0 means record contains data.
		 * -EBUSY should never happen, because we support only one
		 * reader.
		 */
		switch (len) {
		case -EAGAIN:
			/* Keep node in empty list */
			break;
		case -ENODATA:
			/*
			 * Buffer is finalized. Don't add to list of empty
			 * buffers, because it has no more data to provide, ever.
			 */
			list_del(&buf->iter.empty_node);
			break;
		case -EBUSY:
			CHAN_WARN_ON(chan, 1);
			break;
		default:
			/*
			 * Insert buffer into the heap, remove from empty buffer
			 * list.
			 */
			CHAN_WARN_ON(chan, len < 0);
			list_del(&buf->iter.empty_node);
			CHAN_WARN_ON(chan, lttng_heap_insert(heap, buf));
		}
	}
}

static
void lib_ring_buffer_wait_for_qs(const struct lib_ring_buffer_config *config,
				 struct channel *chan)
{
	u64 timestamp_qs;
	unsigned long wait_msecs;

	/*
	 * No need to wait if no empty buffers are present.
	 */
	if (list_empty(&chan->iter.empty_head))
		return;

	timestamp_qs = config->cb.ring_buffer_clock_read(chan);
	/*
	 * We need to consider previously empty buffers.
	 * Do a get next buf record on each of them. Add them to
	 * the heap if they have data. If at least one of them
	 * doesn't have data, we need to wait for
	 * switch_timer_interval + MAX_SYSTEM_LATENCY (so we are sure the
	 * buffers have been switched either by the timer or idle entry) and
	 * check them again, adding them if they have data.
	 */
	lib_ring_buffer_get_empty_buf_records(config, chan);

	/*
	 * No need to wait if no empty buffers are present.
	 */
	if (list_empty(&chan->iter.empty_head))
		return;

	/*
	 * We need to wait for the buffer switch timer to run. If the
	 * CPU is idle, idle entry performed the switch.
	 * TODO: we could optimize further by skipping the sleep if all
	 * empty buffers belong to idle or offline cpus.
	 */
	wait_msecs = jiffies_to_msecs(chan->switch_timer_interval);
	wait_msecs += MAX_SYSTEM_LATENCY;
	msleep(wait_msecs);
	lib_ring_buffer_get_empty_buf_records(config, chan);
	/*
	 * Any buffer still in the empty list here cannot possibly
	 * contain an event with a timestamp prior to "timestamp_qs".
	 * The new quiescent state timestamp is the one we grabbed
	 * before waiting for buffer data. It is therefore safe to
	 * ignore empty buffers up to last_qs timestamp for fusion
	 * merge.
	 */
	chan->iter.last_qs = timestamp_qs;
}
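
/*
 * Worked example of the wait bound above: with a hypothetical
 * switch_timer_interval of 25 jiffies on a HZ=250 kernel,
 * jiffies_to_msecs(25) is 100 ms, so the reader sleeps
 * 100 + MAX_SYSTEM_LATENCY = 350 ms before re-checking the empty buffers.
 */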

/**
 * channel_get_next_record - Get the next record in a channel.
 * @chan: channel
 * @ret_buf: the buffer in which the event is located (output)
 *
 * Returns the size of the new current event, -EAGAIN if all buffers are empty,
 * -ENODATA if all buffers are empty and finalized. The channel must already be
 * opened for reading.
 */
ssize_t channel_get_next_record(struct channel *chan,
				struct lib_ring_buffer **ret_buf)
{
	const struct lib_ring_buffer_config *config = chan->backend.config;
	struct lib_ring_buffer *buf;
	struct lttng_ptr_heap *heap;
	ssize_t len;

	if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
		*ret_buf = channel_get_ring_buffer(config, chan, 0);
		return lib_ring_buffer_get_next_record(chan, *ret_buf);
	}

	heap = &chan->iter.heap;

	/*
	 * get next record for topmost buffer.
	 */
	buf = lttng_heap_maximum(heap);
	if (buf) {
		len = lib_ring_buffer_get_next_record(chan, buf);
		/*
		 * Deal with -EAGAIN and -ENODATA.
		 * len >= 0 means record contains data.
		 */
		switch (len) {
		case -EAGAIN:
			buf->iter.timestamp = 0;
			list_add(&buf->iter.empty_node, &chan->iter.empty_head);
			/* Remove topmost buffer from the heap */
			CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf);
			break;
		case -ENODATA:
			/*
			 * Buffer is finalized. Remove buffer from heap and
			 * don't add to list of empty buffers, because it has no
			 * more data to provide, ever.
			 */
			CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf);
			break;
		case -EBUSY:
			CHAN_WARN_ON(chan, 1);
			break;
		default:
			/*
			 * Reinsert buffer into the heap. Note that heap can be
			 * partially empty, so we need to use
			 * lttng_heap_replace_max().
			 */
			CHAN_WARN_ON(chan, len < 0);
			CHAN_WARN_ON(chan, lttng_heap_replace_max(heap, buf) != buf);
			break;
		}
	}

	buf = lttng_heap_maximum(heap);
	if (!buf || buf->iter.timestamp > chan->iter.last_qs) {
		/*
		 * Deal with buffers previously showing no data.
		 * Add buffers containing data to the heap, update
		 * last_qs.
		 */
		lib_ring_buffer_wait_for_qs(config, chan);
	}

	*ret_buf = buf = lttng_heap_maximum(heap);
	if (buf) {
		/*
		 * If this warning triggers, you probably need to check your
		 * system interrupt latency. Typical cause: too much printk()
		 * output going to a serial console with interrupts off.
		 * Allow for MAX_CLOCK_DELTA ns timestamp delta going backward.
		 * Observed on SMP KVM setups with trace_clock().
		 */
		if (chan->iter.last_timestamp
		    > (buf->iter.timestamp + MAX_CLOCK_DELTA)) {
			printk(KERN_WARNING "ring_buffer: timestamps going "
			       "backward. Last time %llu ns, cpu %d, "
			       "current time %llu ns, cpu %d, "
			       "delta %llu ns.\n",
			       chan->iter.last_timestamp, chan->iter.last_cpu,
			       buf->iter.timestamp, buf->backend.cpu,
			       chan->iter.last_timestamp - buf->iter.timestamp);
			CHAN_WARN_ON(chan, 1);
		}
		chan->iter.last_timestamp = buf->iter.timestamp;
		chan->iter.last_cpu = buf->backend.cpu;
		return buf->iter.payload_len;
	} else {
		/* Heap is empty */
		if (list_empty(&chan->iter.empty_head))
			return -ENODATA;	/* All buffers finalized */
		else
			return -EAGAIN;		/* Temporarily empty */
	}
}
EXPORT_SYMBOL_GPL(channel_get_next_record);
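
/*
 * Minimal fusion-merge consumer sketch over a per-cpu channel, assuming
 * channel_iterator_open() has already succeeded and that the payload is
 * copied out of the selected buffer's backend with lib_ring_buffer_read()
 * ("dest" is a hypothetical destination array):
 *
 *	struct lib_ring_buffer *buf;
 *	char dest[256];
 *	ssize_t len;
 *
 *	for (;;) {
 *		len = channel_get_next_record(chan, &buf);
 *		if (len == -ENODATA)
 *			break;
 *		if (len == -EAGAIN)
 *			continue;
 *		if (len <= sizeof(dest))
 *			lib_ring_buffer_read(&buf->backend,
 *					     buf->iter.read_offset,
 *					     dest, len);
 *	}
 *
 * Records come back in increasing timestamp order across all per-cpu buffers,
 * thanks to the prio heap keyed on each buffer's current record timestamp.
 * A real consumer would block on chan->read_wait rather than busy-looping.
 */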

static
void lib_ring_buffer_iterator_init(struct channel *chan, struct lib_ring_buffer *buf)
{
	if (buf->iter.allocated)
		return;

	buf->iter.allocated = 1;
	if (chan->iter.read_open && !buf->iter.read_open) {
		CHAN_WARN_ON(chan, lib_ring_buffer_open_read(buf) != 0);
		buf->iter.read_open = 1;
	}

	/* Add to list of buffers without any current record */
	if (chan->backend.config->alloc == RING_BUFFER_ALLOC_PER_CPU)
		list_add(&buf->iter.empty_node, &chan->iter.empty_head);
}

#ifdef CONFIG_HOTPLUG_CPU
static
int __cpuinit channel_iterator_cpu_hotplug(struct notifier_block *nb,
					   unsigned long action,
					   void *hcpu)
{
	unsigned int cpu = (unsigned long)hcpu;
	struct channel *chan = container_of(nb, struct channel,
					    hp_iter_notifier);
	struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
	const struct lib_ring_buffer_config *config = chan->backend.config;

	if (!chan->hp_iter_enable)
		return NOTIFY_DONE;

	CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);

	switch (action) {
	case CPU_DOWN_FAILED:
	case CPU_DOWN_FAILED_FROZEN:
	case CPU_ONLINE:
	case CPU_ONLINE_FROZEN:
		lib_ring_buffer_iterator_init(chan, buf);
		return NOTIFY_OK;
	default:
		return NOTIFY_DONE;
	}
}
#endif

int channel_iterator_init(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = chan->backend.config;
	struct lib_ring_buffer *buf;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		int cpu, ret;

		INIT_LIST_HEAD(&chan->iter.empty_head);
		ret = lttng_heap_init(&chan->iter.heap,
				      num_possible_cpus(),
				      GFP_KERNEL, buf_is_higher);
		if (ret)
			return ret;
		/*
		 * In case of non-hotplug cpu, if the ring-buffer is allocated
		 * in early initcall, it will not be notified of secondary cpus.
		 * In that case, we need to allocate for all possible cpus.
		 */
#ifdef CONFIG_HOTPLUG_CPU
		chan->hp_iter_notifier.notifier_call =
			channel_iterator_cpu_hotplug;
		chan->hp_iter_notifier.priority = 10;
		register_cpu_notifier(&chan->hp_iter_notifier);
		get_online_cpus();
		for_each_online_cpu(cpu) {
			buf = per_cpu_ptr(chan->backend.buf, cpu);
			lib_ring_buffer_iterator_init(chan, buf);
		}
		chan->hp_iter_enable = 1;
		put_online_cpus();
#else
		for_each_possible_cpu(cpu) {
			buf = per_cpu_ptr(chan->backend.buf, cpu);
			lib_ring_buffer_iterator_init(chan, buf);
		}
#endif
	} else {
		buf = channel_get_ring_buffer(config, chan, 0);
		lib_ring_buffer_iterator_init(chan, buf);
	}
	return 0;
}

void channel_iterator_unregister_notifiers(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = chan->backend.config;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		chan->hp_iter_enable = 0;
		unregister_cpu_notifier(&chan->hp_iter_notifier);
	}
}

void channel_iterator_free(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = chan->backend.config;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
		lttng_heap_free(&chan->iter.heap);
}

int lib_ring_buffer_iterator_open(struct lib_ring_buffer *buf)
{
	struct channel *chan = buf->backend.chan;
	const struct lib_ring_buffer_config *config = chan->backend.config;

	CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);
	return lib_ring_buffer_open_read(buf);
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_open);

/*
 * Note: Iterators must not be mixed with other types of outputs, because an
 * iterator can leave the buffer in "GET" state, which is not consistent with
 * other types of output (mmap, splice, raw data read).
 */
void lib_ring_buffer_iterator_release(struct lib_ring_buffer *buf)
{
	lib_ring_buffer_release_read(buf);
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_release);

int channel_iterator_open(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = chan->backend.config;
	struct lib_ring_buffer *buf;
	int ret = 0, cpu;

	CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		get_online_cpus();
		/* Allow CPU hotplug to keep track of opened reader */
		chan->iter.read_open = 1;
		for_each_channel_cpu(cpu, chan) {
			buf = channel_get_ring_buffer(config, chan, cpu);
			ret = lib_ring_buffer_iterator_open(buf);
			if (ret)
				goto error;
			buf->iter.read_open = 1;
		}
		put_online_cpus();
	} else {
		buf = channel_get_ring_buffer(config, chan, 0);
		ret = lib_ring_buffer_iterator_open(buf);
	}
	return ret;
error:
	/* Error should always happen on CPU 0, hence no close is required. */
	CHAN_WARN_ON(chan, cpu != 0);
	put_online_cpus();
	return ret;
}
EXPORT_SYMBOL_GPL(channel_iterator_open);

void channel_iterator_release(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = chan->backend.config;
	struct lib_ring_buffer *buf;
	int cpu;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		get_online_cpus();
		for_each_channel_cpu(cpu, chan) {
			buf = channel_get_ring_buffer(config, chan, cpu);
			if (buf->iter.read_open) {
				lib_ring_buffer_iterator_release(buf);
				buf->iter.read_open = 0;
			}
		}
		chan->iter.read_open = 0;
		put_online_cpus();
	} else {
		buf = channel_get_ring_buffer(config, chan, 0);
		lib_ring_buffer_iterator_release(buf);
	}
}
EXPORT_SYMBOL_GPL(channel_iterator_release);

void lib_ring_buffer_iterator_reset(struct lib_ring_buffer *buf)
{
	struct channel *chan = buf->backend.chan;

	if (buf->iter.state != ITER_GET_SUBBUF)
		lib_ring_buffer_put_next_subbuf(buf);
	buf->iter.state = ITER_GET_SUBBUF;
	/* Remove from heap (if present). */
	if (lttng_heap_cherrypick(&chan->iter.heap, buf))
		list_add(&buf->iter.empty_node, &chan->iter.empty_head);
	buf->iter.timestamp = 0;
	buf->iter.header_len = 0;
	buf->iter.payload_len = 0;
	buf->iter.consumed = 0;
	buf->iter.read_offset = 0;
	buf->iter.data_size = 0;
	/* Don't reset allocated and read_open */
}

void channel_iterator_reset(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = chan->backend.config;
	struct lib_ring_buffer *buf;
	int cpu;

	/* Empty heap, put into empty_head */
	while ((buf = lttng_heap_remove(&chan->iter.heap)) != NULL)
		list_add(&buf->iter.empty_node, &chan->iter.empty_head);

	for_each_channel_cpu(cpu, chan) {
		buf = channel_get_ring_buffer(config, chan, cpu);
		lib_ring_buffer_iterator_reset(buf);
	}
	/* Don't reset read_open */
	chan->iter.last_qs = 0;
	chan->iter.last_timestamp = 0;
	chan->iter.last_cpu = 0;
	chan->iter.len_left = 0;
}

/*
 * Ring buffer payload extraction read() implementation.
 */
static
ssize_t channel_ring_buffer_file_read(struct file *filp,
				      char __user *user_buf,
				      size_t count,
				      loff_t *ppos,
				      struct channel *chan,
				      struct lib_ring_buffer *buf,
				      int fusionmerge)
{
	const struct lib_ring_buffer_config *config = chan->backend.config;
	size_t read_count = 0, read_offset;
	ssize_t len;

	might_sleep();
	if (!access_ok(VERIFY_WRITE, user_buf, count))
		return -EFAULT;

	/* Finish copy of previous record */
	if (*ppos != 0) {
		if (read_count < count) {
			len = chan->iter.len_left;
			read_offset = *ppos;
			if (config->alloc == RING_BUFFER_ALLOC_PER_CPU
			    && fusionmerge)
				buf = lttng_heap_maximum(&chan->iter.heap);
			CHAN_WARN_ON(chan, !buf);
			goto skip_get_next;
		}
	}

	while (read_count < count) {
		size_t copy_len, space_left;

		if (fusionmerge)
			len = channel_get_next_record(chan, &buf);
		else
			len = lib_ring_buffer_get_next_record(chan, buf);
len_test:
		if (len < 0) {
			/*
			 * Check if buffer is finalized (end of file).
			 */
			if (len == -ENODATA) {
				/* A 0 read_count will tell about end of file */
				goto nodata;
			}
			if (filp->f_flags & O_NONBLOCK) {
				if (!read_count)
					read_count = -EAGAIN;
				goto nodata;
			} else {
				int error;

				/*
				 * No data available at the moment, return what
				 * we got.
				 */
				if (read_count)
					goto nodata;

				/*
				 * Wait for returned len to be >= 0 or -ENODATA.
				 */
				if (fusionmerge)
					error = wait_event_interruptible(
						chan->read_wait,
						((len = channel_get_next_record(chan,
							&buf)), len != -EAGAIN));
				else
					error = wait_event_interruptible(
						buf->read_wait,
						((len = lib_ring_buffer_get_next_record(
							chan, buf)), len != -EAGAIN));
				CHAN_WARN_ON(chan, len == -EBUSY);
				if (error) {
					read_count = error;
					goto nodata;
				}
				CHAN_WARN_ON(chan, len < 0 && len != -ENODATA);
				goto len_test;
			}
		}
		read_offset = buf->iter.read_offset;
skip_get_next:
		space_left = count - read_count;
		if (len <= space_left) {
			copy_len = len;
			chan->iter.len_left = 0;
			*ppos = 0;
		} else {
			copy_len = space_left;
			chan->iter.len_left = len - copy_len;
			*ppos = read_offset + copy_len;
		}
		if (__lib_ring_buffer_copy_to_user(&buf->backend, read_offset,
						   &user_buf[read_count],
						   copy_len)) {
			/*
			 * Leave the len_left and ppos values at their current
			 * state, as we currently have a valid event to read.
			 */
			return -EFAULT;
		}
		read_count += copy_len;
	}
	return read_count;

nodata:
	*ppos = 0;
	chan->iter.len_left = 0;
	return read_count;
}

/**
 * lib_ring_buffer_file_read - Read buffer record payload.
 * @filp: file structure pointer.
 * @user_buf: user buffer to read data into.
 * @count: number of bytes to read.
 * @ppos: file read position.
 *
 * Returns a negative value on error, or the number of bytes read on success.
 * ppos is used to save the position _within the current record_ between calls
 * to read().
 */
static
ssize_t lib_ring_buffer_file_read(struct file *filp,
				  char __user *user_buf,
				  size_t count,
				  loff_t *ppos)
{
	struct inode *inode = filp->f_dentry->d_inode;
	struct lib_ring_buffer *buf = inode->i_private;
	struct channel *chan = buf->backend.chan;

	return channel_ring_buffer_file_read(filp, user_buf, count, ppos,
					     chan, buf, 0);
}

/**
 * channel_file_read - Read channel record payload.
 * @filp: file structure pointer.
 * @user_buf: user buffer to read data into.
 * @count: number of bytes to read.
 * @ppos: file read position.
 *
 * Returns a negative value on error, or the number of bytes read on success.
 * ppos is used to save the position _within the current record_ between calls
 * to read().
 */
static
ssize_t channel_file_read(struct file *filp,
			  char __user *user_buf,
			  size_t count,
			  loff_t *ppos)
{
	struct inode *inode = filp->f_dentry->d_inode;
	struct channel *chan = inode->i_private;
	const struct lib_ring_buffer_config *config = chan->backend.config;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
		return channel_ring_buffer_file_read(filp, user_buf, count,
						     ppos, chan, NULL, 1);
	else {
		struct lib_ring_buffer *buf =
			channel_get_ring_buffer(config, chan, 0);
		return channel_ring_buffer_file_read(filp, user_buf, count,
						     ppos, chan, buf, 0);
	}
}

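/*
 * From user space, a consumer simply read()s the channel (or per-buffer)
 * payload file. A sketch, assuming the tracer exposes the channel file at a
 * hypothetical path and with process() standing in for record handling:
 *
 *	char buf[4096];
 *	ssize_t ret;
 *	int fd = open("/path/to/channel0", O_RDONLY);
 *
 *	while ((ret = read(fd, buf, sizeof(buf))) > 0)
 *		process(buf, ret);
 *	close(fd);
 *
 * A return value of 0 means all buffers are finalized (end of file); with
 * O_NONBLOCK, read() returns -1 with errno set to EAGAIN when no record is
 * currently available.
 */
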
static
int lib_ring_buffer_file_open(struct inode *inode, struct file *file)
{
	struct lib_ring_buffer *buf = inode->i_private;
	int ret;

	ret = lib_ring_buffer_iterator_open(buf);
	if (ret)
		return ret;

	file->private_data = buf;
	ret = nonseekable_open(inode, file);
	if (ret)
		goto release_iter;
	return 0;

release_iter:
	lib_ring_buffer_iterator_release(buf);
	return ret;
}

static
int lib_ring_buffer_file_release(struct inode *inode, struct file *file)
{
	struct lib_ring_buffer *buf = inode->i_private;

	lib_ring_buffer_iterator_release(buf);
	return 0;
}

static
int channel_file_open(struct inode *inode, struct file *file)
{
	struct channel *chan = inode->i_private;
	int ret;

	ret = channel_iterator_open(chan);
	if (ret)
		return ret;

	file->private_data = chan;
	ret = nonseekable_open(inode, file);
	if (ret)
		goto release_iter;
	return 0;

release_iter:
	channel_iterator_release(chan);
	return ret;
}

static
int channel_file_release(struct inode *inode, struct file *file)
{
	struct channel *chan = inode->i_private;

	channel_iterator_release(chan);
	return 0;
}

const struct file_operations channel_payload_file_operations = {
	.open = channel_file_open,
	.release = channel_file_release,
	.read = channel_file_read,
	.llseek = lib_ring_buffer_no_llseek,
};
EXPORT_SYMBOL_GPL(channel_payload_file_operations);

const struct file_operations lib_ring_buffer_payload_file_operations = {
	.open = lib_ring_buffer_file_open,
	.release = lib_ring_buffer_file_release,
	.read = lib_ring_buffer_file_read,
	.llseek = lib_ring_buffer_no_llseek,
};
EXPORT_SYMBOL_GPL(lib_ring_buffer_payload_file_operations);