b6580fa6d42359158195a086c79e16490728849a
[lttng-modules.git] / src / lib / ringbuffer / ring_buffer_splice.c
1 /* SPDX-License-Identifier: GPL-2.0-only
2 *
3 * ring_buffer_splice.c
4 *
5 * Copyright (C) 2002-2005 - Tom Zanussi <zanussi@us.ibm.com>, IBM Corp
6 * Copyright (C) 1999-2005 - Karim Yaghmour <karim@opersys.com>
7 * Copyright (C) 2008-2012 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
8 *
9 * Re-using code from kernel/relay.c, which is why it is licensed under
10 * the GPL-2.0.
11 */
12
13 #include <linux/module.h>
14 #include <linux/fs.h>
15 #include <lttng/kernel-version.h>
16
17 #include <wrapper/splice.h>
18 #include <ringbuffer/backend.h>
19 #include <ringbuffer/frontend.h>
20 #include <ringbuffer/vfs.h>
21
/*
 * printk_dbg() - debug-only printk().
 * With DEBUG defined it forwards directly to printk(). Without DEBUG it
 * expands to a statically dead "if (0)" call: the compiler still
 * type-checks the printf-style format string against its arguments, but
 * no code is emitted at runtime.
 */
#ifdef DEBUG
#define printk_dbg(fmt, args...) printk(fmt, args)
#else
#define printk_dbg(fmt, args...) \
do { \
	/* do nothing but check printf format */ \
	if (0) \
		printk(fmt, ## args); \
} while (0)
#endif
32
33 loff_t vfs_lib_ring_buffer_no_llseek(struct file *file, loff_t offset,
34 int origin)
35 {
36 return -ESPIPE;
37 }
38 EXPORT_SYMBOL_GPL(vfs_lib_ring_buffer_no_llseek);
39
40 /*
41 * Release pages from the buffer so splice pipe_to_file can move them.
42 * Called after the pipe has been populated with buffer pages.
43 */
44 static void lib_ring_buffer_pipe_buf_release(struct pipe_inode_info *pipe,
45 struct pipe_buffer *pbuf)
46 {
47 __free_page(pbuf->page);
48 }
49
/*
 * pipe_buf_operations for pages handed over to the pipe by splice.
 * The required callback set changed across kernel releases, hence the
 * version-conditional definitions below:
 * - 5.8+: .confirm/.steal were replaced by .try_steal (confirm now
 *   defaults to "page always up to date" when absent).
 * - 5.1+: the .can_merge field was removed from the structure.
 * - 3.15+: .map/.unmap callbacks were removed.
 * - older kernels: full set including .map/.unmap.
 * All variants share the same release callback, which frees the page
 * once the pipe is done with it.
 */
#if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(5,8,0))
static const struct pipe_buf_operations ring_buffer_pipe_buf_ops = {
	.release = lib_ring_buffer_pipe_buf_release,
	.try_steal = generic_pipe_buf_try_steal,
	.get = generic_pipe_buf_get
};
#elif (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(5,1,0))
static const struct pipe_buf_operations ring_buffer_pipe_buf_ops = {
	.confirm = generic_pipe_buf_confirm,
	.release = lib_ring_buffer_pipe_buf_release,
	.steal = generic_pipe_buf_steal,
	.get = generic_pipe_buf_get
};
#elif (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(3,15,0))
static const struct pipe_buf_operations ring_buffer_pipe_buf_ops = {
	.can_merge = 0,
	.confirm = generic_pipe_buf_confirm,
	.release = lib_ring_buffer_pipe_buf_release,
	.steal = generic_pipe_buf_steal,
	.get = generic_pipe_buf_get
};
#else
static const struct pipe_buf_operations ring_buffer_pipe_buf_ops = {
	.can_merge = 0,
	.map = generic_pipe_buf_map,
	.unmap = generic_pipe_buf_unmap,
	.confirm = generic_pipe_buf_confirm,
	.release = lib_ring_buffer_pipe_buf_release,
	.steal = generic_pipe_buf_steal,
	.get = generic_pipe_buf_get
};
#endif
82
83 /*
84 * Page release operation after splice pipe_to_file ends.
85 */
86 static void lib_ring_buffer_page_release(struct splice_pipe_desc *spd,
87 unsigned int i)
88 {
89 __free_page(spd->pages[i]);
90 }
91
92 /*
93 * subbuf_splice_actor - splice up to one subbuf's worth of data
94 */
/*
 * subbuf_splice_actor - splice up to one subbuf's worth of data
 *
 * Zero-copy transfer: for each page of the sub-buffer, the buffer's
 * backing page is handed to the pipe and a freshly allocated page is
 * swapped into the ring buffer in its place. The pipe later frees the
 * original page through ring_buffer_pipe_buf_ops.release /
 * spd_release.
 *
 * Returns the number of bytes pushed into the pipe, 0 if no page could
 * be queued, or a negative error from wrapper_splice_to_pipe().
 */
static int subbuf_splice_actor(struct file *in,
			       loff_t *ppos,
			       struct pipe_inode_info *pipe,
			       size_t len,
			       unsigned int flags,
			       struct lib_ring_buffer *buf)
{
	struct channel *chan = buf->backend.chan;
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	unsigned int poff, subbuf_pages, nr_pages;
	struct page *pages[PIPE_DEF_BUFFERS];
	struct partial_page partial[PIPE_DEF_BUFFERS];
	struct splice_pipe_desc spd = {
		.pages = pages,
		.nr_pages = 0,
		.partial = partial,
#if (LTTNG_LINUX_VERSION_CODE < LTTNG_KERNEL_VERSION(4,12,0))
		/* .flags was removed from splice_pipe_desc in 4.12. */
		.flags = flags,
#endif
		.ops = &ring_buffer_pipe_buf_ops,
		.spd_release = lib_ring_buffer_page_release,
	};
	unsigned long consumed_old, roffset;
	unsigned long bytes_avail;

	/*
	 * Check that a GET_SUBBUF ioctl has been done before.
	 */
	WARN_ON(atomic_long_read(&buf->active_readers) != 1);
	/* Absolute read position: consumed position plus file offset. */
	consumed_old = lib_ring_buffer_get_consumed(config, buf);
	consumed_old += *ppos;

	/*
	 * Adjust read len, if longer than what is available.
	 * Max read size is 1 subbuffer due to get_subbuf/put_subbuf for
	 * protection.
	 */
	bytes_avail = chan->backend.subbuf_size;
	WARN_ON(bytes_avail > chan->backend.buf_size);
	len = min_t(size_t, len, bytes_avail);
	subbuf_pages = bytes_avail >> PAGE_SHIFT;
	/* Cap at the pipe's default buffer count per actor call. */
	nr_pages = min_t(unsigned int, subbuf_pages, PIPE_DEF_BUFFERS);
	/* Split read position into page-aligned offset + in-page offset. */
	roffset = consumed_old & PAGE_MASK;
	poff = consumed_old & ~PAGE_MASK;
	printk_dbg(KERN_DEBUG "LTTng: SPLICE actor len %zu pos %zd write_pos %ld\n",
		   len, (ssize_t)*ppos, lib_ring_buffer_get_offset(config, buf));

	for (; spd.nr_pages < nr_pages; spd.nr_pages++) {
		unsigned int this_len;
		unsigned long *pfnp, new_pfn;
		struct page *new_page;
		void **virt;

		if (!len)
			break;
		printk_dbg(KERN_DEBUG "LTTng: SPLICE actor loop len %zu roffset %ld\n",
			   len, roffset);

		/*
		 * We have to replace the page we are moving into the splice
		 * pipe.
		 */
		/*
		 * max(cpu, 0): presumably backend.cpu is -1 for global
		 * (non-per-cpu) buffers — clamp to node 0 in that case.
		 * NOTE(review): confirm against backend definition.
		 */
		new_page = alloc_pages_node(cpu_to_node(max(buf->backend.cpu,
						0)),
				GFP_KERNEL | __GFP_ZERO, 0);
		if (!new_page)
			break;
		new_pfn = page_to_pfn(new_page);
		this_len = PAGE_SIZE - poff;
		/*
		 * Stash the current backing page for the pipe, then swap
		 * the replacement page's pfn and virtual address into the
		 * ring buffer backend. Order matters: read *pfnp before
		 * overwriting it.
		 */
		pfnp = lib_ring_buffer_read_get_pfn(&buf->backend, roffset, &virt);
		spd.pages[spd.nr_pages] = pfn_to_page(*pfnp);
		*pfnp = new_pfn;
		*virt = page_address(new_page);
		spd.partial[spd.nr_pages].offset = poff;
		spd.partial[spd.nr_pages].len = this_len;

		/* Only the first page can start mid-page. */
		poff = 0;
		roffset += PAGE_SIZE;
		len -= this_len;
	}

	if (!spd.nr_pages)
		return 0;

	return wrapper_splice_to_pipe(pipe, &spd);
}
181
182 ssize_t lib_ring_buffer_splice_read(struct file *in, loff_t *ppos,
183 struct pipe_inode_info *pipe, size_t len,
184 unsigned int flags,
185 struct lib_ring_buffer *buf)
186 {
187 struct channel *chan = buf->backend.chan;
188 const struct lib_ring_buffer_config *config = &chan->backend.config;
189 ssize_t spliced;
190 int ret;
191
192 if (config->output != RING_BUFFER_SPLICE)
193 return -EINVAL;
194
195 /*
196 * We require ppos and length to be page-aligned for performance reasons
197 * (no page copy). Size is known using the ioctl
198 * RING_BUFFER_GET_PADDED_SUBBUF_SIZE, which is page-size padded.
199 * We fail when the ppos or len passed is not page-sized, because splice
200 * is not allowed to copy more than the length passed as parameter (so
201 * the ABI does not let us silently copy more than requested to include
202 * padding).
203 */
204 if (*ppos != PAGE_ALIGN(*ppos) || len != PAGE_ALIGN(len))
205 return -EINVAL;
206
207 ret = 0;
208 spliced = 0;
209
210 printk_dbg(KERN_DEBUG "LTTng: SPLICE read len %zu pos %zd\n", len,
211 (ssize_t)*ppos);
212 while (len && !spliced) {
213 ret = subbuf_splice_actor(in, ppos, pipe, len, flags, buf);
214 printk_dbg(KERN_DEBUG "LTTng: SPLICE read loop ret %d\n", ret);
215 if (ret < 0)
216 break;
217 else if (!ret) {
218 if (flags & SPLICE_F_NONBLOCK)
219 ret = -EAGAIN;
220 break;
221 }
222
223 *ppos += ret;
224 if (ret > len)
225 len = 0;
226 else
227 len -= ret;
228 spliced += ret;
229 }
230
231 if (spliced)
232 return spliced;
233
234 return ret;
235 }
236 EXPORT_SYMBOL_GPL(lib_ring_buffer_splice_read);
237
238 ssize_t vfs_lib_ring_buffer_splice_read(struct file *in, loff_t *ppos,
239 struct pipe_inode_info *pipe, size_t len,
240 unsigned int flags)
241 {
242 struct lib_ring_buffer *buf = in->private_data;
243
244 return lib_ring_buffer_splice_read(in, ppos, pipe, len, flags, buf);
245 }
246 EXPORT_SYMBOL_GPL(vfs_lib_ring_buffer_splice_read);
This page took 0.077527 seconds and 3 git commands to generate.