fix: cpu/hotplug: Remove deprecated CPU-hotplug functions. (v5.15)
[lttng-modules.git] / src / lib / ringbuffer / ring_buffer_iterator.c
CommitLineData
b7cdc182 1/* SPDX-License-Identifier: (GPL-2.0-only OR LGPL-2.1-only)
9f36eaed 2 *
f3bc08c5
MD
3 * ring_buffer_iterator.c
4 *
f3bc08c5
MD
5 * Ring buffer and channel iterators. Get each event of a channel in order. Uses
6 * a prio heap for per-cpu buffers, giving a O(log(NR_CPUS)) algorithmic
7 * complexity for the "get next event" operation.
8 *
886d51a3 9 * Copyright (C) 2010-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
f3bc08c5
MD
10 */
11
24591303 12#include <ringbuffer/iterator.h>
ffcc8734 13#include <wrapper/cpu.h>
c075712b 14#include <wrapper/file.h>
80bb2600 15#include <wrapper/uaccess.h>
f3bc08c5
MD
16#include <linux/jiffies.h>
17#include <linux/delay.h>
18#include <linux/module.h>
19
20/*
21 * Safety factor taking into account internal kernel interrupt latency.
22 * Assuming 250ms worse-case latency.
23 */
24#define MAX_SYSTEM_LATENCY 250
25
26/*
27 * Maximum delta expected between trace clocks. At most 1 jiffy delta.
28 */
29#define MAX_CLOCK_DELTA (jiffies_to_usecs(1) * 1000)
30
/**
 * lib_ring_buffer_get_next_record - Get the next record in a buffer.
 * @chan: channel
 * @buf: buffer
 *
 * Returns the size of the event read, -EAGAIN if buffer is empty, -ENODATA if
 * buffer is empty and finalized. The buffer must already be opened for reading.
 */
ssize_t lib_ring_buffer_get_next_record(struct lttng_kernel_ring_buffer_channel *chan,
					struct lttng_kernel_ring_buffer *buf)
{
	const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
	struct lttng_kernel_ring_buffer_iter *iter = &buf->iter;
	int ret;

	/*
	 * Iterator state machine: loop (via goto restart) until either a
	 * record payload is delivered or a terminal error is returned.
	 * States: acquire a sub-buffer for reading, test whether the read
	 * offset still lies within its data, skip past the just-delivered
	 * record, or release an exhausted sub-buffer.
	 */
restart:
	switch (iter->state) {
	case ITER_GET_SUBBUF:
		ret = lib_ring_buffer_get_next_subbuf(buf);
		if (ret && !LTTNG_READ_ONCE(buf->finalized)
		    && config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
			/*
			 * Use "pull" scheme for global buffers. The reader
			 * itself flushes the buffer to "pull" data not visible
			 * to readers yet. Flush current subbuffer and re-try.
			 *
			 * Per-CPU buffers rather use a "push" scheme because
			 * the IPI needed to flush all CPU's buffers is too
			 * costly. In the "push" scheme, the reader waits for
			 * the writer periodic timer to flush the
			 * buffers (keeping track of a quiescent state
			 * timestamp). Therefore, the writer "pushes" data out
			 * of the buffers rather than letting the reader "pull"
			 * data from the buffer.
			 */
			lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
			ret = lib_ring_buffer_get_next_subbuf(buf);
		}
		if (ret)
			return ret;
		iter->consumed = buf->cons_snapshot;
		iter->data_size = lib_ring_buffer_get_read_data_size(config, buf);
		iter->read_offset = iter->consumed;
		/* skip header */
		iter->read_offset += config->cb.subbuffer_header_size();
		iter->state = ITER_TEST_RECORD;
		goto restart;
	case ITER_TEST_RECORD:
		if (iter->read_offset - iter->consumed >= iter->data_size) {
			/* Sub-buffer data exhausted: release it. */
			iter->state = ITER_PUT_SUBBUF;
		} else {
			/* Extract the next record's header and payload info. */
			CHAN_WARN_ON(chan, !config->cb.record_get);
			config->cb.record_get(config, chan, buf,
					      iter->read_offset,
					      &iter->header_len,
					      &iter->payload_len,
					      &iter->timestamp);
			iter->read_offset += iter->header_len;
			subbuffer_consume_record(config, &buf->backend);
			iter->state = ITER_NEXT_RECORD;
			return iter->payload_len;
		}
		goto restart;
	case ITER_NEXT_RECORD:
		/* Skip the payload of the record delivered last time. */
		iter->read_offset += iter->payload_len;
		iter->state = ITER_TEST_RECORD;
		goto restart;
	case ITER_PUT_SUBBUF:
		lib_ring_buffer_put_next_subbuf(buf);
		iter->state = ITER_GET_SUBBUF;
		goto restart;
	default:
		CHAN_WARN_ON(chan, 1);	/* Should not happen */
		return -EPERM;
	}
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_get_next_record);
108
e20c0fec 109void lib_ring_buffer_put_current_record(struct lttng_kernel_ring_buffer *buf)
47d4b31d 110{
e20c0fec 111 struct lttng_kernel_ring_buffer_iter *iter;
47d4b31d
MD
112
113 if (!buf)
114 return;
115 iter = &buf->iter;
116 if (iter->state != ITER_NEXT_RECORD)
117 return;
118 iter->read_offset += iter->payload_len;
119 iter->state = ITER_TEST_RECORD;
120 if (iter->read_offset - iter->consumed >= iter->data_size) {
121 lib_ring_buffer_put_next_subbuf(buf);
122 iter->state = ITER_GET_SUBBUF;
123 }
124}
125EXPORT_SYMBOL_GPL(lib_ring_buffer_put_current_record);
126
f3bc08c5
MD
127static int buf_is_higher(void *a, void *b)
128{
e20c0fec
MD
129 struct lttng_kernel_ring_buffer *bufa = a;
130 struct lttng_kernel_ring_buffer *bufb = b;
f3bc08c5
MD
131
132 /* Consider lowest timestamps to be at the top of the heap */
133 return (bufa->iter.timestamp < bufb->iter.timestamp);
134}
135
/*
 * Poll each previously-empty per-CPU buffer for a fresh record.  Buffers
 * that now have data migrate from the channel's empty list into the prio
 * heap; finalized buffers are removed from the list permanently;
 * still-empty buffers (-EAGAIN) stay on the list for the next poll.
 */
