Add metadata transport
[lttng-modules.git] / ltt-ring-buffer-client.h
1 /*
2 * ltt-ring-buffer-client.h
3 *
4 * Copyright (C) 2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 *
6 * LTTng lib ring buffer client template.
7 *
8 * Dual LGPL v2.1/GPL v2 license.
9 */
10
11 #include <linux/module.h>
12 #include <linux/types.h>
13 #include "wrapper/vmalloc.h" /* for wrapper_vmalloc_sync_all() */
14 #include "wrapper/trace-clock.h"
15 #include "ltt-events.h"
16 #include "ltt-tracer.h"
17
18 static inline notrace u64 lib_ring_buffer_clock_read(struct channel *chan)
19 {
20 return trace_clock_read64();
21 }
22
23 /*
24 * record_header_size - Calculate the header size and padding necessary.
25 * @config: ring buffer instance configuration
26 * @chan: channel
27 * @offset: offset in the write buffer
28 * @data_size: size of the payload
29 * @pre_header_padding: padding to add before the header (output)
30 * @rflags: reservation flags
31 * @ctx: reservation context
32 *
33 * Returns the event header size (including padding).
34 *
35 * Important note :
36 * The event header must be 32-bits. The total offset calculated here :
37 *
38 * Alignment of header struct on 32 bits (min arch size, header size)
39 * + sizeof(header struct) (32-bits)
40 * + (opt) u16 (ext. event id)
41 * + (opt) u16 (event_size)
42 * (if event_size == LTT_MAX_SMALL_SIZE, has ext. event size)
43 * + (opt) u32 (ext. event size)
44 * + (opt) u64 full TSC (aligned on min(64-bits, arch size))
45 *
46 * The payload must itself determine its own alignment from the biggest type it
47 * contains.
48 */
49 static __inline__
50 unsigned char record_header_size(const struct lib_ring_buffer_config *config,
51 struct channel *chan, size_t offset,
52 size_t data_size, size_t *pre_header_padding,
53 unsigned int rflags,
54 struct lib_ring_buffer_ctx *ctx)
55 {
56 size_t orig_offset = offset;
57 size_t padding;
58
59 BUILD_BUG_ON(sizeof(struct event_header) != sizeof(u32));
60
61 padding = lib_ring_buffer_align(offset,
62 sizeof(struct event_header));
63 offset += padding;
64 offset += sizeof(struct event_header);
65
66 if (unlikely(rflags)) {
67 switch (rflags) {
68 case LTT_RFLAG_ID_SIZE_TSC:
69 offset += sizeof(u16) + sizeof(u16);
70 if (data_size >= LTT_MAX_SMALL_SIZE)
71 offset += sizeof(u32);
72 offset += lib_ring_buffer_align(offset, sizeof(u64));
73 offset += sizeof(u64);
74 break;
75 case LTT_RFLAG_ID_SIZE:
76 offset += sizeof(u16) + sizeof(u16);
77 if (data_size >= LTT_MAX_SMALL_SIZE)
78 offset += sizeof(u32);
79 break;
80 case LTT_RFLAG_ID:
81 offset += sizeof(u16);
82 break;
83 }
84 }
85
86 *pre_header_padding = padding;
87 return offset - orig_offset;
88 }
89
90 #include "wrapper/ringbuffer/api.h"
91
92 extern
93 void ltt_write_event_header_slow(const struct lib_ring_buffer_config *config,
94 struct lib_ring_buffer_ctx *ctx,
95 u16 eID, u32 event_size);
96
97 /*
98 * ltt_write_event_header
99 *
100 * Writes the event header to the offset (already aligned on 32-bits).
101 *
102 * @config: ring buffer instance configuration
103 * @ctx: reservation context
104 * @eID : event ID
105 * @event_size : size of the event, excluding the event header.
106 */
107 static __inline__
108 void ltt_write_event_header(const struct lib_ring_buffer_config *config,
109 struct lib_ring_buffer_ctx *ctx,
110 u16 eID, u32 event_size)
111 {
112 struct event_header header;
113
114 if (unlikely(ctx->rflags))
115 goto slow_path;
116
117 header.id_time = eID << LTT_TSC_BITS;
118 header.id_time |= (u32)ctx->tsc & LTT_TSC_MASK;
119 lib_ring_buffer_write(config, ctx, &header, sizeof(header));
120
121 slow_path:
122 ltt_write_event_header_slow(config, ctx, eID, event_size);
123 }
124
125 /**
126 * ltt_write_trace_header - Write trace header
127 * @priv: Private data (struct trace)
128 * @header: Memory address where the information must be written to
129 */
130 static __inline__
131 void write_trace_header(const struct lib_ring_buffer_config *config,
132 struct packet_header *header)
133 {
134 header->magic = CTF_MAGIC_NUMBER;
135 #if 0
136 /* TODO: move start time to metadata */
137 header->major_version = LTT_TRACER_VERSION_MAJOR;
138 header->minor_version = LTT_TRACER_VERSION_MINOR;
139 header->arch_size = sizeof(void *);
140 header->alignment = lib_ring_buffer_get_alignment(config);
141 header->start_time_sec = ltt_chan->session->start_time.tv_sec;
142 header->start_time_usec = ltt_chan->session->start_time.tv_usec;
143 header->start_freq = ltt_chan->session->start_freq;
144 header->freq_scale = ltt_chan->session->freq_scale;
145 #endif //0
146 }
147
148 void ltt_write_event_header_slow(const struct lib_ring_buffer_config *config,
149 struct lib_ring_buffer_ctx *ctx,
150 u16 eID, u32 event_size)
151 {
152 struct event_header header;
153 u16 small_size;
154
155 switch (ctx->rflags) {
156 case LTT_RFLAG_ID_SIZE_TSC:
157 header.id_time = 29 << LTT_TSC_BITS;
158 break;
159 case LTT_RFLAG_ID_SIZE:
160 header.id_time = 30 << LTT_TSC_BITS;
161 break;
162 case LTT_RFLAG_ID:
163 header.id_time = 31 << LTT_TSC_BITS;
164 break;
165 default:
166 WARN_ON_ONCE(1);
167 header.id_time = 0;
168 }
169
170 header.id_time |= (u32)ctx->tsc & LTT_TSC_MASK;
171 lib_ring_buffer_write(config, ctx, &header, sizeof(header));
172
173 switch (ctx->rflags) {
174 case LTT_RFLAG_ID_SIZE_TSC:
175 small_size = (u16)min_t(u32, event_size, LTT_MAX_SMALL_SIZE);
176 lib_ring_buffer_write(config, ctx, &eID, sizeof(u16));
177 lib_ring_buffer_write(config, ctx, &small_size, sizeof(u16));
178 if (small_size == LTT_MAX_SMALL_SIZE)
179 lib_ring_buffer_write(config, ctx, &event_size,
180 sizeof(u32));
181 lib_ring_buffer_align_ctx(ctx, sizeof(u64));
182 lib_ring_buffer_write(config, ctx, &ctx->tsc, sizeof(u64));
183 break;
184 case LTT_RFLAG_ID_SIZE:
185 small_size = (u16)min_t(u32, event_size, LTT_MAX_SMALL_SIZE);
186 lib_ring_buffer_write(config, ctx, &eID, sizeof(u16));
187 lib_ring_buffer_write(config, ctx, &small_size, sizeof(u16));
188 if (small_size == LTT_MAX_SMALL_SIZE)
189 lib_ring_buffer_write(config, ctx, &event_size,
190 sizeof(u32));
191 break;
192 case LTT_RFLAG_ID:
193 lib_ring_buffer_write(config, ctx, &eID, sizeof(u16));
194 break;
195 }
196 }
197
198 static const struct lib_ring_buffer_config client_config;
199
200 static u64 client_ring_buffer_clock_read(struct channel *chan)
201 {
202 return lib_ring_buffer_clock_read(chan);
203 }
204
/*
 * Record header size callback registered in client_config. Passes every
 * argument through to record_header_size() and returns its result widened
 * to size_t.
 */
