/*
 * ring_buffer_iterator.c
 *
 * Ring buffer and channel iterators. Get each event of a channel in order. Uses
 * a prio heap for per-cpu buffers, giving an O(log(NR_CPUS)) algorithmic
 * complexity for the "get next event" operation.
 *
 * Copyright (C) 2010-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; only
 * version 2.1 of the License.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 *
 * Author:
 *	Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 */

#include <wrapper/ringbuffer/iterator.h>
#include <wrapper/file.h>
#include <wrapper/uaccess.h>
#include <linux/jiffies.h>
#include <linux/delay.h>
#include <linux/module.h>

/*
 * Safety factor taking into account internal kernel interrupt latency.
 * Assuming 250ms worst-case latency.
 */
#define MAX_SYSTEM_LATENCY	250

/*
 * Maximum delta expected between trace clocks. At most 1 jiffy delta.
 */
#define MAX_CLOCK_DELTA		(jiffies_to_usecs(1) * 1000)
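/* For example, with HZ=250, one jiffy is 4 ms, so MAX_CLOCK_DELTA is 4,000,000 ns. */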

/**
 * lib_ring_buffer_get_next_record - Get the next record in a buffer.
 * @chan: channel
 * @buf: buffer
 *
 * Returns the size of the event read, -EAGAIN if buffer is empty, -ENODATA if
 * buffer is empty and finalized. The buffer must already be opened for reading.
 */
ssize_t lib_ring_buffer_get_next_record(struct channel *chan,
					struct lib_ring_buffer *buf)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	struct lib_ring_buffer_iter *iter = &buf->iter;
	int ret;

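	/*
	 * Iterator state machine: ITER_GET_SUBBUF grabs the next sub-buffer
	 * for reading, ITER_TEST_RECORD checks whether a record remains in
	 * it, ITER_NEXT_RECORD skips the previously returned payload, and
	 * ITER_PUT_SUBBUF releases an exhausted sub-buffer before going back
	 * to ITER_GET_SUBBUF.
	 */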
restart:
	switch (iter->state) {
	case ITER_GET_SUBBUF:
		ret = lib_ring_buffer_get_next_subbuf(buf);
		if (ret && !READ_ONCE(buf->finalized)
		    && config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
			/*
			 * Use "pull" scheme for global buffers. The reader
			 * itself flushes the buffer to "pull" data not visible
			 * to readers yet. Flush current subbuffer and re-try.
			 *
			 * Per-CPU buffers instead use a "push" scheme because
			 * the IPI needed to flush all CPUs' buffers is too
			 * costly. In the "push" scheme, the reader waits for
			 * the writer periodic timer to flush the
			 * buffers (keeping track of a quiescent state
			 * timestamp). Therefore, the writer "pushes" data out
			 * of the buffers rather than letting the reader "pull"
			 * data from the buffer.
			 */
			lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
			ret = lib_ring_buffer_get_next_subbuf(buf);
		}
		if (ret)
			return ret;
		iter->consumed = buf->cons_snapshot;
		iter->data_size = lib_ring_buffer_get_read_data_size(config, buf);
		iter->read_offset = iter->consumed;
		/* skip header */
		iter->read_offset += config->cb.subbuffer_header_size();
		iter->state = ITER_TEST_RECORD;
		goto restart;
	case ITER_TEST_RECORD:
		if (iter->read_offset - iter->consumed >= iter->data_size) {
			iter->state = ITER_PUT_SUBBUF;
		} else {
			CHAN_WARN_ON(chan, !config->cb.record_get);
			config->cb.record_get(config, chan, buf,
					      iter->read_offset,
					      &iter->header_len,
					      &iter->payload_len,
					      &iter->timestamp);
			iter->read_offset += iter->header_len;
			subbuffer_consume_record(config, &buf->backend);
			iter->state = ITER_NEXT_RECORD;
			return iter->payload_len;
		}
		goto restart;
	case ITER_NEXT_RECORD:
		iter->read_offset += iter->payload_len;
		iter->state = ITER_TEST_RECORD;
		goto restart;
	case ITER_PUT_SUBBUF:
		lib_ring_buffer_put_next_subbuf(buf);
		iter->state = ITER_GET_SUBBUF;
		goto restart;
	default:
		CHAN_WARN_ON(chan, 1);	/* Should not happen */
		return -EPERM;
	}
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_get_next_record);

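/*
 * Comparison callback for the channel's prio heap. Returning true when
 * bufa holds the lower timestamp makes lttng_heap_maximum() yield the
 * buffer with the oldest pending record, so the channel iterator merges
 * the per-cpu streams in timestamp order.
 */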
static int buf_is_higher(void *a, void *b)
{
	struct lib_ring_buffer *bufa = a;
	struct lib_ring_buffer *bufb = b;

	/* Consider lowest timestamps to be at the top of the heap */
	return (bufa->iter.timestamp < bufb->iter.timestamp);
}

static
void lib_ring_buffer_get_empty_buf_records(const struct lib_ring_buffer_config *config,
					   struct channel *chan)
{
	struct lttng_ptr_heap *heap = &chan->iter.heap;
	struct lib_ring_buffer *buf, *tmp;
	ssize_t len;

	list_for_each_entry_safe(buf, tmp, &chan->iter.empty_head,
				 iter.empty_node) {
		len = lib_ring_buffer_get_next_record(chan, buf);

		/*
		 * Deal with -EAGAIN and -ENODATA.
		 * len >= 0 means record contains data.
		 * -EBUSY should never happen, because we support only one
		 * reader.
		 */
		switch (len) {
		case -EAGAIN:
			/* Keep node in empty list */
			break;
		case -ENODATA:
			/*
			 * Buffer is finalized. Don't add to list of empty
			 * buffers, because it has no more data to provide,
			 * ever.
			 */
			list_del(&buf->iter.empty_node);
			break;
		case -EBUSY:
			CHAN_WARN_ON(chan, 1);
			break;
		default:
			/*
			 * Insert buffer into the heap, remove from empty buffer
			 * list.
			 */
			CHAN_WARN_ON(chan, len < 0);
			list_del(&buf->iter.empty_node);
			CHAN_WARN_ON(chan, lttng_heap_insert(heap, buf));
		}
	}
}

static
void lib_ring_buffer_wait_for_qs(const struct lib_ring_buffer_config *config,
				 struct channel *chan)
{
	u64 timestamp_qs;
	unsigned long wait_msecs;

	/*
	 * No need to wait if no empty buffers are present.
	 */
	if (list_empty(&chan->iter.empty_head))
		return;

	timestamp_qs = config->cb.ring_buffer_clock_read(chan);
	/*
	 * We need to consider previously empty buffers.
	 * Do a get next buf record on each of them. Add them to
	 * the heap if they have data. If at least one of them
	 * doesn't have data, we need to wait for
	 * switch_timer_interval + MAX_SYSTEM_LATENCY (so we are sure the
	 * buffers have been switched either by the timer or idle entry) and
	 * check them again, adding them if they have data.
	 */
	lib_ring_buffer_get_empty_buf_records(config, chan);

	/*
	 * No need to wait if no empty buffers are present.
	 */
	if (list_empty(&chan->iter.empty_head))
		return;

	/*
	 * We need to wait for the buffer switch timer to run. If the
	 * CPU is idle, idle entry performed the switch.
	 * TODO: we could optimize further by skipping the sleep if all
	 * empty buffers belong to idle or offline cpus.
	 */
	wait_msecs = jiffies_to_msecs(chan->switch_timer_interval);
	wait_msecs += MAX_SYSTEM_LATENCY;
	msleep(wait_msecs);
	lib_ring_buffer_get_empty_buf_records(config, chan);
	/*
	 * Any buffer still in the empty list here cannot possibly
	 * contain an event with a timestamp prior to "timestamp_qs".
	 * The new quiescent state timestamp is the one we grabbed
	 * before waiting for buffer data. It is therefore safe to
	 * ignore empty buffers up to last_qs timestamp for fusion
	 * merge.
	 */
	chan->iter.last_qs = timestamp_qs;
}

/**
 * channel_get_next_record - Get the next record in a channel.
 * @chan: channel
 * @ret_buf: the buffer in which the event is located (output)
 *
 * Returns the size of the new current event, -EAGAIN if all buffers are empty,
 * -ENODATA if all buffers are empty and finalized. The channel must already be
 * opened for reading.
 */
ssize_t channel_get_next_record(struct channel *chan,
				struct lib_ring_buffer **ret_buf)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	struct lib_ring_buffer *buf;
	struct lttng_ptr_heap *heap;
	ssize_t len;

	if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
		*ret_buf = channel_get_ring_buffer(config, chan, 0);
		return lib_ring_buffer_get_next_record(chan, *ret_buf);
	}

	heap = &chan->iter.heap;

	/*
	 * Get next record for topmost buffer.
	 */
	buf = lttng_heap_maximum(heap);
	if (buf) {
		len = lib_ring_buffer_get_next_record(chan, buf);
		/*
		 * Deal with -EAGAIN and -ENODATA.
		 * len >= 0 means record contains data.
		 */
		switch (len) {
		case -EAGAIN:
			buf->iter.timestamp = 0;
			list_add(&buf->iter.empty_node, &chan->iter.empty_head);
			/* Remove topmost buffer from the heap */
			CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf);
			break;
		case -ENODATA:
			/*
			 * Buffer is finalized. Remove buffer from heap and
			 * don't add to list of empty buffers, because it has
			 * no more data to provide, ever.
			 */
			CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf);
			break;
		case -EBUSY:
			CHAN_WARN_ON(chan, 1);
			break;
		default:
			/*
			 * Reinsert buffer into the heap. Note that heap can be
			 * partially empty, so we need to use
			 * lttng_heap_replace_max().
			 */
			CHAN_WARN_ON(chan, len < 0);
			CHAN_WARN_ON(chan, lttng_heap_replace_max(heap, buf) != buf);
			break;
		}
	}

	buf = lttng_heap_maximum(heap);
	if (!buf || buf->iter.timestamp > chan->iter.last_qs) {
		/*
		 * Deal with buffers previously showing no data.
		 * Add buffers containing data to the heap, update
		 * last_qs.
		 */
		lib_ring_buffer_wait_for_qs(config, chan);
	}

	*ret_buf = buf = lttng_heap_maximum(heap);
	if (buf) {
		/*
		 * If this warning triggers, you probably need to check your
		 * system interrupt latency. Typical causes: too much printk()
		 * output going to a serial console with interrupts off.
		 * Allow for MAX_CLOCK_DELTA ns timestamp delta going backward.
		 * Observed on SMP KVM setups with trace_clock().
		 */
		if (chan->iter.last_timestamp
		    > (buf->iter.timestamp + MAX_CLOCK_DELTA)) {
			printk(KERN_WARNING "ring_buffer: timestamps going "
			       "backward. Last time %llu ns, cpu %d, "
			       "current time %llu ns, cpu %d, "
			       "delta %llu ns.\n",
			       chan->iter.last_timestamp, chan->iter.last_cpu,
			       buf->iter.timestamp, buf->backend.cpu,
			       chan->iter.last_timestamp - buf->iter.timestamp);
			CHAN_WARN_ON(chan, 1);
		}
		chan->iter.last_timestamp = buf->iter.timestamp;
		chan->iter.last_cpu = buf->backend.cpu;
		return buf->iter.payload_len;
	} else {
		/* Heap is empty */
		if (list_empty(&chan->iter.empty_head))
			return -ENODATA;	/* All buffers finalized */
		else
			return -EAGAIN;		/* Temporarily empty */
	}
}
EXPORT_SYMBOL_GPL(channel_get_next_record);
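
/*
 * A minimal channel-level read loop, as an illustrative sketch only:
 * example_drain_channel() is hypothetical and not part of this file, and
 * error handling is trimmed.
 *
 *	static void example_drain_channel(struct channel *chan)
 *	{
 *		struct lib_ring_buffer *buf;
 *		ssize_t len;
 *
 *		if (channel_iterator_open(chan))
 *			return;
 *		while ((len = channel_get_next_record(chan, &buf)) != -ENODATA) {
 *			if (len == -EAGAIN)
 *				continue;	// or block on chan->read_wait
 *			// len bytes of payload start at buf->iter.read_offset,
 *			// timestamped buf->iter.timestamp
 *		}
 *		channel_iterator_release(chan);
 *	}
 */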

static
void lib_ring_buffer_iterator_init(struct channel *chan, struct lib_ring_buffer *buf)
{
	if (buf->iter.allocated)
		return;

	buf->iter.allocated = 1;
	if (chan->iter.read_open && !buf->iter.read_open) {
		CHAN_WARN_ON(chan, lib_ring_buffer_open_read(buf) != 0);
		buf->iter.read_open = 1;
	}

	/* Add to list of buffers without any current record */
	if (chan->backend.config.alloc == RING_BUFFER_ALLOC_PER_CPU)
		list_add(&buf->iter.empty_node, &chan->iter.empty_head);
}

#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))

int lttng_cpuhp_rb_iter_online(unsigned int cpu,
		struct lttng_cpuhp_node *node)
{
	struct channel *chan = container_of(node, struct channel,
					    cpuhp_iter_online);
	struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
	const struct lib_ring_buffer_config *config = &chan->backend.config;

	CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);

	lib_ring_buffer_iterator_init(chan, buf);
	return 0;
}
EXPORT_SYMBOL_GPL(lttng_cpuhp_rb_iter_online);

#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */

#ifdef CONFIG_HOTPLUG_CPU
static
int channel_iterator_cpu_hotplug(struct notifier_block *nb,
				 unsigned long action,
				 void *hcpu)
{
	unsigned int cpu = (unsigned long)hcpu;
	struct channel *chan = container_of(nb, struct channel,
					    hp_iter_notifier);
	struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
	const struct lib_ring_buffer_config *config = &chan->backend.config;

	if (!chan->hp_iter_enable)
		return NOTIFY_DONE;

	CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);

	switch (action) {
	case CPU_DOWN_FAILED:
	case CPU_DOWN_FAILED_FROZEN:
	case CPU_ONLINE:
	case CPU_ONLINE_FROZEN:
		lib_ring_buffer_iterator_init(chan, buf);
		return NOTIFY_OK;
	default:
		return NOTIFY_DONE;
	}
}
#endif

#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */

int channel_iterator_init(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	struct lib_ring_buffer *buf;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		int ret;

		INIT_LIST_HEAD(&chan->iter.empty_head);
		ret = lttng_heap_init(&chan->iter.heap,
				      num_possible_cpus(),
				      GFP_KERNEL, buf_is_higher);
		if (ret)
			return ret;

#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
		chan->cpuhp_iter_online.component = LTTNG_RING_BUFFER_ITER;
		ret = cpuhp_state_add_instance(lttng_rb_hp_online,
			&chan->cpuhp_iter_online.node);
		if (ret)
			return ret;
#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
		{
			int cpu;

			/*
			 * In case of non-hotplug cpu, if the ring-buffer is allocated
			 * in early initcall, it will not be notified of secondary cpus.
			 * In that case, we need to allocate for all possible cpus.
			 */
#ifdef CONFIG_HOTPLUG_CPU
			chan->hp_iter_notifier.notifier_call =
				channel_iterator_cpu_hotplug;
			chan->hp_iter_notifier.priority = 10;
			register_cpu_notifier(&chan->hp_iter_notifier);

			get_online_cpus();
			for_each_online_cpu(cpu) {
				buf = per_cpu_ptr(chan->backend.buf, cpu);
				lib_ring_buffer_iterator_init(chan, buf);
			}
			chan->hp_iter_enable = 1;
			put_online_cpus();
#else
			for_each_possible_cpu(cpu) {
				buf = per_cpu_ptr(chan->backend.buf, cpu);
				lib_ring_buffer_iterator_init(chan, buf);
			}
#endif
		}
#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
	} else {
		buf = channel_get_ring_buffer(config, chan, 0);
		lib_ring_buffer_iterator_init(chan, buf);
	}
	return 0;
}

void channel_iterator_unregister_notifiers(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
		{
			int ret;

			ret = cpuhp_state_remove_instance(lttng_rb_hp_online,
				&chan->cpuhp_iter_online.node);
			WARN_ON(ret);
		}
#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
		chan->hp_iter_enable = 0;
		unregister_cpu_notifier(&chan->hp_iter_notifier);
#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
	}
}

void channel_iterator_free(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
		lttng_heap_free(&chan->iter.heap);
}

int lib_ring_buffer_iterator_open(struct lib_ring_buffer *buf)
{
	struct channel *chan = buf->backend.chan;
	const struct lib_ring_buffer_config *config = &chan->backend.config;

	CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);
	return lib_ring_buffer_open_read(buf);
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_open);

/*
 * Note: Iterators must not be mixed with other types of outputs, because an
 * iterator can leave the buffer in "GET" state, which is not consistent with
 * other types of output (mmap, splice, raw data read).
 */
void lib_ring_buffer_iterator_release(struct lib_ring_buffer *buf)
{
	lib_ring_buffer_release_read(buf);
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_release);

int channel_iterator_open(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	struct lib_ring_buffer *buf;
	int ret = 0, cpu;

	CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		get_online_cpus();
		/* Allow CPU hotplug to keep track of opened reader */
		chan->iter.read_open = 1;
		for_each_channel_cpu(cpu, chan) {
			buf = channel_get_ring_buffer(config, chan, cpu);
			ret = lib_ring_buffer_iterator_open(buf);
			if (ret)
				goto error;
			buf->iter.read_open = 1;
		}
		put_online_cpus();
	} else {
		buf = channel_get_ring_buffer(config, chan, 0);
		ret = lib_ring_buffer_iterator_open(buf);
	}
	return ret;
error:
	/* Error should always happen on CPU 0, hence no close is required. */
	CHAN_WARN_ON(chan, cpu != 0);
	put_online_cpus();
	return ret;
}
EXPORT_SYMBOL_GPL(channel_iterator_open);

void channel_iterator_release(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	struct lib_ring_buffer *buf;
	int cpu;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		get_online_cpus();
		for_each_channel_cpu(cpu, chan) {
			buf = channel_get_ring_buffer(config, chan, cpu);
			if (buf->iter.read_open) {
				lib_ring_buffer_iterator_release(buf);
				buf->iter.read_open = 0;
			}
		}
		chan->iter.read_open = 0;
		put_online_cpus();
	} else {
		buf = channel_get_ring_buffer(config, chan, 0);
		lib_ring_buffer_iterator_release(buf);
	}
}
EXPORT_SYMBOL_GPL(channel_iterator_release);

void lib_ring_buffer_iterator_reset(struct lib_ring_buffer *buf)
{
	struct channel *chan = buf->backend.chan;

	if (buf->iter.state != ITER_GET_SUBBUF)
		lib_ring_buffer_put_next_subbuf(buf);
	buf->iter.state = ITER_GET_SUBBUF;
	/* Remove from heap (if present). */
	if (lttng_heap_cherrypick(&chan->iter.heap, buf))
		list_add(&buf->iter.empty_node, &chan->iter.empty_head);
	buf->iter.timestamp = 0;
	buf->iter.header_len = 0;
	buf->iter.payload_len = 0;
	buf->iter.consumed = 0;
	buf->iter.read_offset = 0;
	buf->iter.data_size = 0;
	/* Don't reset allocated and read_open */
}

void channel_iterator_reset(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	struct lib_ring_buffer *buf;
	int cpu;

	/* Empty heap, put into empty_head */
	while ((buf = lttng_heap_remove(&chan->iter.heap)) != NULL)
		list_add(&buf->iter.empty_node, &chan->iter.empty_head);

	for_each_channel_cpu(cpu, chan) {
		buf = channel_get_ring_buffer(config, chan, cpu);
		lib_ring_buffer_iterator_reset(buf);
	}
	/* Don't reset read_open */
	chan->iter.last_qs = 0;
	chan->iter.last_timestamp = 0;
	chan->iter.last_cpu = 0;
	chan->iter.len_left = 0;
}

/*
 * Ring buffer payload extraction read() implementation.
 */
static
ssize_t channel_ring_buffer_file_read(struct file *filp,
				      char __user *user_buf,
				      size_t count,
				      loff_t *ppos,
				      struct channel *chan,
				      struct lib_ring_buffer *buf,
				      int fusionmerge)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	size_t read_count = 0, read_offset;
	ssize_t len;

	might_sleep();
	if (!lttng_access_ok(VERIFY_WRITE, user_buf, count))
		return -EFAULT;

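	/*
	 * Resume protocol, as implemented below: a non-zero *ppos means the
	 * previous call returned a partial record; chan->iter.len_left bytes
	 * of that record remain to be copied, starting at buffer offset
	 * *ppos.
	 */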
	/* Finish copy of previous record */
	if (*ppos != 0) {
		if (read_count < count) {
			len = chan->iter.len_left;
			read_offset = *ppos;
			if (config->alloc == RING_BUFFER_ALLOC_PER_CPU
			    && fusionmerge)
				buf = lttng_heap_maximum(&chan->iter.heap);
			CHAN_WARN_ON(chan, !buf);
			goto skip_get_next;
		}
	}

	while (read_count < count) {
		size_t copy_len, space_left;

		if (fusionmerge)
			len = channel_get_next_record(chan, &buf);
		else
			len = lib_ring_buffer_get_next_record(chan, buf);
len_test:
		if (len < 0) {
			/*
			 * Check if buffer is finalized (end of file).
			 */
			if (len == -ENODATA) {
				/* A read_count of 0 signals end of file */
				goto nodata;
			}
			if (filp->f_flags & O_NONBLOCK) {
				if (!read_count)
					read_count = -EAGAIN;
				goto nodata;
			} else {
				int error;

				/*
				 * No data available at the moment, return what
				 * we got.
				 */
				if (read_count)
					goto nodata;

				/*
				 * Wait for returned len to be >= 0 or -ENODATA.
				 */
				if (fusionmerge)
					error = wait_event_interruptible(
						  chan->read_wait,
						  ((len = channel_get_next_record(chan,
							&buf)), len != -EAGAIN));
				else
					error = wait_event_interruptible(
						  buf->read_wait,
						  ((len = lib_ring_buffer_get_next_record(
							    chan, buf)), len != -EAGAIN));
				CHAN_WARN_ON(chan, len == -EBUSY);
				if (error) {
					read_count = error;
					goto nodata;
				}
				CHAN_WARN_ON(chan, len < 0 && len != -ENODATA);
				goto len_test;
			}
		}
		read_offset = buf->iter.read_offset;
skip_get_next:
		space_left = count - read_count;
		if (len <= space_left) {
			copy_len = len;
			chan->iter.len_left = 0;
			*ppos = 0;
		} else {
			copy_len = space_left;
			chan->iter.len_left = len - copy_len;
			*ppos = read_offset + copy_len;
		}
		if (__lib_ring_buffer_copy_to_user(&buf->backend, read_offset,
						   &user_buf[read_count],
						   copy_len)) {
			/*
			 * Leave the len_left and ppos values at their current
			 * state, as we currently have a valid event to read.
			 */
			return -EFAULT;
		}
		read_count += copy_len;
	}
	return read_count;

nodata:
	*ppos = 0;
	chan->iter.len_left = 0;
	return read_count;
}

/**
 * lib_ring_buffer_file_read - Read buffer record payload.
 * @filp: file structure pointer.
 * @user_buf: user buffer to read data into.
 * @count: number of bytes to read.
 * @ppos: file read position.
 *
 * Returns a negative value on error, or the number of bytes read on success.
 * ppos is used to save the position _within the current record_ between calls
 * to read().
 */
static
ssize_t lib_ring_buffer_file_read(struct file *filp,
				  char __user *user_buf,
				  size_t count,
				  loff_t *ppos)
{
	struct inode *inode = filp->lttng_f_dentry->d_inode;
	struct lib_ring_buffer *buf = inode->i_private;
	struct channel *chan = buf->backend.chan;

	return channel_ring_buffer_file_read(filp, user_buf, count, ppos,
					     chan, buf, 0);
}

/**
 * channel_file_read - Read channel record payload.
 * @filp: file structure pointer.
 * @user_buf: user buffer to read data into.
 * @count: number of bytes to read.
 * @ppos: file read position.
 *
 * Returns a negative value on error, or the number of bytes read on success.
 * ppos is used to save the position _within the current record_ between calls
 * to read().
 */
static
ssize_t channel_file_read(struct file *filp,
			  char __user *user_buf,
			  size_t count,
			  loff_t *ppos)
{
	struct inode *inode = filp->lttng_f_dentry->d_inode;
	struct channel *chan = inode->i_private;
	const struct lib_ring_buffer_config *config = &chan->backend.config;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
		return channel_ring_buffer_file_read(filp, user_buf, count,
						     ppos, chan, NULL, 1);
	else {
		struct lib_ring_buffer *buf =
			channel_get_ring_buffer(config, chan, 0);
		return channel_ring_buffer_file_read(filp, user_buf, count,
						     ppos, chan, buf, 0);
	}
}
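
/*
 * Expected userspace usage of these read() implementations, as a sketch
 * (the file path is hypothetical and depends on how the channel files are
 * exposed, e.g. through debugfs; consume() is a placeholder):
 *
 *	int fd = open("/sys/kernel/debug/.../channel0", O_RDONLY);
 *	char data[4096];
 *	ssize_t n;
 *
 *	while ((n = read(fd, data, sizeof(data))) > 0)
 *		consume(data, n);	// 0 return: all buffers finalized (EOF)
 */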

static
int lib_ring_buffer_file_open(struct inode *inode, struct file *file)
{
	struct lib_ring_buffer *buf = inode->i_private;
	int ret;

	ret = lib_ring_buffer_iterator_open(buf);
	if (ret)
		return ret;

	file->private_data = buf;
	ret = nonseekable_open(inode, file);
	if (ret)
		goto release_iter;
	return 0;

release_iter:
	lib_ring_buffer_iterator_release(buf);
	return ret;
}

static
int lib_ring_buffer_file_release(struct inode *inode, struct file *file)
{
	struct lib_ring_buffer *buf = inode->i_private;

	lib_ring_buffer_iterator_release(buf);
	return 0;
}

static
int channel_file_open(struct inode *inode, struct file *file)
{
	struct channel *chan = inode->i_private;
	int ret;

	ret = channel_iterator_open(chan);
	if (ret)
		return ret;

	file->private_data = chan;
	ret = nonseekable_open(inode, file);
	if (ret)
		goto release_iter;
	return 0;

release_iter:
	channel_iterator_release(chan);
	return ret;
}

static
int channel_file_release(struct inode *inode, struct file *file)
{
	struct channel *chan = inode->i_private;

	channel_iterator_release(chan);
	return 0;
}

const struct file_operations channel_payload_file_operations = {
	.owner = THIS_MODULE,
	.open = channel_file_open,
	.release = channel_file_release,
	.read = channel_file_read,
	.llseek = vfs_lib_ring_buffer_no_llseek,
};
EXPORT_SYMBOL_GPL(channel_payload_file_operations);

const struct file_operations lib_ring_buffer_payload_file_operations = {
	.owner = THIS_MODULE,
	.open = lib_ring_buffer_file_open,
	.release = lib_ring_buffer_file_release,
	.read = lib_ring_buffer_file_read,
	.llseek = vfs_lib_ring_buffer_no_llseek,
};
EXPORT_SYMBOL_GPL(lib_ring_buffer_payload_file_operations);