static
void lib_ring_buffer_get_empty_buf_records(const struct lttng_kernel_ring_buffer_config *config,
					   struct lttng_kernel_ring_buffer_channel *chan)
{
	struct lttng_ptr_heap *heap = &chan->iter.heap;
	struct lttng_kernel_ring_buffer *buf, *tmp;
	ssize_t len;

	/* _safe variant: nodes may be deleted while iterating. */
	list_for_each_entry_safe(buf, tmp, &chan->iter.empty_head,
				 iter.empty_node) {
		len = lib_ring_buffer_get_next_record(chan, buf);

		/*
		 * Deal with -EAGAIN and -ENODATA.
		 * len >= 0 means record contains data.
		 * -EBUSY should never happen, because we support only one
		 * reader.
		 */
		switch (len) {
		case -EAGAIN:
			/* Keep node in empty list */
			break;
		case -ENODATA:
			/*
			 * Buffer is finalized. Don't add to list of empty
			 * buffer, because it has no more data to provide, ever.
			 */
			list_del(&buf->iter.empty_node);
			break;
		case -EBUSY:
			CHAN_WARN_ON(chan, 1);
			break;
		default:
			/*
			 * Insert buffer into the heap, remove from empty buffer
			 * list.
			 */
			CHAN_WARN_ON(chan, len < 0);
			list_del(&buf->iter.empty_node);
			CHAN_WARN_ON(chan, lttng_heap_insert(heap, buf));
		}
	}
}
179
/*
 * Wait for a quiescent state so that buffers still empty afterwards can
 * safely be ignored by the fusion merge up to the recorded timestamp.
 * Sleeps at most one switch-timer period plus MAX_SYSTEM_LATENCY.
 */
static
void lib_ring_buffer_wait_for_qs(const struct lttng_kernel_ring_buffer_config *config,
				 struct lttng_kernel_ring_buffer_channel *chan)
{
	u64 timestamp_qs;
	unsigned long wait_msecs;

	/*
	 * No need to wait if no empty buffers are present.
	 */
	if (list_empty(&chan->iter.empty_head))
		return;