static
size_t client_record_header_size(const struct lib_ring_buffer_config *config,
				 struct channel *chan, size_t offset,
				 size_t data_size,
				 size_t *pre_header_padding,
				 unsigned int rflags,
				 struct lib_ring_buffer_ctx *ctx)
{
	size_t header_len;

	header_len = record_header_size(config, chan, offset, data_size,
					pre_header_padding, rflags, ctx);
	return header_len;
}
216
217 /**
218 * client_packet_header_size - called on buffer-switch to a new sub-buffer
219 *
220 * Return header size without padding after the structure. Don't use packed
221 * structure because gcc generates inefficient code on some architectures
222 * (powerpc, mips..)
223 */
224 static size_t client_packet_header_size(void)
225 {
226 return offsetof(struct packet_header, header_end);
227 }
228
229 static void client_buffer_begin(struct lib_ring_buffer *buf, u64 tsc,
230 unsigned int subbuf_idx)
231 {
232 struct channel *chan = buf->backend.chan;
233 struct packet_header *header =
234 (struct packet_header *)
235 lib_ring_buffer_offset_address(&buf->backend,
236 subbuf_idx * chan->backend.subbuf_size);
237
238 header->timestamp_begin = tsc;
239 header->content_size = 0xFFFFFFFF; /* for debugging */
240 write_trace_header(&client_config, header);
241 }
242
243 /*
244 * offset is assumed to never be 0 here : never deliver a completely empty
245 * subbuffer. data_size is between 1 and subbuf_size.
246 */
247 static void client_buffer_end(struct lib_ring_buffer *buf, u64 tsc,
248 unsigned int subbuf_idx, unsigned long data_size)
249 {
250 struct channel *chan = buf->backend.chan;
251 struct packet_header *header =
252 (struct packet_header *)
253 lib_ring_buffer_offset_address(&buf->backend,
254 subbuf_idx * chan->backend.subbuf_size);
255 unsigned long records_lost = 0;
256
257 header->content_size = data_size;
258 header->packet_size = PAGE_ALIGN(data_size);
259 header->timestamp_end = tsc;
260 records_lost += lib_ring_buffer_get_records_lost_full(&client_config, buf);
261 records_lost += lib_ring_buffer_get_records_lost_wrap(&client_config, buf);
262 records_lost += lib_ring_buffer_get_records_lost_big(&client_config, buf);
263 header->events_lost = records_lost;
264 }
265
/*
 * Buffer creation callback registered in client_config. This client has no
 * per-buffer state to set up: all arguments are ignored and 0 is returned.
 */
