Ring buffer: use shmp (shared-memory pointers) for per-channel shm structures
lttng-ust.git: libringbuffer/ring_buffer_backend.c
/*
 * ring_buffer_backend.c
 *
 * Copyright (C) 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * Dual LGPL v2.1/GPL v2 license.
 */

#include <urcu/arch.h>

#include "ust/core.h"

#include "config.h"
#include "backend.h"
#include "frontend.h"
#include "smp.h"
/**
 * lib_ring_buffer_backend_allocate - allocate a channel buffer
 * @config: ring buffer instance configuration
 * @bufb: the buffer backend to initialize
 * @size: total size of the buffer
 * @num_subbuf: number of subbuffers
 * @extra_reader_sb: need extra subbuffer for reader
 * @shm_header: shared memory header
 */
static
int lib_ring_buffer_backend_allocate(const struct lib_ring_buffer_config *config,
				     struct lib_ring_buffer_backend *bufb,
				     size_t size, size_t num_subbuf,
				     int extra_reader_sb,
				     struct shm_header *shm_header)
{
	struct channel_backend *chanb = &shmp(bufb->chan)->backend;
	unsigned long subbuf_size, mmap_offset = 0;
	unsigned long num_subbuf_alloc;
	unsigned long i;

	subbuf_size = chanb->subbuf_size;
	num_subbuf_alloc = num_subbuf;

	if (extra_reader_sb)
		num_subbuf_alloc++;

	set_shmp(bufb->array, zalloc_shm(shm_header,
			sizeof(*bufb->array) * num_subbuf_alloc));
	if (unlikely(!shmp(bufb->array)))
		goto array_error;

	set_shmp(bufb->memory_map, zalloc_shm(shm_header,
			subbuf_size * num_subbuf_alloc));
	if (unlikely(!shmp(bufb->memory_map)))
		goto memory_map_error;

	/* Allocate backend pages array elements */
	for (i = 0; i < num_subbuf_alloc; i++) {
		set_shmp(shmp(bufb->array)[i],
			 zalloc_shm(shm_header,
				sizeof(struct lib_ring_buffer_backend_pages) +
				subbuf_size));
		if (!shmp(bufb->array)[i])
			goto free_array;
	}

	/* Allocate write-side subbuffer table */
	set_shmp(bufb->buf_wsb, zalloc_shm(shm_header,
			sizeof(struct lib_ring_buffer_backend_subbuffer)
			* num_subbuf));
	if (unlikely(!shmp(bufb->buf_wsb)))
		goto free_array;

	for (i = 0; i < num_subbuf; i++)
		shmp(bufb->buf_wsb)[i].id = subbuffer_id(config, 0, 1, i);

	/* Assign read-side subbuffer table */
	if (extra_reader_sb)
		bufb->buf_rsb.id = subbuffer_id(config, 0, 1,
						num_subbuf_alloc - 1);
	else
		bufb->buf_rsb.id = subbuffer_id(config, 0, 1, 0);

	/* Assign pages to page index */
	for (i = 0; i < num_subbuf_alloc; i++) {
		set_shmp(shmp(bufb->array)[i]->p,
			 &shmp(bufb->memory_map)[i * subbuf_size]);
		if (config->output == RING_BUFFER_MMAP) {
			shmp(bufb->array)[i]->mmap_offset = mmap_offset;
			mmap_offset += subbuf_size;
		}
	}

	return 0;

free_array:
	/* bufb->array[i] will be freed by shm teardown */
memory_map_error:
	/* bufb->array will be freed by shm teardown */
array_error:
	return -ENOMEM;
}
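
/*
 * Worked example for the allocator above (values illustrative): with
 * subbuf_size = 4096, num_subbuf = 4 and extra_reader_sb = 1, we get
 * num_subbuf_alloc = 5: a 5 * 4096 = 20480-byte memory map, a 5-entry
 * backend pages array, and a 4-entry write-side sub-buffer table. The
 * extra 5th sub-buffer initially belongs to the reader (buf_rsb).
 */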

int lib_ring_buffer_backend_create(struct lib_ring_buffer_backend *bufb,
				   struct channel_backend *chanb, int cpu,
				   struct shm_header *shm_header)
{
	const struct lib_ring_buffer_config *config = chanb->config;

	set_shmp(bufb->chan, caa_container_of(chanb, struct channel, backend));
	bufb->cpu = cpu;

	return lib_ring_buffer_backend_allocate(config, bufb, chanb->buf_size,
						chanb->num_subbuf,
						chanb->extra_reader_sb,
						shm_header);
}

void lib_ring_buffer_backend_free(struct lib_ring_buffer_backend *bufb)
{
	/* bufb->buf_wsb will be freed by shm teardown */
	/* bufb->array[i] will be freed by shm teardown */
	/* bufb->array will be freed by shm teardown */
	bufb->allocated = 0;
}

void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb)
{
	struct channel_backend *chanb = &shmp(bufb->chan)->backend;
	const struct lib_ring_buffer_config *config = chanb->config;
	unsigned long num_subbuf_alloc;
	unsigned int i;

	num_subbuf_alloc = chanb->num_subbuf;
	if (chanb->extra_reader_sb)
		num_subbuf_alloc++;

	for (i = 0; i < chanb->num_subbuf; i++)
		shmp(bufb->buf_wsb)[i].id = subbuffer_id(config, 0, 1, i);
	if (chanb->extra_reader_sb)
		bufb->buf_rsb.id = subbuffer_id(config, 0, 1,
						num_subbuf_alloc - 1);
	else
		bufb->buf_rsb.id = subbuffer_id(config, 0, 1, 0);

	for (i = 0; i < num_subbuf_alloc; i++) {
		/* Don't reset mmap_offset */
		v_set(config, &shmp(bufb->array)[i]->records_commit, 0);
		v_set(config, &shmp(bufb->array)[i]->records_unread, 0);
		shmp(bufb->array)[i]->data_size = 0;
		/* Don't reset backend page and virt addresses */
	}
	/* Don't reset num_pages_per_subbuf, cpu, allocated */
	v_set(config, &bufb->records_read, 0);
}

/*
 * The frontend is responsible for also calling ring_buffer_backend_reset for
 * each buffer when calling channel_backend_reset.
 */
void channel_backend_reset(struct channel_backend *chanb)
{
	struct channel *chan = caa_container_of(chanb, struct channel, backend);
	const struct lib_ring_buffer_config *config = chanb->config;

	/*
	 * Don't reset buf_size, subbuf_size, subbuf_size_order,
	 * num_subbuf_order, buf_size_order, extra_reader_sb, num_subbuf,
	 * priv, notifiers, config, cpumask and name.
	 */
	chanb->start_tsc = config->cb.ring_buffer_clock_read(chan);
}
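
/*
 * Hypothetical frontend-side sketch of the contract stated above (the
 * names and per-cpu layout are taken from this file; the calling context
 * is assumed): a per-cpu channel reset would walk every buffer backend
 * before resetting the channel backend itself:
 *
 *	for_each_possible_cpu(cpu)
 *		lib_ring_buffer_backend_reset(&shmp(chanb->buf)[cpu].backend);
 *	channel_backend_reset(chanb);
 */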

/**
 * channel_backend_init - initialize a channel backend
 * @chanb: channel backend
 * @name: channel name
 * @config: client ring buffer configuration
 * @priv: client private data
 * @subbuf_size: size of sub-buffers (>= PAGE_SIZE, power of 2)
 * @num_subbuf: number of sub-buffers (power of 2)
 * @shm_header: shared memory header
 *
 * Returns 0 on success, a negative error value otherwise.
 *
 * Creates per-cpu channel buffers using the sizes and attributes
 * specified.
 */
int channel_backend_init(struct channel_backend *chanb,
			 const char *name,
			 const struct lib_ring_buffer_config *config,
			 void *priv, size_t subbuf_size, size_t num_subbuf,
			 struct shm_header *shm_header)
{
	struct channel *chan = caa_container_of(chanb, struct channel, backend);
	unsigned int i;
	int ret;

	if (!name)
		return -EPERM;

	if (!(subbuf_size && num_subbuf))
		return -EPERM;

	/* Check that the subbuffer size is at least a page. */
	if (subbuf_size < PAGE_SIZE)
		return -EINVAL;

	/*
	 * Make sure the number of subbuffers and the subbuffer size are
	 * each a power of 2.
	 */
	CHAN_WARN_ON(chanb, hweight32(subbuf_size) != 1);
	CHAN_WARN_ON(chanb, hweight32(num_subbuf) != 1);

	ret = subbuffer_id_check_index(config, num_subbuf);
	if (ret)
		return ret;

	chanb->priv = priv;
	chanb->buf_size = num_subbuf * subbuf_size;
	chanb->subbuf_size = subbuf_size;
	chanb->buf_size_order = get_count_order(chanb->buf_size);
	chanb->subbuf_size_order = get_count_order(subbuf_size);
	chanb->num_subbuf_order = get_count_order(num_subbuf);
	chanb->extra_reader_sb =
		(config->mode == RING_BUFFER_OVERWRITE) ? 1 : 0;
	chanb->num_subbuf = num_subbuf;
	strncpy(chanb->name, name, NAME_MAX);
	chanb->name[NAME_MAX - 1] = '\0';
	chanb->config = config;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		struct lib_ring_buffer *buf;
		size_t alloc_size;

		/* Allocating the buffer per-cpu structures */
		alloc_size = sizeof(struct lib_ring_buffer);
		buf = zalloc_shm(shm_header, alloc_size * num_possible_cpus());
		if (!buf)
			goto end;
		set_shmp(chanb->buf, buf);

		/*
		 * We need to allocate for all possible cpus.
		 */
		for_each_possible_cpu(i) {
			ret = lib_ring_buffer_create(&shmp(chanb->buf)[i],
						     chanb, i, shm_header);
			if (ret)
				goto free_bufs;
		}
	} else {
		struct lib_ring_buffer *buf;
		size_t alloc_size;

		alloc_size = sizeof(struct lib_ring_buffer);
		buf = zalloc_shm(shm_header, alloc_size);
		if (!buf)
			goto end;
		set_shmp(chanb->buf, buf);
		ret = lib_ring_buffer_create(shmp(chanb->buf), chanb, -1,
					     shm_header);
		if (ret)
			goto free_bufs;
	}
	chanb->start_tsc = config->cb.ring_buffer_clock_read(chan);

	return 0;

free_bufs:
	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		for_each_possible_cpu(i) {
			struct lib_ring_buffer *buf = &shmp(chanb->buf)[i];

			if (!buf->backend.allocated)
				continue;
			lib_ring_buffer_free(buf);
		}
	}
	/* We only free the buffer data upon shm teardown */
end:
	return -ENOMEM;
}
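
/*
 * Usage sketch (hypothetical caller; the client config, private data and
 * sizes are assumed): initializing a 4 x 4096-byte channel backend inside
 * a freshly allocated shm header:
 *
 *	ret = channel_backend_init(&chan->backend, "chan0", client_config,
 *				   priv, 4096, 4, shm_header);
 *	if (ret)
 *		return ret;
 *
 * On failure, partially created buffers are released by the free_bufs path
 * above; the shm-backed data itself is only freed at shm teardown.
 */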

/**
 * channel_backend_free - destroy the channel
 * @chanb: the channel backend
 *
 * Destroys all channel buffers and frees the channel.
 */
void channel_backend_free(struct channel_backend *chanb)
{
	const struct lib_ring_buffer_config *config = chanb->config;
	unsigned int i;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		for_each_possible_cpu(i) {
			struct lib_ring_buffer *buf = &shmp(chanb->buf)[i];

			if (!buf->backend.allocated)
				continue;
			lib_ring_buffer_free(buf);
		}
	} else {
		struct lib_ring_buffer *buf = shmp(chanb->buf);

		CHAN_WARN_ON(chanb, !buf->backend.allocated);
		lib_ring_buffer_free(buf);
	}
	/* We only free the buffer data upon shm teardown */
}

/**
 * lib_ring_buffer_read - read data from the ring buffer
 * @bufb : buffer backend
 * @offset : offset within the buffer
 * @dest : destination address
 * @len : length to copy to destination
 *
 * Should be protected by get_subbuf/put_subbuf.
 * Returns the length copied.
 */
size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, size_t offset,
			    void *dest, size_t len)
{
	struct channel_backend *chanb = &shmp(bufb->chan)->backend;
	const struct lib_ring_buffer_config *config = chanb->config;
	ssize_t orig_len;
	struct lib_ring_buffer_backend_pages *rpages;
	unsigned long sb_bindex, id;

	orig_len = len;
	offset &= chanb->buf_size - 1;

	if (unlikely(!len))
		return 0;
	id = bufb->buf_rsb.id;
	sb_bindex = subbuffer_id_get_index(config, id);
	rpages = shmp(bufb->array)[sb_bindex];
	/*
	 * Underlying layer should never ask for reads across
	 * subbuffers.
	 */
	CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
	CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
		     && subbuffer_id_is_noref(config, id));
	/* Keep only the offset within the sub-buffer. */
	memcpy(dest, shmp(rpages->p) + (offset & (chanb->subbuf_size - 1)), len);
	return orig_len;
}
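
/*
 * Usage sketch (hypothetical consumer; the get_subbuf/put_subbuf frontend
 * calls are named in the comment above, but their exact signatures are
 * assumed here): copy the reader sub-buffer payload into a private buffer:
 *
 *	if (!lib_ring_buffer_get_subbuf(buf, consumed)) {
 *		lib_ring_buffer_read(&buf->backend, read_offset, dest, len);
 *		lib_ring_buffer_put_subbuf(buf);
 *	}
 */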

/**
 * lib_ring_buffer_read_cstr - read a C-style string from the ring buffer
 * @bufb : buffer backend
 * @offset : offset within the buffer
 * @dest : destination address
 * @len : destination's length
 *
 * Returns the string's length.
 * Should be protected by get_subbuf/put_subbuf.
 */
int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb, size_t offset,
			      void *dest, size_t len)
{
	struct channel_backend *chanb = &shmp(bufb->chan)->backend;
	const struct lib_ring_buffer_config *config = chanb->config;
	ssize_t string_len;
	char *str;
	struct lib_ring_buffer_backend_pages *rpages;
	unsigned long sb_bindex, id;

	offset &= chanb->buf_size - 1;
	id = bufb->buf_rsb.id;
	sb_bindex = subbuffer_id_get_index(config, id);
	rpages = shmp(bufb->array)[sb_bindex];
	/*
	 * Underlying layer should never ask for reads across
	 * subbuffers.
	 */
	CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
	CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
		     && subbuffer_id_is_noref(config, id));
	str = (char *)shmp(rpages->p) + (offset & (chanb->subbuf_size - 1));
	string_len = strnlen(str, len);
	if (dest && len) {
		memcpy(dest, str, string_len);
		/*
		 * Null-terminate the copy, truncating the last byte if the
		 * string fills the whole destination.
		 */
		((char *)dest)[string_len < (ssize_t) len ?
			       string_len : (ssize_t) len - 1] = '\0';
	}
	return string_len;
}
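
/*
 * Usage sketch (illustrative; "buf" and "field_offset" are assumed): read a
 * string field into a bounded local buffer. With the termination rule above,
 * "name" is always null-terminated, and a return value equal to sizeof(name)
 * indicates the string was truncated:
 *
 *	char name[64];
 *	ssize_t slen;
 *
 *	slen = lib_ring_buffer_read_cstr(&buf->backend, field_offset,
 *					 name, sizeof(name));
 */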

/**
 * lib_ring_buffer_read_offset_address - get address of a buffer location
 * @bufb : buffer backend
 * @offset : offset within the buffer.
 *
 * Return the address where a given offset is located (for read).
 * Should be used to get the current subbuffer header pointer. Given we know
 * it's never on a page boundary, it's safe to write directly to this address,
 * as long as the write is never bigger than a page size.
 */
void *lib_ring_buffer_read_offset_address(struct lib_ring_buffer_backend *bufb,
					  size_t offset)
{
	struct lib_ring_buffer_backend_pages *rpages;
	struct channel_backend *chanb = &shmp(bufb->chan)->backend;
	const struct lib_ring_buffer_config *config = chanb->config;
	unsigned long sb_bindex, id;

	offset &= chanb->buf_size - 1;
	id = bufb->buf_rsb.id;
	sb_bindex = subbuffer_id_get_index(config, id);
	rpages = shmp(bufb->array)[sb_bindex];
	CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
		     && subbuffer_id_is_noref(config, id));
	return shmp(rpages->p) + (offset & (chanb->subbuf_size - 1));
}

/**
 * lib_ring_buffer_offset_address - get address of a location within the buffer
 * @bufb : buffer backend
 * @offset : offset within the buffer.
 *
 * Return the address where a given offset is located.
 * Should be used to get the current subbuffer header pointer. Given we know
 * it's always at the beginning of a page, it's safe to write directly to this
 * address, as long as the write is never bigger than a page size.
 */
void *lib_ring_buffer_offset_address(struct lib_ring_buffer_backend *bufb,
				     size_t offset)
{
	size_t sbidx;
	struct lib_ring_buffer_backend_pages *rpages;
	struct channel_backend *chanb = &shmp(bufb->chan)->backend;
	const struct lib_ring_buffer_config *config = chanb->config;
	unsigned long sb_bindex, id;

	offset &= chanb->buf_size - 1;
	sbidx = offset >> chanb->subbuf_size_order;
	id = shmp(bufb->buf_wsb)[sbidx].id;
	sb_bindex = subbuffer_id_get_index(config, id);
	rpages = shmp(bufb->array)[sb_bindex];
	CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
		     && subbuffer_id_is_noref(config, id));
	return shmp(rpages->p) + (offset & (chanb->subbuf_size - 1));
}
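
/*
 * Worked example for the write-side address computation above (values
 * illustrative): with buf_size = 16384 and subbuf_size = 4096
 * (subbuf_size_order = 12), offset = 9300 gives sbidx = 9300 >> 12 = 2
 * and a within-sub-buffer offset of 9300 & 4095 = 1108, so the returned
 * address is sub-buffer 2's backing memory plus 1108 bytes.
 */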