	/* Grab the candidate quiescent-state timestamp before polling. */
	timestamp_qs = config->cb.ring_buffer_clock_read(chan);
	/*
	 * We need to consider previously empty buffers.
	 * Do a get next buf record on each of them. Add them to
	 * the heap if they have data. If at least one of them
	 * don't have data, we need to wait for
	 * switch_timer_interval + MAX_SYSTEM_LATENCY (so we are sure the
	 * buffers have been switched either by the timer or idle entry) and
	 * check them again, adding them if they have data.
	 */
	lib_ring_buffer_get_empty_buf_records(config, chan);

	/*
	 * No need to wait if no empty buffers are present.
	 */
	if (list_empty(&chan->iter.empty_head))
		return;

	/*
	 * We need to wait for the buffer switch timer to run. If the
	 * CPU is idle, idle entry performed the switch.
	 * TODO: we could optimize further by skipping the sleep if all
	 * empty buffers belong to idle or offline cpus.
	 */
	wait_msecs = jiffies_to_msecs(chan->switch_timer_interval);
	wait_msecs += MAX_SYSTEM_LATENCY;
	msleep(wait_msecs);
	lib_ring_buffer_get_empty_buf_records(config, chan);
	/*
	 * Any buffer still in the empty list here cannot possibly
	 * contain an event with a timestamp prior to "timestamp_qs".
	 * The new quiescent state timestamp is the one we grabbed
	 * before waiting for buffer data. It is therefore safe to
	 * ignore empty buffers up to last_qs timestamp for fusion
	 * merge.
	 */
	chan->iter.last_qs = timestamp_qs;
}
231
/**
 * channel_get_next_record - Get the next record in a channel.
 * @chan: channel
 * @ret_buf: the buffer in which the event is located (output)
 *
 * Returns the size of new current event, -EAGAIN if all buffers are empty,
 * -ENODATA if all buffers are empty and finalized. The channel must already be
 * opened for reading.
 *
 * Fusion merge: per-CPU buffers are kept in a prio heap ordered by next-event
 * timestamp, so the channel delivers events from all CPUs in timestamp order.
 */

ssize_t channel_get_next_record(struct lttng_kernel_ring_buffer_channel *chan,
				struct lttng_kernel_ring_buffer **ret_buf)
{
	const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
	struct lttng_kernel_ring_buffer *buf;
	struct lttng_ptr_heap *heap;
	ssize_t len;