static int client_buffer_create(struct lib_ring_buffer *buf, void *priv,
				int cpu, const char *name)
{
	const int status = 0;

	return status;
}
271
/*
 * Buffer finalize callback registered in client_config. Intentionally a
 * no-op: this client has nothing to tear down per buffer.
 */
static void client_buffer_finalize(struct lib_ring_buffer *buf, void *priv,
				   int cpu)
{
	return;
}
275
276 static const struct lib_ring_buffer_config client_config = {
277 .cb.ring_buffer_clock_read = client_ring_buffer_clock_read,
278 .cb.record_header_size = client_record_header_size,
279 .cb.subbuffer_header_size = client_packet_header_size,
280 .cb.buffer_begin = client_buffer_begin,
281 .cb.buffer_end = client_buffer_end,
282 .cb.buffer_create = client_buffer_create,
283 .cb.buffer_finalize = client_buffer_finalize,
284
285 .tsc_bits = 32,
286 .alloc = RING_BUFFER_ALLOC_PER_CPU,
287 .sync = RING_BUFFER_SYNC_PER_CPU,
288 .mode = RING_BUFFER_MODE_TEMPLATE,
289 .backend = RING_BUFFER_PAGE,
290 .output = RING_BUFFER_SPLICE,
291 .oops = RING_BUFFER_OOPS_CONSISTENCY,
292 .ipi = RING_BUFFER_IPI_BARRIER,
293 .wakeup = RING_BUFFER_WAKEUP_BY_TIMER,
294 };
295
296 static
297 struct channel *_channel_create(const char *name,
298 struct ltt_session *session, void *buf_addr,
299 size_t subbuf_size, size_t num_subbuf,
300 unsigned int switch_timer_interval,
301 unsigned int read_timer_interval)
302 {
303 return channel_create(&client_config, name, session, buf_addr,
304 subbuf_size, num_subbuf, switch_timer_interval,
305 read_timer_interval);
306 }
307
/* Transport op: destroy @chan by delegating to channel_destroy(). */
static
void ltt_channel_destroy(struct channel *chan)
{
	channel_destroy(chan);
	return;
}
313
314 static
315 struct lib_ring_buffer *ltt_buffer_read_open(struct channel *chan)
316 {
317 struct lib_ring_buffer *buf;
318 int cpu;
319
320 for_each_channel_cpu(cpu, chan) {
321 buf = channel_get_ring_buffer(&client_config, chan, cpu);
322 if (!lib_ring_buffer_open_read(buf))
323 return buf;
324 }
325 return NULL;
326 }
327
/*
 * Transport op: release a buffer previously obtained for reading, by
 * delegating to lib_ring_buffer_release_read().
 */
