Merge LTTng commit 360f38ea4fee91e2403c03cb43841ef6769aaac7
[lttng-ust.git] / libringbuffer / ring_buffer_backend.c
CommitLineData
852c2936
MD
1/*
2 * ring_buffer_backend.c
3 *
4 * Copyright (C) 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 *
6 * Dual LGPL v2.1/GPL v2 license.
7 */
8
4931a13e
MD
9#include "config.h"
10#include "backend.h"
11#include "frontend.h"
852c2936
MD
12
13/**
14 * lib_ring_buffer_backend_allocate - allocate a channel buffer
15 * @config: ring buffer instance configuration
16 * @buf: the buffer struct
17 * @size: total size of the buffer
18 * @num_subbuf: number of subbuffers
19 * @extra_reader_sb: need extra subbuffer for reader
20 */
21static
22int lib_ring_buffer_backend_allocate(const struct lib_ring_buffer_config *config,
23 struct lib_ring_buffer_backend *bufb,
24 size_t size, size_t num_subbuf,
25 int extra_reader_sb)
26{
27 struct channel_backend *chanb = &bufb->chan->backend;
28 unsigned long j, num_pages, num_pages_per_subbuf, page_idx = 0;
29 unsigned long subbuf_size, mmap_offset = 0;
30 unsigned long num_subbuf_alloc;
31 struct page **pages;
32 void **virt;
33 unsigned long i;
34
35 num_pages = size >> PAGE_SHIFT;
36 num_pages_per_subbuf = num_pages >> get_count_order(num_subbuf);
37 subbuf_size = chanb->subbuf_size;
38 num_subbuf_alloc = num_subbuf;
39
40 if (extra_reader_sb) {
41 num_pages += num_pages_per_subbuf; /* Add pages for reader */
42 num_subbuf_alloc++;
43 }
44
45 pages = kmalloc_node(ALIGN(sizeof(*pages) * num_pages,
46 1 << INTERNODE_CACHE_SHIFT),
47 GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
48 if (unlikely(!pages))
49 goto pages_error;
50
51 virt = kmalloc_node(ALIGN(sizeof(*virt) * num_pages,
52 1 << INTERNODE_CACHE_SHIFT),
53 GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
54 if (unlikely(!virt))
55 goto virt_error;
56
57 bufb->array = kmalloc_node(ALIGN(sizeof(*bufb->array)
58 * num_subbuf_alloc,
59 1 << INTERNODE_CACHE_SHIFT),
60 GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
61 if (unlikely(!bufb->array))
62 goto array_error;
63
64 for (i = 0; i < num_pages; i++) {
65 pages[i] = alloc_pages_node(cpu_to_node(max(bufb->cpu, 0)),
66 GFP_KERNEL | __GFP_ZERO, 0);
67 if (unlikely(!pages[i]))
68 goto depopulate;
69 virt[i] = page_address(pages[i]);
70 }
71 bufb->num_pages_per_subbuf = num_pages_per_subbuf;
72
73 /* Allocate backend pages array elements */
74 for (i = 0; i < num_subbuf_alloc; i++) {
75 bufb->array[i] =
76 kzalloc_node(ALIGN(
77 sizeof(struct lib_ring_buffer_backend_pages) +
78 sizeof(struct lib_ring_buffer_backend_page)
79 * num_pages_per_subbuf,
80 1 << INTERNODE_CACHE_SHIFT),
81 GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
82 if (!bufb->array[i])
83 goto free_array;
84 }
85
86 /* Allocate write-side subbuffer table */
87 bufb->buf_wsb = kzalloc_node(ALIGN(
88 sizeof(struct lib_ring_buffer_backend_subbuffer)
89 * num_subbuf,
90 1 << INTERNODE_CACHE_SHIFT),
91 GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
92 if (unlikely(!bufb->buf_wsb))
93 goto free_array;
94
95 for (i = 0; i < num_subbuf; i++)
96 bufb->buf_wsb[i].id = subbuffer_id(config, 0, 1, i);
97
98 /* Assign read-side subbuffer table */
99 if (extra_reader_sb)
100 bufb->buf_rsb.id = subbuffer_id(config, 0, 1,
101 num_subbuf_alloc - 1);
102 else
103 bufb->buf_rsb.id = subbuffer_id(config, 0, 1, 0);
104
105 /* Assign pages to page index */
106 for (i = 0; i < num_subbuf_alloc; i++) {
107 for (j = 0; j < num_pages_per_subbuf; j++) {
108 CHAN_WARN_ON(chanb, page_idx > num_pages);
109 bufb->array[i]->p[j].virt = virt[page_idx];
110 bufb->array[i]->p[j].page = pages[page_idx];
111 page_idx++;
112 }
113 if (config->output == RING_BUFFER_MMAP) {
114 bufb->array[i]->mmap_offset = mmap_offset;
115 mmap_offset += subbuf_size;
116 }
117 }
118
119 /*
120 * If kmalloc ever uses vmalloc underneath, make sure the buffer pages
121 * will not fault.
122 */
123 wrapper_vmalloc_sync_all();
124 kfree(virt);
125 kfree(pages);
126 return 0;
127
128free_array:
129 for (i = 0; (i < num_subbuf_alloc && bufb->array[i]); i++)
130 kfree(bufb->array[i]);
131depopulate:
132 /* Free all allocated pages */
133 for (i = 0; (i < num_pages && pages[i]); i++)
134 __free_page(pages[i]);
135 kfree(bufb->array);
136array_error:
137 kfree(virt);
138virt_error:
139 kfree(pages);
140pages_error:
141 return -ENOMEM;
142}
143
144int lib_ring_buffer_backend_create(struct lib_ring_buffer_backend *bufb,
145 struct channel_backend *chanb, int cpu)
146{
147 const struct lib_ring_buffer_config *config = chanb->config;
148
149 bufb->chan = container_of(chanb, struct channel, backend);
150 bufb->cpu = cpu;
151
152 return lib_ring_buffer_backend_allocate(config, bufb, chanb->buf_size,
153 chanb->num_subbuf,
154 chanb->extra_reader_sb);
155}
156
157void lib_ring_buffer_backend_free(struct lib_ring_buffer_backend *bufb)
158{
159 struct channel_backend *chanb = &bufb->chan->backend;
160 unsigned long i, j, num_subbuf_alloc;
161
162 num_subbuf_alloc = chanb->num_subbuf;
163 if (chanb->extra_reader_sb)
164 num_subbuf_alloc++;
165
166 kfree(bufb->buf_wsb);
167 for (i = 0; i < num_subbuf_alloc; i++) {
168 for (j = 0; j < bufb->num_pages_per_subbuf; j++)
169 __free_page(bufb->array[i]->p[j].page);
170 kfree(bufb->array[i]);
171 }
172 kfree(bufb->array);
173 bufb->allocated = 0;
174}
175
176void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb)
177{
178 struct channel_backend *chanb = &bufb->chan->backend;
179 const struct lib_ring_buffer_config *config = chanb->config;
180 unsigned long num_subbuf_alloc;
181 unsigned int i;
182
183 num_subbuf_alloc = chanb->num_subbuf;
184 if (chanb->extra_reader_sb)
185 num_subbuf_alloc++;
186
187 for (i = 0; i < chanb->num_subbuf; i++)
188 bufb->buf_wsb[i].id = subbuffer_id(config, 0, 1, i);
189 if (chanb->extra_reader_sb)
190 bufb->buf_rsb.id = subbuffer_id(config, 0, 1,
191 num_subbuf_alloc - 1);
192 else
193 bufb->buf_rsb.id = subbuffer_id(config, 0, 1, 0);
194
195 for (i = 0; i < num_subbuf_alloc; i++) {
196 /* Don't reset mmap_offset */
197 v_set(config, &bufb->array[i]->records_commit, 0);
198 v_set(config, &bufb->array[i]->records_unread, 0);
199 bufb->array[i]->data_size = 0;
200 /* Don't reset backend page and virt addresses */
201 }
202 /* Don't reset num_pages_per_subbuf, cpu, allocated */
203 v_set(config, &bufb->records_read, 0);
204}
205
206/*
207 * The frontend is responsible for also calling ring_buffer_backend_reset for
208 * each buffer when calling channel_backend_reset.
209 */
210void channel_backend_reset(struct channel_backend *chanb)
211{
212 struct channel *chan = container_of(chanb, struct channel, backend);
213 const struct lib_ring_buffer_config *config = chanb->config;
214
215 /*
216 * Don't reset buf_size, subbuf_size, subbuf_size_order,
217 * num_subbuf_order, buf_size_order, extra_reader_sb, num_subbuf,
218 * priv, notifiers, config, cpumask and name.
219 */
220 chanb->start_tsc = config->cb.ring_buffer_clock_read(chan);
221}
222
#ifdef CONFIG_HOTPLUG_CPU
/**
 * lib_ring_buffer_cpu_hp_callback - CPU hotplug callback
 * @nb: notifier block (embedded in the channel backend)
 * @action: hotplug action to take
 * @hcpu: CPU number
 *
 * Creates the per-cpu ring buffer of a CPU that is about to come up.
 *
 * Returns %NOTIFY_BAD if the buffer could not be created, %NOTIFY_OK in
 * every other case.
 */
static
int __cpuinit lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb,
					      unsigned long action,
					      void *hcpu)
{
	struct channel_backend *chanb =
		container_of(nb, struct channel_backend, cpu_hp_notifier);
	const struct lib_ring_buffer_config *config = chanb->config;
	unsigned int cpu = (unsigned long)hcpu;

	CHAN_WARN_ON(chanb, config->alloc == RING_BUFFER_ALLOC_GLOBAL);

	if (action == CPU_UP_PREPARE || action == CPU_UP_PREPARE_FROZEN) {
		struct lib_ring_buffer *buf = per_cpu_ptr(chanb->buf, cpu);

		if (lib_ring_buffer_create(buf, chanb, cpu)) {
			printk(KERN_ERR
			       "ring_buffer_cpu_hp_callback: cpu %d "
			       "buffer creation failed\n", cpu);
			return NOTIFY_BAD;
		}
	}

	/*
	 * Nothing to do for CPU_DEAD / CPU_DEAD_FROZEN: no buffer switch is
	 * needed here, because it will happen when tracing is stopped, or
	 * will be done by the switch timer CPU DEAD callback.
	 */
	return NOTIFY_OK;
}
#endif
268
269/**
270 * channel_backend_init - initialize a channel backend
271 * @chanb: channel backend
272 * @name: channel name
273 * @config: client ring buffer configuration
274 * @priv: client private data
275 * @parent: dentry of parent directory, %NULL for root directory
276 * @subbuf_size: size of sub-buffers (> PAGE_SIZE, power of 2)
277 * @num_subbuf: number of sub-buffers (power of 2)
278 *
279 * Returns channel pointer if successful, %NULL otherwise.
280 *
281 * Creates per-cpu channel buffers using the sizes and attributes
282 * specified. The created channel buffer files will be named
283 * name_0...name_N-1. File permissions will be %S_IRUSR.
284 *
285 * Called with CPU hotplug disabled.
286 */
287int channel_backend_init(struct channel_backend *chanb,
288 const char *name,
289 const struct lib_ring_buffer_config *config,
290 void *priv, size_t subbuf_size, size_t num_subbuf)
291{
292 struct channel *chan = container_of(chanb, struct channel, backend);
293 unsigned int i;
294 int ret;
295
296 if (!name)
297 return -EPERM;
298
299 if (!(subbuf_size && num_subbuf))
300 return -EPERM;
301
302 /* Check that the subbuffer size is larger than a page. */
303 if (subbuf_size < PAGE_SIZE)
304 return -EINVAL;
305
306 /*
307 * Make sure the number of subbuffers and subbuffer size are power of 2.
308 */
309 CHAN_WARN_ON(chanb, hweight32(subbuf_size) != 1);
310 CHAN_WARN_ON(chanb, hweight32(num_subbuf) != 1);
311
312 ret = subbuffer_id_check_index(config, num_subbuf);
313 if (ret)
314 return ret;
315
316 chanb->priv = priv;
317 chanb->buf_size = num_subbuf * subbuf_size;
318 chanb->subbuf_size = subbuf_size;
319 chanb->buf_size_order = get_count_order(chanb->buf_size);
320 chanb->subbuf_size_order = get_count_order(subbuf_size);
321 chanb->num_subbuf_order = get_count_order(num_subbuf);
322 chanb->extra_reader_sb =
323 (config->mode == RING_BUFFER_OVERWRITE) ? 1 : 0;
324 chanb->num_subbuf = num_subbuf;
325 strlcpy(chanb->name, name, NAME_MAX);
326 chanb->config = config;
327
328 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
329 if (!zalloc_cpumask_var(&chanb->cpumask, GFP_KERNEL))
330 return -ENOMEM;
331 }
332
333 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
334 /* Allocating the buffer per-cpu structures */
335 chanb->buf = alloc_percpu(struct lib_ring_buffer);
336 if (!chanb->buf)
337 goto free_cpumask;
338
339 /*
340 * In case of non-hotplug cpu, if the ring-buffer is allocated
341 * in early initcall, it will not be notified of secondary cpus.
342 * In that off case, we need to allocate for all possible cpus.
343 */
344#ifdef CONFIG_HOTPLUG_CPU
345 /*
346 * buf->backend.allocated test takes care of concurrent CPU
347 * hotplug.
348 * Priority higher than frontend, so we create the ring buffer
349 * before we start the timer.
350 */
351 chanb->cpu_hp_notifier.notifier_call =
352 lib_ring_buffer_cpu_hp_callback;
353 chanb->cpu_hp_notifier.priority = 5;
354 register_hotcpu_notifier(&chanb->cpu_hp_notifier);
355
356 get_online_cpus();
357 for_each_online_cpu(i) {
358 ret = lib_ring_buffer_create(per_cpu_ptr(chanb->buf, i),
359 chanb, i);
360 if (ret)
361 goto free_bufs; /* cpu hotplug locked */
362 }
363 put_online_cpus();
364#else
365 for_each_possible_cpu(i) {
366 ret = lib_ring_buffer_create(per_cpu_ptr(chanb->buf, i),
367 chanb, i);
368 if (ret)
369 goto free_bufs; /* cpu hotplug locked */
370 }
371#endif
372 } else {
373 chanb->buf = kzalloc(sizeof(struct lib_ring_buffer), GFP_KERNEL);
374 if (!chanb->buf)
375 goto free_cpumask;
376 ret = lib_ring_buffer_create(chanb->buf, chanb, -1);
377 if (ret)
378 goto free_bufs;
379 }
380 chanb->start_tsc = config->cb.ring_buffer_clock_read(chan);
381
382 return 0;
383
384free_bufs:
385 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
386 for_each_possible_cpu(i) {
387 struct lib_ring_buffer *buf = per_cpu_ptr(chanb->buf, i);
388
389 if (!buf->backend.allocated)
390 continue;
391 lib_ring_buffer_free(buf);
392 }
393#ifdef CONFIG_HOTPLUG_CPU
394 put_online_cpus();
395#endif
396 free_percpu(chanb->buf);
397 } else
398 kfree(chanb->buf);
399free_cpumask:
400 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
401 free_cpumask_var(chanb->cpumask);
402 return -ENOMEM;
403}
404
405/**
406 * channel_backend_unregister_notifiers - unregister notifiers
407 * @chan: the channel
408 *
409 * Holds CPU hotplug.
410 */
411void channel_backend_unregister_notifiers(struct channel_backend *chanb)
412{
413 const struct lib_ring_buffer_config *config = chanb->config;
414
415 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
416 unregister_hotcpu_notifier(&chanb->cpu_hp_notifier);
417}
418
419/**
420 * channel_backend_free - destroy the channel
421 * @chan: the channel
422 *
423 * Destroy all channel buffers and frees the channel.
424 */
425void channel_backend_free(struct channel_backend *chanb)
426{
427 const struct lib_ring_buffer_config *config = chanb->config;
428 unsigned int i;
429
430 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
431 for_each_possible_cpu(i) {
432 struct lib_ring_buffer *buf = per_cpu_ptr(chanb->buf, i);
433
434 if (!buf->backend.allocated)
435 continue;
436 lib_ring_buffer_free(buf);
437 }
438 free_cpumask_var(chanb->cpumask);
439 free_percpu(chanb->buf);
440 } else {
441 struct lib_ring_buffer *buf = chanb->buf;
442
443 CHAN_WARN_ON(chanb, !buf->backend.allocated);
444 lib_ring_buffer_free(buf);
445 kfree(buf);
446 }
447}
448
449/**
450 * lib_ring_buffer_write - write data to a ring_buffer buffer.
451 * @bufb : buffer backend
452 * @offset : offset within the buffer
453 * @src : source address
454 * @len : length to write
455 * @pagecpy : page size copied so far
456 */
457void _lib_ring_buffer_write(struct lib_ring_buffer_backend *bufb, size_t offset,
458 const void *src, size_t len, ssize_t pagecpy)
459{
460 struct channel_backend *chanb = &bufb->chan->backend;
461 const struct lib_ring_buffer_config *config = chanb->config;
462 size_t sbidx, index;
463 struct lib_ring_buffer_backend_pages *rpages;
464 unsigned long sb_bindex, id;
465
466 do {
467 len -= pagecpy;
468 src += pagecpy;
469 offset += pagecpy;
470 sbidx = offset >> chanb->subbuf_size_order;
471 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
472
473 /*
474 * Underlying layer should never ask for writes across
475 * subbuffers.
476 */
477 CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
478
479 pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
480 id = bufb->buf_wsb[sbidx].id;
481 sb_bindex = subbuffer_id_get_index(config, id);
482 rpages = bufb->array[sb_bindex];
483 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
484 && subbuffer_id_is_noref(config, id));
485 lib_ring_buffer_do_copy(config,
486 rpages->p[index].virt
487 + (offset & ~PAGE_MASK),
488 src, pagecpy);
489 } while (unlikely(len != pagecpy));
490}
491EXPORT_SYMBOL_GPL(_lib_ring_buffer_write);
492
493/**
494 * lib_ring_buffer_read - read data from ring_buffer_buffer.
495 * @bufb : buffer backend
496 * @offset : offset within the buffer
497 * @dest : destination address
498 * @len : length to copy to destination
499 *
500 * Should be protected by get_subbuf/put_subbuf.
501 * Returns the length copied.
502 */
503size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, size_t offset,
504 void *dest, size_t len)
505{
506 struct channel_backend *chanb = &bufb->chan->backend;
507 const struct lib_ring_buffer_config *config = chanb->config;
508 size_t index;
509 ssize_t pagecpy, orig_len;
510 struct lib_ring_buffer_backend_pages *rpages;
511 unsigned long sb_bindex, id;
512
513 orig_len = len;
514 offset &= chanb->buf_size - 1;
515 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
516 if (unlikely(!len))
517 return 0;
518 for (;;) {
519 pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
520 id = bufb->buf_rsb.id;
521 sb_bindex = subbuffer_id_get_index(config, id);
522 rpages = bufb->array[sb_bindex];
523 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
524 && subbuffer_id_is_noref(config, id));
525 memcpy(dest, rpages->p[index].virt + (offset & ~PAGE_MASK),
526 pagecpy);
527 len -= pagecpy;
528 if (likely(!len))
529 break;
530 dest += pagecpy;
531 offset += pagecpy;
532 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
533 /*
534 * Underlying layer should never ask for reads across
535 * subbuffers.
536 */
537 CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
538 }
539 return orig_len;
540}
541EXPORT_SYMBOL_GPL(lib_ring_buffer_read);
542
543/**
544 * __lib_ring_buffer_copy_to_user - read data from ring_buffer to userspace
545 * @bufb : buffer backend
546 * @offset : offset within the buffer
547 * @dest : destination userspace address
548 * @len : length to copy to destination
549 *
550 * Should be protected by get_subbuf/put_subbuf.
551 * access_ok() must have been performed on dest addresses prior to call this
552 * function.
553 * Returns -EFAULT on error, 0 if ok.
554 */
555int __lib_ring_buffer_copy_to_user(struct lib_ring_buffer_backend *bufb,
556 size_t offset, void __user *dest, size_t len)
557{
558 struct channel_backend *chanb = &bufb->chan->backend;
559 const struct lib_ring_buffer_config *config = chanb->config;
560 size_t index;
561 ssize_t pagecpy, orig_len;
562 struct lib_ring_buffer_backend_pages *rpages;
563 unsigned long sb_bindex, id;
564
565 orig_len = len;
566 offset &= chanb->buf_size - 1;
567 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
568 if (unlikely(!len))
569 return 0;
570 for (;;) {
571 pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
572 id = bufb->buf_rsb.id;
573 sb_bindex = subbuffer_id_get_index(config, id);
574 rpages = bufb->array[sb_bindex];
575 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
576 && subbuffer_id_is_noref(config, id));
577 if (__copy_to_user(dest,
578 rpages->p[index].virt + (offset & ~PAGE_MASK),
579 pagecpy))
580 return -EFAULT;
581 len -= pagecpy;
582 if (likely(!len))
583 break;
584 dest += pagecpy;
585 offset += pagecpy;
586 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
587 /*
588 * Underlying layer should never ask for reads across
589 * subbuffers.
590 */
591 CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
592 }
593 return 0;
594}
595EXPORT_SYMBOL_GPL(__lib_ring_buffer_copy_to_user);
596
597/**
598 * lib_ring_buffer_read_cstr - read a C-style string from ring_buffer.
599 * @bufb : buffer backend
600 * @offset : offset within the buffer
601 * @dest : destination address
602 * @len : destination's length
603 *
604 * return string's length
605 * Should be protected by get_subbuf/put_subbuf.
606 */
607int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb, size_t offset,
608 void *dest, size_t len)
609{
610 struct channel_backend *chanb = &bufb->chan->backend;
611 const struct lib_ring_buffer_config *config = chanb->config;
612 size_t index;
613 ssize_t pagecpy, pagelen, strpagelen, orig_offset;
614 char *str;
615 struct lib_ring_buffer_backend_pages *rpages;
616 unsigned long sb_bindex, id;
617
618 offset &= chanb->buf_size - 1;
619 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
620 orig_offset = offset;
621 for (;;) {
622 id = bufb->buf_rsb.id;
623 sb_bindex = subbuffer_id_get_index(config, id);
624 rpages = bufb->array[sb_bindex];
625 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
626 && subbuffer_id_is_noref(config, id));
627 str = (char *)rpages->p[index].virt + (offset & ~PAGE_MASK);
628 pagelen = PAGE_SIZE - (offset & ~PAGE_MASK);
629 strpagelen = strnlen(str, pagelen);
630 if (len) {
631 pagecpy = min_t(size_t, len, strpagelen);
632 if (dest) {
633 memcpy(dest, str, pagecpy);
634 dest += pagecpy;
635 }
636 len -= pagecpy;
637 }
638 offset += strpagelen;
639 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
640 if (strpagelen < pagelen)
641 break;
642 /*
643 * Underlying layer should never ask for reads across
644 * subbuffers.
645 */
646 CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
647 }
648 if (dest && len)
649 ((char *)dest)[0] = 0;
650 return offset - orig_offset;
651}
652EXPORT_SYMBOL_GPL(lib_ring_buffer_read_cstr);
653
654/**
655 * lib_ring_buffer_read_get_page - Get a whole page to read from
656 * @bufb : buffer backend
657 * @offset : offset within the buffer
658 * @virt : pointer to page address (output)
659 *
660 * Should be protected by get_subbuf/put_subbuf.
661 * Returns the pointer to the page struct pointer.
662 */
663struct page **lib_ring_buffer_read_get_page(struct lib_ring_buffer_backend *bufb,
664 size_t offset, void ***virt)
665{
666 size_t index;
667 struct lib_ring_buffer_backend_pages *rpages;
668 struct channel_backend *chanb = &bufb->chan->backend;
669 const struct lib_ring_buffer_config *config = chanb->config;
670 unsigned long sb_bindex, id;
671
672 offset &= chanb->buf_size - 1;
673 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
674 id = bufb->buf_rsb.id;
675 sb_bindex = subbuffer_id_get_index(config, id);
676 rpages = bufb->array[sb_bindex];
677 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
678 && subbuffer_id_is_noref(config, id));
679 *virt = &rpages->p[index].virt;
680 return &rpages->p[index].page;
681}
682EXPORT_SYMBOL_GPL(lib_ring_buffer_read_get_page);
683
684/**
685 * lib_ring_buffer_read_offset_address - get address of a buffer location
686 * @bufb : buffer backend
687 * @offset : offset within the buffer.
688 *
689 * Return the address where a given offset is located (for read).
690 * Should be used to get the current subbuffer header pointer. Given we know
691 * it's never on a page boundary, it's safe to write directly to this address,
692 * as long as the write is never bigger than a page size.
693 */
694void *lib_ring_buffer_read_offset_address(struct lib_ring_buffer_backend *bufb,
695 size_t offset)
696{
697 size_t index;
698 struct lib_ring_buffer_backend_pages *rpages;
699 struct channel_backend *chanb = &bufb->chan->backend;
700 const struct lib_ring_buffer_config *config = chanb->config;
701 unsigned long sb_bindex, id;
702
703 offset &= chanb->buf_size - 1;
704 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
705 id = bufb->buf_rsb.id;
706 sb_bindex = subbuffer_id_get_index(config, id);
707 rpages = bufb->array[sb_bindex];
708 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
709 && subbuffer_id_is_noref(config, id));
710 return rpages->p[index].virt + (offset & ~PAGE_MASK);
711}
712EXPORT_SYMBOL_GPL(lib_ring_buffer_read_offset_address);
713
714/**
715 * lib_ring_buffer_offset_address - get address of a location within the buffer
716 * @bufb : buffer backend
717 * @offset : offset within the buffer.
718 *
719 * Return the address where a given offset is located.
720 * Should be used to get the current subbuffer header pointer. Given we know
721 * it's always at the beginning of a page, it's safe to write directly to this
722 * address, as long as the write is never bigger than a page size.
723 */
724void *lib_ring_buffer_offset_address(struct lib_ring_buffer_backend *bufb,
725 size_t offset)
726{
727 size_t sbidx, index;
728 struct lib_ring_buffer_backend_pages *rpages;
729 struct channel_backend *chanb = &bufb->chan->backend;
730 const struct lib_ring_buffer_config *config = chanb->config;
731 unsigned long sb_bindex, id;
732
733 offset &= chanb->buf_size - 1;
734 sbidx = offset >> chanb->subbuf_size_order;
735 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
736 id = bufb->buf_wsb[sbidx].id;
737 sb_bindex = subbuffer_id_get_index(config, id);
738 rpages = bufb->array[sb_bindex];
739 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
740 && subbuffer_id_is_noref(config, id));
741 return rpages->p[index].virt + (offset & ~PAGE_MASK);
742}
743EXPORT_SYMBOL_GPL(lib_ring_buffer_offset_address);
This page took 0.051076 seconds and 4 git commands to generate.