	/* Global allocation: single buffer, no merge needed. */
	if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
		*ret_buf = channel_get_ring_buffer(config, chan, 0);
		return lib_ring_buffer_get_next_record(chan, *ret_buf);
	}

	heap = &chan->iter.heap;

	/*
	 * get next record for topmost buffer.
	 */
	buf = lttng_heap_maximum(heap);
	if (buf) {
		len = lib_ring_buffer_get_next_record(chan, buf);
		/*
		 * Deal with -EAGAIN and -ENODATA.
		 * len >= 0 means record contains data.
		 */
		switch (len) {
		case -EAGAIN:
			/* Temporarily empty: park on the empty list. */
			buf->iter.timestamp = 0;
			list_add(&buf->iter.empty_node, &chan->iter.empty_head);
			/* Remove topmost buffer from the heap */
			CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf);
			break;
		case -ENODATA:
			/*
			 * Buffer is finalized. Remove buffer from heap and
			 * don't add to list of empty buffer, because it has no
			 * more data to provide, ever.
			 */
			CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf);
			break;
		case -EBUSY:
			CHAN_WARN_ON(chan, 1);
			break;
		default:
			/*
			 * Reinsert buffer into the heap. Note that heap can be
			 * partially empty, so we need to use
			 * lttng_heap_replace_max().
			 */
			CHAN_WARN_ON(chan, len < 0);
			CHAN_WARN_ON(chan, lttng_heap_replace_max(heap, buf) != buf);
			break;
		}
	}

	buf = lttng_heap_maximum(heap);
	if (!buf || buf->iter.timestamp > chan->iter.last_qs) {
		/*
		 * Deal with buffers previously showing no data.
		 * Add buffers containing data to the heap, update
		 * last_qs.
		 */
		lib_ring_buffer_wait_for_qs(config, chan);
	}

	*ret_buf = buf = lttng_heap_maximum(heap);
	if (buf) {
		/*
		 * If this warning triggers, you probably need to check your
		 * system interrupt latency. Typical causes: too many printk()
		 * output going to a serial console with interrupts off.
		 * Allow for MAX_CLOCK_DELTA ns timestamp delta going backward.
		 * Observed on SMP KVM setups with trace_clock().
		 */
		if (chan->iter.last_timestamp
		    > (buf->iter.timestamp + MAX_CLOCK_DELTA)) {
			printk(KERN_WARNING "LTTng: ring_buffer: timestamps going "
			       "backward. Last time %llu ns, cpu %d, "
			       "current time %llu ns, cpu %d, "
			       "delta %llu ns.\n",
			       chan->iter.last_timestamp, chan->iter.last_cpu,
			       buf->iter.timestamp, buf->backend.cpu,
			       chan->iter.last_timestamp - buf->iter.timestamp);
			CHAN_WARN_ON(chan, 1);
		}
		chan->iter.last_timestamp = buf->iter.timestamp;
		chan->iter.last_cpu = buf->backend.cpu;
		return buf->iter.payload_len;
	} else {
		/* Heap is empty */
		if (list_empty(&chan->iter.empty_head))
			return -ENODATA;	/* All buffers finalized */
		else
			return -EAGAIN;		/* Temporarily empty */
	}
}
EXPORT_SYMBOL_GPL(channel_get_next_record);
339
340static
e20c0fec 341void lib_ring_buffer_iterator_init(struct lttng_kernel_ring_buffer_channel *chan, struct lttng_kernel_ring_buffer *buf)
f3bc08c5
MD
342{
343 if (buf->iter.allocated)
344 return;
345
346 buf->iter.allocated = 1;
347 if (chan->iter.read_open && !buf->iter.read_open) {
348 CHAN_WARN_ON(chan, lib_ring_buffer_open_read(buf) != 0);
349 buf->iter.read_open = 1;
350 }
351
352 /* Add to list of buffers without any current record */
5a8fd222 353 if (chan->backend.config.alloc == RING_BUFFER_ALLOC_PER_CPU)
f3bc08c5
MD
354 list_add(&buf->iter.empty_node, &chan->iter.empty_head);
355}
356
5f4c791e 357#if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0))
1e367326
MD
358
359int lttng_cpuhp_rb_iter_online(unsigned int cpu,
360 struct lttng_cpuhp_node *node)
361{
860c213b 362 struct lttng_kernel_ring_buffer_channel *chan = container_of(node, struct lttng_kernel_ring_buffer_channel,
1e367326 363 cpuhp_iter_online);
e20c0fec
MD
364 struct lttng_kernel_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
365 const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
1e367326
MD
366
367 CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
368
369 lib_ring_buffer_iterator_init(chan, buf);
370 return 0;
371}
372EXPORT_SYMBOL_GPL(lttng_cpuhp_rb_iter_online);
373
5f4c791e 374#else /* #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
1e367326 375
f3bc08c5
MD
#ifdef CONFIG_HOTPLUG_CPU
/*
 * CPU hotplug notifier (pre-4.10 API): initialize the iterator of the
 * per-CPU buffer when its CPU comes online (or a down operation fails,
 * which leaves the CPU online).  Ignored until hp_iter_enable is set by
 * channel_iterator_init().
 */
static
int channel_iterator_cpu_hotplug(struct notifier_block *nb,
				 unsigned long action,
				 void *hcpu)
{
	unsigned int cpu = (unsigned long)hcpu;
	struct lttng_kernel_ring_buffer_channel *chan = container_of(nb, struct lttng_kernel_ring_buffer_channel,
						    hp_iter_notifier);
	struct lttng_kernel_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
	const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;

	if (!chan->hp_iter_enable)
		return NOTIFY_DONE;

	/* Only per-CPU channels register this notifier. */
	CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);

	switch (action) {
	case CPU_DOWN_FAILED:
	case CPU_DOWN_FAILED_FROZEN:
	case CPU_ONLINE:
	case CPU_ONLINE_FROZEN:
		lib_ring_buffer_iterator_init(chan, buf);
		return NOTIFY_OK;
	default:
		return NOTIFY_DONE;
	}
}
#endif
405
5f4c791e 406#endif /* #else #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
1e367326 407
/*
 * Initialize channel iterator state.  For per-CPU channels: set up the
 * empty-buffer list and the prio heap, then hook CPU hotplug so buffers
 * of late-arriving CPUs are initialized too.  For global channels: just
 * initialize the single buffer's iterator.  Returns 0 or a negative errno.
 */