static
void ltt_buffer_read_close(struct lib_ring_buffer *buf)
{
	lib_ring_buffer_release_read(buf);
	return;
}
334
335 int ltt_event_reserve(struct lib_ring_buffer_ctx *ctx)
336 {
337 int ret, cpu;
338
339 cpu = lib_ring_buffer_get_cpu(&client_config);
340 if (cpu < 0)
341 return -EPERM;
342 ctx->cpu = cpu;
343
344 ret = lib_ring_buffer_reserve(&client_config, ctx);
345 if (ret)
346 goto put;
347 return ret;
348
349 put:
350 lib_ring_buffer_put_cpu(&client_config);
351 return ret;
352 }
353
354 void ltt_event_commit(struct lib_ring_buffer_ctx *ctx)
355 {
356 lib_ring_buffer_commit(&client_config, ctx);
357 lib_ring_buffer_put_cpu(&client_config);
358 }
359
360 void ltt_event_write(struct lib_ring_buffer_ctx *ctx, const void *src,
361 size_t len)
362 {
363 lib_ring_buffer_write(&client_config, ctx, src, len);
364 }
365
366 static struct ltt_transport ltt_relay_transport = {
367 .name = "relay-" RING_BUFFER_MODE_TEMPLATE_STRING,
368 .owner = THIS_MODULE,
369 .ops = {
370 .channel_create = _channel_create,
371 .channel_destroy = ltt_channel_destroy,
372 .buffer_read_open = ltt_buffer_read_open,
373 .buffer_read_close = ltt_buffer_read_close,
374 .event_reserve = ltt_event_reserve,
375 .event_commit = ltt_event_commit,
376 .event_write = ltt_event_write,
377 },
378 };
379
380 static int __init ltt_ring_buffer_client_init(void)
381 {
382 /*
383 * This vmalloc sync all also takes care of the lib ring buffer
384 * vmalloc'd module pages when it is built as a module into LTTng.
385 */
386 wrapper_vmalloc_sync_all();
387 printk(KERN_INFO "LTT : ltt ring buffer client init\n");
388 ltt_transport_register(&ltt_relay_transport);
389 return 0;
390 }
391
392 module_init(ltt_ring_buffer_client_init);
393
394 static void __exit ltt_ring_buffer_client_exit(void)
395 {
396 printk(KERN_INFO "LTT : ltt ring buffer client exit\n");
397 ltt_transport_unregister(&ltt_relay_transport);
398 }
399
400 module_exit(ltt_ring_buffer_client_exit);
401
402 MODULE_LICENSE("GPL and additional rights");
403 MODULE_AUTHOR("Mathieu Desnoyers");
404 MODULE_DESCRIPTION("LTTng ring buffer " RING_BUFFER_MODE_TEMPLATE_STRING
405 " client");
This page took 0.037229 seconds and 4 git commands to generate.