#ifndef _LTTNG_RING_BUFFER_BACKEND_H
#define _LTTNG_RING_BUFFER_BACKEND_H

/*
 * libringbuffer/backend.h
 *
 * Ring buffer backend (API).
 *
 * Copyright (C) 2011-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; only
 * version 2.1 of the License.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 *
 * Credits to Steven Rostedt for proposing to use an extra-subbuffer owned by
 * the reader in flight recorder mode.
 */

#include <stddef.h>
#include <unistd.h>

/* Internal helpers */
#include "backend_internal.h"
#include "frontend_internal.h"

/* Ring buffer backend API */

/* Ring buffer backend access (read/write) */

extern size_t lib_ring_buffer_read(struct lttng_ust_lib_ring_buffer_backend *bufb,
				   size_t offset, void *dest, size_t len,
				   struct lttng_ust_shm_handle *handle);

extern int lib_ring_buffer_read_cstr(struct lttng_ust_lib_ring_buffer_backend *bufb,
				     size_t offset, void *dest, size_t len,
				     struct lttng_ust_shm_handle *handle);
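
/*
 * Usage sketch: a consumer-side read of the sub-buffer currently owned by
 * the reader. This assumes the frontend helpers declared in frontend.h
 * (lib_ring_buffer_get_next_subbuf() returning 0 on success,
 * lib_ring_buffer_get_read_data_size() and
 * lib_ring_buffer_put_next_subbuf()), plus a caller-provided destination
 * buffer "out" at least one sub-buffer large; exact calling conventions may
 * differ between lttng-ust versions. Offset 0 reads from the start of the
 * reader-owned sub-buffer.
 *
 *	if (!lib_ring_buffer_get_next_subbuf(buf, handle)) {
 *		size_t len;
 *
 *		len = lib_ring_buffer_get_read_data_size(config, buf, handle);
 *		(void) lib_ring_buffer_read(&buf->backend, 0, out, len, handle);
 *		lib_ring_buffer_put_next_subbuf(buf, handle);
 *	}
 */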

/*
 * Return the address where a given offset is located.
 * Should be used to get the current subbuffer header pointer. Given we know
 * it's never on a page boundary, it's safe to write directly to this address,
 * as long as the write is never bigger than a page size.
 */
extern void *
lib_ring_buffer_offset_address(struct lttng_ust_lib_ring_buffer_backend *bufb,
			       size_t offset,
			       struct lttng_ust_shm_handle *handle);
extern void *
lib_ring_buffer_read_offset_address(struct lttng_ust_lib_ring_buffer_backend *bufb,
				    size_t offset,
				    struct lttng_ust_shm_handle *handle);
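
/*
 * Usage sketch: filling a field of the current sub-buffer header through
 * lib_ring_buffer_offset_address(). "struct packet_header" and
 * "subbuf_begin_offset" are placeholders; the actual header layout and
 * offset computation are defined by the ring buffer client.
 *
 *	struct packet_header *header;
 *
 *	header = lib_ring_buffer_offset_address(&buf->backend,
 *			subbuf_begin_offset, handle);
 *	if (header)
 *		header->content_size = ...;
 */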

/**
 * lib_ring_buffer_write - write data to a buffer backend
 * @config: ring buffer instance configuration
 * @ctx: ring buffer context (input arguments only)
 * @src: source pointer to copy from
 * @len: length of data to copy
 *
 * This function copies "len" bytes of data from a source pointer to a buffer
 * backend, at the current context offset. This is more or less a buffer
 * backend-specific memcpy() operation. The write must fit within the current
 * sub-buffer: the underlying layer never requests writes across sub-buffer
 * boundaries.
 */
static inline __attribute__((always_inline))
void lib_ring_buffer_write(const struct lttng_ust_lib_ring_buffer_config *config,
			   struct lttng_ust_lib_ring_buffer_ctx *ctx,
			   const void *src, size_t len)
{
	struct channel_backend *chanb = &ctx->chan->backend;
	struct lttng_ust_shm_handle *handle = ctx->handle;
	size_t offset = ctx->buf_offset;
	struct lttng_ust_lib_ring_buffer_backend_pages *backend_pages;
	void *p;

	if (caa_unlikely(!len))
		return;
	/*
	 * Underlying layer should never ask for writes across
	 * subbuffers.
	 */
	CHAN_WARN_ON(chanb, (offset & (chanb->buf_size - 1)) + len > chanb->buf_size);
	backend_pages = lib_ring_buffer_get_backend_pages_from_ctx(config, ctx);
	if (caa_unlikely(!backend_pages)) {
		if (lib_ring_buffer_backend_get_pages(config, ctx, &backend_pages))
			return;
	}
	p = shmp_index(handle, backend_pages->p, offset & (chanb->subbuf_size - 1));
	if (caa_unlikely(!p))
		return;
	lib_ring_buffer_do_copy(config, p, src, len);
	ctx->buf_offset += len;
}
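
/*
 * Usage sketch: typical writer-side sequence around lib_ring_buffer_write(),
 * assuming the frontend API (lib_ring_buffer_ctx_init(),
 * lib_ring_buffer_reserve(), lib_ring_buffer_commit()); exact argument lists
 * may differ between lttng-ust versions. "payload" stands for any fixed-size
 * record to trace; a non-zero return from reserve means no space could be
 * reserved and the record is dropped.
 *
 *	struct lttng_ust_lib_ring_buffer_ctx ctx;
 *
 *	lib_ring_buffer_ctx_init(&ctx, chan, NULL, sizeof(payload),
 *				 lttng_alignof(payload), -1, handle);
 *	if (lib_ring_buffer_reserve(config, &ctx))
 *		return;
 *	lib_ring_buffer_write(config, &ctx, &payload, sizeof(payload));
 *	lib_ring_buffer_commit(config, &ctx);
 */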

/*
 * Copy up to @len string bytes from @src to @dest. Stop whenever a
 * terminating '\0' character is found in @src. Returns the number of bytes
 * copied. Does *not* terminate @dest with a '\0' character.
 */
static inline __attribute__((always_inline))
size_t lib_ring_buffer_do_strcpy(const struct lttng_ust_lib_ring_buffer_config *config,
				 char *dest, const char *src, size_t len)
{
	size_t count;

	for (count = 0; count < len; count++) {
		char c;

		/*
		 * Only read source character once, in case it is
		 * modified concurrently.
		 */
		c = CMM_LOAD_SHARED(src[count]);
		if (!c)
			break;
		lib_ring_buffer_do_copy(config, &dest[count], &c, 1);
	}
	return count;
}

/**
 * lib_ring_buffer_strcpy - write string data to a buffer backend
 * @config: ring buffer instance configuration
 * @ctx: ring buffer context (input arguments only)
 * @src: source pointer to copy from
 * @len: length of data to copy
 * @pad: character to use for padding
 *
 * This function copies @len - 1 bytes of string data from a source
 * pointer to a buffer backend, followed by a terminating '\0'
 * character, at the current context offset. This is more or less a
 * buffer backend-specific strncpy() operation. If a terminating '\0'
 * character is found in @src before @len - 1 characters are copied, pad
 * the buffer with @pad characters (e.g. '#').
 */
static inline __attribute__((always_inline))
void lib_ring_buffer_strcpy(const struct lttng_ust_lib_ring_buffer_config *config,
			    struct lttng_ust_lib_ring_buffer_ctx *ctx,
			    const char *src, size_t len, int pad)
{
	struct channel_backend *chanb = &ctx->chan->backend;
	struct lttng_ust_shm_handle *handle = ctx->handle;
	size_t count;
	size_t offset = ctx->buf_offset;
	struct lttng_ust_lib_ring_buffer_backend_pages *backend_pages;
	void *p;

	if (caa_unlikely(!len))
		return;
	/*
	 * Underlying layer should never ask for writes across
	 * subbuffers.
	 */
	CHAN_WARN_ON(chanb, (offset & (chanb->buf_size - 1)) + len > chanb->buf_size);
	backend_pages = lib_ring_buffer_get_backend_pages_from_ctx(config, ctx);
	if (caa_unlikely(!backend_pages)) {
		if (lib_ring_buffer_backend_get_pages(config, ctx, &backend_pages))
			return;
	}
	p = shmp_index(handle, backend_pages->p, offset & (chanb->subbuf_size - 1));
	if (caa_unlikely(!p))
		return;

	count = lib_ring_buffer_do_strcpy(config, p, src, len - 1);
	offset += count;
	/* Padding */
	if (caa_unlikely(count < len - 1)) {
		size_t pad_len = len - 1 - count;

		p = shmp_index(handle, backend_pages->p, offset & (chanb->subbuf_size - 1));
		if (caa_unlikely(!p))
			return;
		lib_ring_buffer_do_memset(p, pad, pad_len);
		offset += pad_len;
	}
	/* Final '\0' */
	p = shmp_index(handle, backend_pages->p, offset & (chanb->subbuf_size - 1));
	if (caa_unlikely(!p))
		return;
	lib_ring_buffer_do_memset(p, '\0', 1);
	ctx->buf_offset += len;
}
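
/*
 * Behaviour sketch: with len = 8 and pad = '#', copying src = "abc" stores
 * exactly 8 bytes in the buffer: 'a' 'b' 'c' '#' '#' '#' '#' '\0'. A source
 * string longer than len - 1 bytes is truncated and still '\0'-terminated.
 *
 *	lib_ring_buffer_strcpy(config, &ctx, "abc", 8, '#');
 */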

/*
 * This accessor counts the number of unread records in a buffer.
 * It only provides a consistent value if neither reads nor writes are
 * performed concurrently.
 */
static inline
unsigned long lib_ring_buffer_get_records_unread(
				const struct lttng_ust_lib_ring_buffer_config *config,
				struct lttng_ust_lib_ring_buffer *buf,
				struct lttng_ust_shm_handle *handle)
{
	struct lttng_ust_lib_ring_buffer_backend *bufb = &buf->backend;
	unsigned long records_unread = 0, sb_bindex;
	unsigned int i;
	struct channel *chan;

	chan = shmp(handle, bufb->chan);
	if (!chan)
		return 0;
	for (i = 0; i < chan->backend.num_subbuf; i++) {
		struct lttng_ust_lib_ring_buffer_backend_subbuffer *wsb;
		struct lttng_ust_lib_ring_buffer_backend_pages_shmp *rpages;
		struct lttng_ust_lib_ring_buffer_backend_pages *backend_pages;

		wsb = shmp_index(handle, bufb->buf_wsb, i);
		if (!wsb)
			return 0;
		sb_bindex = subbuffer_id_get_index(config, wsb->id);
		rpages = shmp_index(handle, bufb->array, sb_bindex);
		if (!rpages)
			return 0;
		backend_pages = shmp(handle, rpages->shmp);
		if (!backend_pages)
			return 0;
		records_unread += v_read(config, &backend_pages->records_unread);
	}
	if (config->mode == RING_BUFFER_OVERWRITE) {
		struct lttng_ust_lib_ring_buffer_backend_pages_shmp *rpages;
		struct lttng_ust_lib_ring_buffer_backend_pages *backend_pages;

		sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
		rpages = shmp_index(handle, bufb->array, sb_bindex);
		if (!rpages)
			return 0;
		backend_pages = shmp(handle, rpages->shmp);
		if (!backend_pages)
			return 0;
		records_unread += v_read(config, &backend_pages->records_unread);
	}
	return records_unread;
}
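
/*
 * Usage sketch: checking for pending records before waking a consumer. The
 * quiescence required for the count to be meaningful (no concurrent reads
 * or writes) is the caller's responsibility; "wakeup_consumer()" is a
 * placeholder.
 *
 *	if (lib_ring_buffer_get_records_unread(config, buf, handle))
 *		wakeup_consumer();
 */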

#endif /* _LTTNG_RING_BUFFER_BACKEND_H */