int channel_iterator_init(struct lttng_kernel_ring_buffer_channel *chan)
{
	const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
	struct lttng_kernel_ring_buffer *buf;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		int ret;

		INIT_LIST_HEAD(&chan->iter.empty_head);
		/* Sized for all possible CPUs, not just online ones. */
		ret = lttng_heap_init(&chan->iter.heap,
				      num_possible_cpus(),
				      GFP_KERNEL, buf_is_higher);
		if (ret)
			return ret;

#if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0))
		/* Modern cpuhp API: per-instance "online" callback. */
		chan->cpuhp_iter_online.component = LTTNG_RING_BUFFER_ITER;
		ret = cpuhp_state_add_instance(lttng_rb_hp_online,
					       &chan->cpuhp_iter_online.node);
		if (ret)
			return ret;
#else /* #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
		{
			int cpu;

			/*
			 * In case of non-hotplug cpu, if the ring-buffer is allocated
			 * in early initcall, it will not be notified of secondary cpus.
			 * In that off case, we need to allocate for all possible cpus.
			 */
#ifdef CONFIG_HOTPLUG_CPU
			chan->hp_iter_notifier.notifier_call =
				channel_iterator_cpu_hotplug;
			chan->hp_iter_notifier.priority = 10;
			register_cpu_notifier(&chan->hp_iter_notifier);

			/* Hold cpus read lock so no CPU appears mid-loop. */
			lttng_cpus_read_lock();
			for_each_online_cpu(cpu) {
				buf = per_cpu_ptr(chan->backend.buf, cpu);
				lib_ring_buffer_iterator_init(chan, buf);
			}
			chan->hp_iter_enable = 1;
			lttng_cpus_read_unlock();
#else
			for_each_possible_cpu(cpu) {
				buf = per_cpu_ptr(chan->backend.buf, cpu);
				lib_ring_buffer_iterator_init(chan, buf);
			}
#endif
		}
#endif /* #else #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
	} else {
		buf = channel_get_ring_buffer(config, chan, 0);
		lib_ring_buffer_iterator_init(chan, buf);
	}
	return 0;
}
465
/*
 * Tear down the CPU hotplug hooks installed by channel_iterator_init().
 * No-op for global channels, which never register any.
 */
void channel_iterator_unregister_notifiers(struct lttng_kernel_ring_buffer_channel *chan)
{
	const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
#if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0))
		{
			int ret;

			ret = cpuhp_state_remove_instance(lttng_rb_hp_online,
							  &chan->cpuhp_iter_online.node);
			WARN_ON(ret);
		}
#else /* #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
		/* Disable the callback before unregistering the notifier. */
		chan->hp_iter_enable = 0;
		unregister_cpu_notifier(&chan->hp_iter_notifier);
#endif /* #else #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
	}
}
485
860c213b 486void channel_iterator_free(struct lttng_kernel_ring_buffer_channel *chan)
f3bc08c5 487{
e20c0fec 488 const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
f3bc08c5
MD
489
490 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
a88db018 491 lttng_heap_free(&chan->iter.heap);
f3bc08c5
MD
492}
493
e20c0fec 494int lib_ring_buffer_iterator_open(struct lttng_kernel_ring_buffer *buf)
f3bc08c5 495{
860c213b 496 struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan;
e20c0fec 497 const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
f3bc08c5
MD
498 CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);
499 return lib_ring_buffer_open_read(buf);
500}
501EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_open);
502
/*
 * Note: Iterators must not be mixed with other types of outputs, because an
 * iterator can leave the buffer in "GET" state, which is not consistent with
 * other types of output (mmap, splice, raw data read).
 */
/* Counterpart of lib_ring_buffer_iterator_open(): drop the read reference. */
void lib_ring_buffer_iterator_release(struct lttng_kernel_ring_buffer *buf)
{
	lib_ring_buffer_release_read(buf);
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_release);
513
/*
 * Open every buffer of a channel for iterator reading.  For per-CPU
 * channels this is done under the cpus read lock so the hotplug callback
 * can observe iter.read_open consistently.  Returns 0 on success or the
 * first error from lib_ring_buffer_iterator_open().
 */
