/*
 * ring_buffer_backend.c
 *
 * Copyright (C) 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * Dual LGPL v2.1/GPL v2 license.
 */

#include <urcu/arch.h>

#include "ust/core.h"

#include "config.h"
#include "backend.h"
#include "frontend.h"
#include "smp.h"

/**
 * lib_ring_buffer_backend_allocate - allocate a channel buffer
 * @config: ring buffer instance configuration
 * @bufb: buffer backend structure
 * @size: total size of the buffer
 * @num_subbuf: number of subbuffers
 * @extra_reader_sb: need extra subbuffer for reader
 */
static
int lib_ring_buffer_backend_allocate(const struct lib_ring_buffer_config *config,
				     struct lib_ring_buffer_backend *bufb,
				     size_t size, size_t num_subbuf,
				     int extra_reader_sb,
				     struct shm_header *shm_header)
{
	struct channel_backend *chanb = &shmp(bufb->chan)->backend;
	unsigned long subbuf_size, mmap_offset = 0;
	unsigned long num_subbuf_alloc;
	unsigned long i;

	subbuf_size = chanb->subbuf_size;
	num_subbuf_alloc = num_subbuf;

	if (extra_reader_sb)
		num_subbuf_alloc++;

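	/*
	 * All backend structures below are carved out of the shared memory
	 * area described by shm_header. There is no individual free path:
	 * on error, and in lib_ring_buffer_backend_free(), everything is
	 * released at once by the shm teardown.
	 */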
	set_shmp(bufb->array, zalloc_shm(shm_header,
			sizeof(*bufb->array) * num_subbuf_alloc));
	if (unlikely(!shmp(bufb->array)))
		goto array_error;

	set_shmp(bufb->memory_map, zalloc_shm(shm_header,
			subbuf_size * num_subbuf_alloc));
	if (unlikely(!shmp(bufb->memory_map)))
		goto memory_map_error;

	/* Allocate backend pages array elements */
	for (i = 0; i < num_subbuf_alloc; i++) {
		set_shmp(bufb->array[i],
			 zalloc_shm(shm_header,
				sizeof(struct lib_ring_buffer_backend_pages) +
				subbuf_size));
		if (!shmp(bufb->array[i]))
			goto free_array;
	}

	/* Allocate write-side subbuffer table */
	bufb->buf_wsb = zalloc_shm(shm_header,
				sizeof(struct lib_ring_buffer_backend_subbuffer)
				* num_subbuf);
	if (unlikely(!shmp(bufb->buf_wsb)))
		goto free_array;

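	/*
	 * Each subbuffer id packs a subbuffer offset, a "noref" flag and the
	 * index into the backend pages array (see subbuffer_id()). The
	 * initial write-side mapping is identity: entry i references backend
	 * pages i, at offset 0, with noref set.
	 */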
	for (i = 0; i < num_subbuf; i++)
		shmp(bufb->buf_wsb)[i].id = subbuffer_id(config, 0, 1, i);

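	/*
	 * With an extra reader subbuffer (overwrite mode), the reader owns
	 * one subbuffer outside of the writer's table and exchanges it with
	 * a written subbuffer when taking it for reading, so the read-side
	 * id initially points at the extra (last allocated) entry.
	 */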
	/* Assign read-side subbuffer table */
	if (extra_reader_sb)
		bufb->buf_rsb.id = subbuffer_id(config, 0, 1,
						num_subbuf_alloc - 1);
	else
		bufb->buf_rsb.id = subbuffer_id(config, 0, 1, 0);

	/* Assign pages to page index */
	for (i = 0; i < num_subbuf_alloc; i++) {
		set_shmp(shmp(bufb->array)[i]->p,
			 &shmp(bufb->memory_map)[i * subbuf_size]);
		if (config->output == RING_BUFFER_MMAP) {
			shmp(bufb->array)[i]->mmap_offset = mmap_offset;
			mmap_offset += subbuf_size;
		}
	}

	return 0;

free_array:
	/* bufb->array[i] will be freed by shm teardown */
memory_map_error:
	/* bufb->array will be freed by shm teardown */
array_error:
	return -ENOMEM;
}

int lib_ring_buffer_backend_create(struct lib_ring_buffer_backend *bufb,
				   struct channel_backend *chanb, int cpu,
				   struct shm_header *shm_header)
{
	const struct lib_ring_buffer_config *config = chanb->config;

	set_shmp(&bufb->chan, caa_container_of(chanb, struct channel, backend));
	bufb->cpu = cpu;

	return lib_ring_buffer_backend_allocate(config, bufb, chanb->buf_size,
						chanb->num_subbuf,
						chanb->extra_reader_sb,
						shm_header);
}

void lib_ring_buffer_backend_free(struct lib_ring_buffer_backend *bufb)
{
	/* bufb->buf_wsb will be freed by shm teardown */
	/* bufb->array[i] will be freed by shm teardown */
	/* bufb->array will be freed by shm teardown */
	bufb->allocated = 0;
}

void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb)
{
	struct channel_backend *chanb = &shmp(bufb->chan)->backend;
	const struct lib_ring_buffer_config *config = chanb->config;
	unsigned long num_subbuf_alloc;
	unsigned int i;

	num_subbuf_alloc = chanb->num_subbuf;
	if (chanb->extra_reader_sb)
		num_subbuf_alloc++;

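	/*
	 * Restore the initial identity mapping between write-side entries
	 * and backend pages, and clear the per-subbuffer counters, matching
	 * the state set up by lib_ring_buffer_backend_allocate().
	 */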
	for (i = 0; i < chanb->num_subbuf; i++)
		shmp(bufb->buf_wsb)[i].id = subbuffer_id(config, 0, 1, i);
	if (chanb->extra_reader_sb)
		bufb->buf_rsb.id = subbuffer_id(config, 0, 1,
						num_subbuf_alloc - 1);
	else
		bufb->buf_rsb.id = subbuffer_id(config, 0, 1, 0);

	for (i = 0; i < num_subbuf_alloc; i++) {
		/* Don't reset mmap_offset */
		v_set(config, &shmp(bufb->array)[i]->records_commit, 0);
		v_set(config, &shmp(bufb->array)[i]->records_unread, 0);
		shmp(bufb->array)[i]->data_size = 0;
		/* Don't reset backend page and virt addresses */
	}
	/* Don't reset num_pages_per_subbuf, cpu, allocated */
	v_set(config, &bufb->records_read, 0);
}

/*
 * The frontend is responsible for also calling lib_ring_buffer_backend_reset
 * for each buffer when calling channel_backend_reset.
 */
void channel_backend_reset(struct channel_backend *chanb)
{
	struct channel *chan = caa_container_of(chanb, struct channel, backend);
	const struct lib_ring_buffer_config *config = chanb->config;

	/*
	 * Don't reset buf_size, subbuf_size, subbuf_size_order,
	 * num_subbuf_order, buf_size_order, extra_reader_sb, num_subbuf,
	 * priv, notifiers, config, cpumask and name.
	 */
	chanb->start_tsc = config->cb.ring_buffer_clock_read(chan);
}

/**
 * channel_backend_init - initialize a channel backend
 * @chanb: channel backend
 * @name: channel name
 * @config: client ring buffer configuration
 * @priv: client private data
 * @subbuf_size: size of sub-buffers (> PAGE_SIZE, power of 2)
 * @num_subbuf: number of sub-buffers (power of 2)
 * @shm_header: shared memory header
 *
 * Returns 0 on success, a negative error code otherwise.
 *
 * Creates per-cpu channel buffers using the sizes and attributes
 * specified, carving them out of the shared memory area described by
 * @shm_header.
 */
int channel_backend_init(struct channel_backend *chanb,
			 const char *name,
			 const struct lib_ring_buffer_config *config,
			 void *priv, size_t subbuf_size, size_t num_subbuf,
			 struct shm_header *shm_header)
{
	struct channel *chan = caa_container_of(chanb, struct channel, backend);
	unsigned int i;
	int ret;

	if (!name)
		return -EPERM;

	if (!(subbuf_size && num_subbuf))
		return -EPERM;

	/* Check that the subbuffer size is larger than a page. */
	if (subbuf_size < PAGE_SIZE)
		return -EINVAL;

	/*
	 * Make sure the number of subbuffers and subbuffer size are power of 2.
	 */
	CHAN_WARN_ON(chanb, hweight32(subbuf_size) != 1);
	CHAN_WARN_ON(chanb, hweight32(num_subbuf) != 1);

	ret = subbuffer_id_check_index(config, num_subbuf);
	if (ret)
		return ret;

	chanb->priv = priv;
	chanb->buf_size = num_subbuf * subbuf_size;
	chanb->subbuf_size = subbuf_size;
	chanb->buf_size_order = get_count_order(chanb->buf_size);
	chanb->subbuf_size_order = get_count_order(subbuf_size);
	chanb->num_subbuf_order = get_count_order(num_subbuf);
	chanb->extra_reader_sb =
		(config->mode == RING_BUFFER_OVERWRITE) ? 1 : 0;
	chanb->num_subbuf = num_subbuf;
	strncpy(chanb->name, name, NAME_MAX);
	chanb->name[NAME_MAX - 1] = '\0';
	chanb->config = config;

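	/*
	 * The lib_ring_buffer structures themselves also live in the shared
	 * memory area: one per possible CPU for per-cpu channels, a single
	 * one otherwise.
	 */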
	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		struct lib_ring_buffer *buf;
		size_t alloc_size;

		/* Allocating the buffer per-cpu structures */
		alloc_size = sizeof(struct lib_ring_buffer);
		buf = zalloc_shm(shm_header, alloc_size * num_possible_cpus());
		if (!buf)
			goto end;
		set_shmp(chanb->buf, buf);

		/*
		 * We need to allocate for all possible cpus.
		 */
		for_each_possible_cpu(i) {
			ret = lib_ring_buffer_create(&shmp(chanb->buf)[i],
						     chanb, i, shm_header);
			if (ret)
				goto free_bufs; /* cpu hotplug locked */
		}
	} else {
		struct lib_ring_buffer *buf;
		size_t alloc_size;

		alloc_size = sizeof(struct lib_ring_buffer);
		buf = zalloc_shm(shm_header, alloc_size);
		if (!buf)
			goto end;
		set_shmp(chanb->buf, buf);
		ret = lib_ring_buffer_create(shmp(chanb->buf), chanb, -1,
					     shm_header);
		if (ret)
			goto free_bufs;
	}
	chanb->start_tsc = config->cb.ring_buffer_clock_read(chan);

	return 0;

free_bufs:
	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		for_each_possible_cpu(i) {
			struct lib_ring_buffer *buf = &shmp(chanb->buf)[i];

			if (!buf->backend.allocated)
				continue;
			lib_ring_buffer_free(buf);
		}
	}
	/* We only free the buffer data upon shm teardown */
end:
	return -ENOMEM;
}

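/*
 * Minimal usage sketch (illustrative only; the shm_header is assumed to be
 * obtained from the shm allocation layer, and client_config/priv are
 * client-provided):
 *
 *	struct shm_header *shm_header = ...;
 *	int ret;
 *
 *	ret = channel_backend_init(&chan->backend, "my_channel",
 *				   &client_config, priv,
 *				   4096 * 4, 8, shm_header);
 *	if (ret)
 *		goto error;
 */
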
/**
 * channel_backend_free - destroy the channel backend
 * @chanb: the channel backend
 *
 * Destroys all channel buffers and frees the channel backend.
 */
void channel_backend_free(struct channel_backend *chanb)
{
	const struct lib_ring_buffer_config *config = chanb->config;
	unsigned int i;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		for_each_possible_cpu(i) {
			struct lib_ring_buffer *buf = &shmp(chanb->buf)[i];

			if (!buf->backend.allocated)
				continue;
			lib_ring_buffer_free(buf);
		}
	} else {
		struct lib_ring_buffer *buf = shmp(chanb->buf);

		CHAN_WARN_ON(chanb, !buf->backend.allocated);
		lib_ring_buffer_free(buf);
	}
	/* We only free the buffer data upon shm teardown */
}

/**
 * lib_ring_buffer_read - read data from the ring buffer
 * @bufb : buffer backend
 * @offset : offset within the buffer
 * @dest : destination address
 * @len : length to copy to destination
 *
 * Should be protected by get_subbuf/put_subbuf.
 * Returns the length copied.
 */
size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, size_t offset,
			    void *dest, size_t len)
{
	struct channel_backend *chanb = &shmp(bufb->chan)->backend;
	const struct lib_ring_buffer_config *config = chanb->config;
	ssize_t orig_len;
	struct lib_ring_buffer_backend_pages *rpages;
	unsigned long sb_bindex, id;

	orig_len = len;
	offset &= chanb->buf_size - 1;

	if (unlikely(!len))
		return 0;
	id = bufb->buf_rsb.id;
	sb_bindex = subbuffer_id_get_index(config, id);
	rpages = shmp(bufb->array)[sb_bindex];
	/*
	 * Underlying layer should never ask for reads across
	 * subbuffers.
	 */
	CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
	CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
		     && subbuffer_id_is_noref(config, id));
	memcpy(dest, shmp(rpages->p) + (offset & (chanb->subbuf_size - 1)), len);
	return orig_len;
}

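/*
 * Sketch of the expected reader pattern (illustrative only; the exact
 * get_subbuf/put_subbuf entry points are provided by the frontend, names
 * assumed here):
 *
 *	if (!lib_ring_buffer_get_next_subbuf(buf)) {
 *		lib_ring_buffer_read(&buf->backend, read_offset, dest, len);
 *		lib_ring_buffer_put_next_subbuf(buf);
 *	}
 */
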
/**
 * lib_ring_buffer_read_cstr - read a C-style string from the ring buffer
 * @bufb : buffer backend
 * @offset : offset within the buffer
 * @dest : destination address
 * @len : destination's length
 *
 * Should be protected by get_subbuf/put_subbuf.
 * Returns the string's length.
 */
int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb, size_t offset,
			      void *dest, size_t len)
{
	struct channel_backend *chanb = &shmp(bufb->chan)->backend;
	const struct lib_ring_buffer_config *config = chanb->config;
	ssize_t string_len;
	char *str;
	struct lib_ring_buffer_backend_pages *rpages;
	unsigned long sb_bindex, id;

	offset &= chanb->buf_size - 1;
	id = bufb->buf_rsb.id;
	sb_bindex = subbuffer_id_get_index(config, id);
	rpages = shmp(bufb->array)[sb_bindex];
	/*
	 * Underlying layer should never ask for reads across
	 * subbuffers.
	 */
	CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
	CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
		     && subbuffer_id_is_noref(config, id));
	str = (char *)shmp(rpages->p) + (offset & (chanb->subbuf_size - 1));
	string_len = strnlen(str, len);
	if (dest && len) {
		memcpy(dest, str, string_len);
		((char *)dest)[string_len] = '\0';
	}
	return string_len;
}

/**
 * lib_ring_buffer_read_offset_address - get address of a buffer location
 * @bufb : buffer backend
 * @offset : offset within the buffer.
 *
 * Return the address where a given offset is located (for read).
 * Should be used to get the current subbuffer header pointer. Given we know
 * it's never on a page boundary, it's safe to write directly to this address,
 * as long as the write is never bigger than a page size.
 */
void *lib_ring_buffer_read_offset_address(struct lib_ring_buffer_backend *bufb,
					  size_t offset)
{
	struct lib_ring_buffer_backend_pages *rpages;
	struct channel_backend *chanb = &shmp(bufb->chan)->backend;
	const struct lib_ring_buffer_config *config = chanb->config;
	unsigned long sb_bindex, id;

	offset &= chanb->buf_size - 1;
	id = bufb->buf_rsb.id;
	sb_bindex = subbuffer_id_get_index(config, id);
	rpages = shmp(bufb->array)[sb_bindex];
	CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
		     && subbuffer_id_is_noref(config, id));
	return shmp(rpages->p) + (offset & (chanb->subbuf_size - 1));
}

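/*
 * Note: the variant below resolves the address through the write-side
 * subbuffer table (buf_wsb), whereas lib_ring_buffer_read_offset_address()
 * above goes through the reader-owned subbuffer id (buf_rsb).
 */
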
/**
 * lib_ring_buffer_offset_address - get address of a location within the buffer
 * @bufb : buffer backend
 * @offset : offset within the buffer.
 *
 * Return the address where a given offset is located.
 * Should be used to get the current subbuffer header pointer. Given we know
 * it's always at the beginning of a page, it's safe to write directly to this
 * address, as long as the write is never bigger than a page size.
 */
void *lib_ring_buffer_offset_address(struct lib_ring_buffer_backend *bufb,
				     size_t offset)
{
	size_t sbidx;
	struct lib_ring_buffer_backend_pages *rpages;
	struct channel_backend *chanb = &shmp(bufb->chan)->backend;
	const struct lib_ring_buffer_config *config = chanb->config;
	unsigned long sb_bindex, id;

	offset &= chanb->buf_size - 1;
	sbidx = offset >> chanb->subbuf_size_order;
	id = shmp(bufb->buf_wsb)[sbidx].id;
	sb_bindex = subbuffer_id_get_index(config, id);
	rpages = shmp(bufb->array)[sb_bindex];
	CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
		     && subbuffer_id_is_noref(config, id));
	return shmp(rpages->p) + (offset & (chanb->subbuf_size - 1));
}