int channel_iterator_open(struct lttng_kernel_ring_buffer_channel *chan)
{
	const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
	struct lttng_kernel_ring_buffer *buf;
	int ret = 0, cpu;

	CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		lttng_cpus_read_lock();
		/* Allow CPU hotplug to keep track of opened reader */
		chan->iter.read_open = 1;
		for_each_channel_cpu(cpu, chan) {
			buf = channel_get_ring_buffer(config, chan, cpu);
			ret = lib_ring_buffer_iterator_open(buf);
			if (ret)
				goto error;
			buf->iter.read_open = 1;
		}
		lttng_cpus_read_unlock();
	} else {
		buf = channel_get_ring_buffer(config, chan, 0);
		ret = lib_ring_buffer_iterator_open(buf);
	}
	return ret;
error:
	/* Error should always happen on CPU 0, hence no close is required. */
	CHAN_WARN_ON(chan, cpu != 0);
	lttng_cpus_read_unlock();
	return ret;
}
EXPORT_SYMBOL_GPL(channel_iterator_open);
546
860c213b 547void channel_iterator_release(struct lttng_kernel_ring_buffer_channel *chan)
f3bc08c5 548{
e20c0fec
MD
549 const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
550 struct lttng_kernel_ring_buffer *buf;
f3bc08c5
MD
551 int cpu;
552
553 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
ffcc8734 554 lttng_cpus_read_lock();
f3bc08c5
MD
555 for_each_channel_cpu(cpu, chan) {
556 buf = channel_get_ring_buffer(config, chan, cpu);
557 if (buf->iter.read_open) {
558 lib_ring_buffer_iterator_release(buf);
559 buf->iter.read_open = 0;
560 }
561 }
562 chan->iter.read_open = 0;
ffcc8734 563 lttng_cpus_read_unlock();
f3bc08c5
MD
564 } else {
565 buf = channel_get_ring_buffer(config, chan, 0);
566 lib_ring_buffer_iterator_release(buf);
567 }
568}
569EXPORT_SYMBOL_GPL(channel_iterator_release);
570
/*
 * Reset a buffer's iterator to its initial state: release any held
 * sub-buffer, move the buffer from the heap back to the empty list, and
 * clear all per-record bookkeeping.  The allocated and read_open flags
 * survive the reset on purpose.
 */
void lib_ring_buffer_iterator_reset(struct lttng_kernel_ring_buffer *buf)
{
	struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan;

	/* Release the sub-buffer first: any state past GET holds one. */
	if (buf->iter.state != ITER_GET_SUBBUF)
		lib_ring_buffer_put_next_subbuf(buf);
	buf->iter.state = ITER_GET_SUBBUF;
	/* Remove from heap (if present). */
	if (lttng_heap_cherrypick(&chan->iter.heap, buf))
		list_add(&buf->iter.empty_node, &chan->iter.empty_head);
	buf->iter.timestamp = 0;
	buf->iter.header_len = 0;
	buf->iter.payload_len = 0;
	buf->iter.consumed = 0;
	buf->iter.read_offset = 0;
	buf->iter.data_size = 0;
	/* Don't reset allocated and read_open */
}
589
/*
 * Reset the whole channel iterator: drain the heap onto the empty list,
 * reset each per-CPU buffer iterator, then clear the channel-level merge
 * bookkeeping.  read_open is preserved, like in the per-buffer reset.
 */
void channel_iterator_reset(struct lttng_kernel_ring_buffer_channel *chan)
{
	const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
	struct lttng_kernel_ring_buffer *buf;
	int cpu;

	/* Empty heap, put into empty_head */
	while ((buf = lttng_heap_remove(&chan->iter.heap)) != NULL)
		list_add(&buf->iter.empty_node, &chan->iter.empty_head);

	for_each_channel_cpu(cpu, chan) {
		buf = channel_get_ring_buffer(config, chan, cpu);
		lib_ring_buffer_iterator_reset(buf);
	}
	/* Don't reset read_open */
	chan->iter.last_qs = 0;
	chan->iter.last_timestamp = 0;
	chan->iter.last_cpu = 0;
	chan->iter.len_left = 0;
}
610
/*
 * Ring buffer payload extraction read() implementation.
 *
 * Common backend for both the per-buffer and the per-channel read() file
 * operations.  @fusionmerge selects channel-level timestamp-ordered merge
 * (channel_get_next_record) vs. direct per-buffer reads.  *@ppos tracks
 * the resume offset inside a partially-copied record between read() calls
 * (chan->iter.len_left holds the bytes of that record still to copy).
 * Blocks unless O_NONBLOCK is set.  Returns bytes read, 0 at end of
 * stream, or a negative errno.
 */
static
ssize_t channel_ring_buffer_file_read(struct file *filp,
				      char __user *user_buf,
				      size_t count,
				      loff_t *ppos,
				      struct lttng_kernel_ring_buffer_channel *chan,
				      struct lttng_kernel_ring_buffer *buf,
				      int fusionmerge)
{
	const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
	size_t read_count = 0, read_offset;
	ssize_t len;

	might_sleep();
	if (!lttng_access_ok(VERIFY_WRITE, user_buf, count))
		return -EFAULT;

	/* Finish copy of previous record */
	if (*ppos != 0) {
		if (read_count < count) {
			len = chan->iter.len_left;
			read_offset = *ppos;
			if (config->alloc == RING_BUFFER_ALLOC_PER_CPU
			    && fusionmerge)
				/* The interrupted record sits atop the heap. */
				buf = lttng_heap_maximum(&chan->iter.heap);
			CHAN_WARN_ON(chan, !buf);
			goto skip_get_next;
		}
	}

	while (read_count < count) {
		size_t copy_len, space_left;

		if (fusionmerge)
			len = channel_get_next_record(chan, &buf);
		else
			len = lib_ring_buffer_get_next_record(chan, buf);
len_test:
		if (len < 0) {
			/*
			 * Check if buffer is finalized (end of file).
			 */
			if (len == -ENODATA) {
				/* A 0 read_count will tell about end of file */
				goto nodata;
			}
			if (filp->f_flags & O_NONBLOCK) {
				if (!read_count)
					read_count = -EAGAIN;
				goto nodata;
			} else {
				int error;

				/*
				 * No data available at the moment, return what
				 * we got.
				 */
				if (read_count)
					goto nodata;

				/*
				 * Wait for returned len to be >= 0 or -ENODATA.
				 */
				if (fusionmerge)
					error = wait_event_interruptible(
						  chan->read_wait,
						  ((len = channel_get_next_record(chan,
							&buf)), len != -EAGAIN));
				else
					error = wait_event_interruptible(
						  buf->read_wait,
						  ((len = lib_ring_buffer_get_next_record(
							  chan, buf)), len != -EAGAIN));
				CHAN_WARN_ON(chan, len == -EBUSY);
				if (error) {
					/* Interrupted by a signal. */
					read_count = error;
					goto nodata;
				}
				CHAN_WARN_ON(chan, len < 0 && len != -ENODATA);
				goto len_test;
			}
		}
		read_offset = buf->iter.read_offset;
skip_get_next:
		space_left = count - read_count;
		if (len <= space_left) {
			/* Whole record fits: clear the resume state. */
			copy_len = len;
			chan->iter.len_left = 0;
			*ppos = 0;
		} else {
			/* Partial copy: remember where to resume next read(). */
			copy_len = space_left;
			chan->iter.len_left = len - copy_len;
			*ppos = read_offset + copy_len;
		}
		if (__lib_ring_buffer_copy_to_user(&buf->backend, read_offset,
						   &user_buf[read_count],
						   copy_len)) {
			/*
			 * Leave the len_left and ppos values at their current
			 * state, as we currently have a valid event to read.
			 */
			return -EFAULT;
		}
		read_count += copy_len;
	}
	goto put_record;

nodata:
	*ppos = 0;
	chan->iter.len_left = 0;
put_record:
	lib_ring_buffer_put_current_record(buf);
	return read_count;
}
728
729/**
730 * lib_ring_buffer_file_read - Read buffer record payload.
731 * @filp: file structure pointer.
732 * @buffer: user buffer to read data into.
733 * @count: number of bytes to read.
734 * @ppos: file read position.
735 *
736 * Returns a negative value on error, or the number of bytes read on success.
737 * ppos is used to save the position _within the current record_ between calls
738 * to read().
739 */
740static
741ssize_t lib_ring_buffer_file_read(struct file *filp,
742 char __user *user_buf,
743 size_t count,
744 loff_t *ppos)
745{
b06ed645 746 struct inode *inode = filp->lttng_f_dentry->d_inode;
e20c0fec 747 struct lttng_kernel_ring_buffer *buf = inode->i_private;
860c213b 748 struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan;
f3bc08c5
MD
749
750 return channel_ring_buffer_file_read(filp, user_buf, count, ppos,
751 chan, buf, 0);
752}
753
754/**
755 * channel_file_read - Read channel record payload.
756 * @filp: file structure pointer.
757 * @buffer: user buffer to read data into.
758 * @count: number of bytes to read.
759 * @ppos: file read position.
760 *
761 * Returns a negative value on error, or the number of bytes read on success.
762 * ppos is used to save the position _within the current record_ between calls
763 * to read().
764 */
765static
766ssize_t channel_file_read(struct file *filp,
767 char __user *user_buf,
768 size_t count,
769 loff_t *ppos)
770{
b06ed645 771 struct inode *inode = filp->lttng_f_dentry->d_inode;
860c213b 772 struct lttng_kernel_ring_buffer_channel *chan = inode->i_private;
e20c0fec 773 const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
f3bc08c5
MD
774
775 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
776 return channel_ring_buffer_file_read(filp, user_buf, count,
777 ppos, chan, NULL, 1);
778 else {
e20c0fec 779 struct lttng_kernel_ring_buffer *buf =
f3bc08c5
MD
780 channel_get_ring_buffer(config, chan, 0);
781 return channel_ring_buffer_file_read(filp, user_buf, count,
782 ppos, chan, buf, 0);
783 }
784}
785
786static
787int lib_ring_buffer_file_open(struct inode *inode, struct file *file)
788{
e20c0fec 789 struct lttng_kernel_ring_buffer *buf = inode->i_private;
f3bc08c5
MD
790 int ret;
791
792 ret = lib_ring_buffer_iterator_open(buf);
793 if (ret)
794 return ret;
795
796 file->private_data = buf;
797 ret = nonseekable_open(inode, file);
798 if (ret)
799 goto release_iter;
800 return 0;
801
802release_iter:
803 lib_ring_buffer_iterator_release(buf);
804 return ret;
805}
806
807static
808int lib_ring_buffer_file_release(struct inode *inode, struct file *file)
809{
e20c0fec 810 struct lttng_kernel_ring_buffer *buf = inode->i_private;
f3bc08c5
MD
811
812 lib_ring_buffer_iterator_release(buf);
813 return 0;
814}
815
816static
817int channel_file_open(struct inode *inode, struct file *file)
818{
860c213b 819 struct lttng_kernel_ring_buffer_channel *chan = inode->i_private;
f3bc08c5
MD
820 int ret;
821
822 ret = channel_iterator_open(chan);
823 if (ret)
824 return ret;
825
826 file->private_data = chan;
827 ret = nonseekable_open(inode, file);
828 if (ret)
829 goto release_iter;
830 return 0;
831
832release_iter:
833 channel_iterator_release(chan);
834 return ret;
835}
836
837static
838int channel_file_release(struct inode *inode, struct file *file)
839{
860c213b 840 struct lttng_kernel_ring_buffer_channel *chan = inode->i_private;
f3bc08c5
MD
841
842 channel_iterator_release(chan);
843 return 0;
844}
845
/*
 * File operations for the channel-wide payload file: reads merge events
 * from all per-CPU buffers in timestamp order.  Seeking is disallowed
 * because ppos encodes the resume offset inside the current record.
 */
const struct file_operations channel_payload_file_operations = {
	.owner = THIS_MODULE,
	.open = channel_file_open,
	.release = channel_file_release,
	.read = channel_file_read,
	.llseek = vfs_lib_ring_buffer_no_llseek,
};
EXPORT_SYMBOL_GPL(channel_payload_file_operations);
854
/*
 * File operations for a single ring buffer's payload file: reads events
 * from that buffer only, no cross-CPU merge.  Seeking is disallowed for
 * the same reason as the channel-wide file.
 */
const struct file_operations lib_ring_buffer_payload_file_operations = {
	.owner = THIS_MODULE,
	.open = lib_ring_buffer_file_open,
	.release = lib_ring_buffer_file_release,
	.read = lib_ring_buffer_file_read,
	.llseek = vfs_lib_ring_buffer_no_llseek,
};
EXPORT_SYMBOL_GPL(lib_ring_buffer_payload_file_operations);
This page took 0.086112 seconds and 4 git commands to generate.