buffers.c: fix compilation issue
[ust.git] / libust / buffers.c
1 /*
2 * buffers.c
3 * LTTng userspace tracer buffering system
4 *
5 * Copyright (C) 2009 - Pierre-Marc Fournier (pierre-marc dot fournier at polymtl dot ca)
6 * Copyright (C) 2008 - Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
7 *
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License as published by the Free Software Foundation; either
11 * version 2.1 of the License, or (at your option) any later version.
12 *
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Lesser General Public License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General Public
19 * License along with this library; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21 */
22
23 #include <unistd.h>
24 #include <sys/mman.h>
25 #include <sys/ipc.h>
26 #include <sys/shm.h>
27 #include <fcntl.h>
28 #include <ust/kernelcompat.h>
29 #include <kcompat/kref.h>
30 #include "buffers.h"
31 #include "channels.h"
32 #include "tracer.h"
33 #include "tracercore.h"
34 #include "usterr.h"
35
/* Held while ust_buffers_channel_open()/ust_buffers_channel_close() open or
 * close per-cpu buffers and add/remove a channel on the list below. */
36 static DEFINE_MUTEX(ust_buffers_channels_mutex);
/* List of the channels that were successfully opened and not yet closed. */
37 static LIST_HEAD(ust_buffers_channels);
38
/*
 * get_n_cpus - number of processors configured on this system
 *
 * The value is obtained from sysconf() on the first successful call and
 * cached in a function-local static for the following calls.
 *
 * Returns the processor count, or -1 if sysconf() fails.
 */
static int get_n_cpus(void)
{
	static int n_cpus = 0;
	int count;

	if (n_cpus == 0) {
		/*
		 * On Linux, _SC_NPROCESSORS_CONF also counts the processors
		 * that are offline, whereas _SC_NPROCESSORS_ONLN does not.
		 * With _SC_NPROCESSORS_ONLN, getcpu() could return a value
		 * greater than this count and overflow the arrays that are
		 * indexed by processor.
		 */
		count = sysconf(_SC_NPROCESSORS_CONF);
		if (count == -1)
			return -1;

		n_cpus = count;
	}

	return n_cpus;
}
65
66 static int ust_buffers_init_buffer(struct ltt_trace_struct *trace,
67 struct ust_channel *ltt_chan,
68 struct ust_buffer *buf,
69 unsigned int n_subbufs);
70
71 static int ust_buffers_alloc_buf(struct ust_buffer *buf, size_t *size)
72 {
73 void *ptr;
74 int result;
75
76 *size = PAGE_ALIGN(*size);
77
78 result = buf->shmid = shmget(getpid(), *size, IPC_CREAT | IPC_EXCL | 0700);
79 if(result == -1 && errno == EINVAL) {
80 ERR("shmget() returned EINVAL; maybe /proc/sys/kernel/shmmax should be increased.");
81 return -1;
82 }
83 else if(result == -1) {
84 PERROR("shmget");
85 return -1;
86 }
87
88 /* FIXME: should have matching call to shmdt */
89 ptr = shmat(buf->shmid, NULL, 0);
90 if(ptr == (void *) -1) {
91 perror("shmat");
92 goto destroy_shmem;
93 }
94
95 /* Already mark the shared memory for destruction. This will occur only
96 * when all users have detached.
97 */
98 result = shmctl(buf->shmid, IPC_RMID, NULL);
99 if(result == -1) {
100 perror("shmctl");
101 return -1;
102 }
103
104 buf->buf_data = ptr;
105 buf->buf_size = *size;
106
107 return 0;
108
109 destroy_shmem:
110 result = shmctl(buf->shmid, IPC_RMID, NULL);
111 if(result == -1) {
112 perror("shmctl");
113 }
114
115 return -1;
116 }
117
118 int ust_buffers_create_buf(struct ust_channel *channel, int cpu)
119 {
120 int result;
121 struct ust_buffer *buf = channel->buf[cpu];
122
123 buf->cpu = cpu;
124 result = ust_buffers_alloc_buf(buf, &channel->alloc_size);
125 if(result)
126 return -1;
127
128 buf->chan = channel;
129 kref_get(&channel->kref);
130 return 0;
131 }
132
133 static void ust_buffers_destroy_channel(struct kref *kref)
134 {
135 struct ust_channel *chan = container_of(kref, struct ust_channel, kref);
136 free(chan);
137 }
138
139 static void ust_buffers_destroy_buf(struct ust_buffer *buf)
140 {
141 struct ust_channel *chan = buf->chan;
142 int result;
143
144 result = munmap(buf->buf_data, buf->buf_size);
145 if(result == -1) {
146 PERROR("munmap");
147 }
148
149 //ust// chan->buf[buf->cpu] = NULL;
150 free(buf);
151 kref_put(&chan->kref, ust_buffers_destroy_channel);
152 }
153
154 /* called from kref_put */
155 static void ust_buffers_remove_buf(struct kref *kref)
156 {
157 struct ust_buffer *buf = container_of(kref, struct ust_buffer, kref);
158 ust_buffers_destroy_buf(buf);
159 }
160
161 int ust_buffers_open_buf(struct ust_channel *chan, int cpu)
162 {
163 int result;
164
165 result = ust_buffers_create_buf(chan, cpu);
166 if (result == -1)
167 return -1;
168
169 kref_init(&chan->buf[cpu]->kref);
170
171 result = ust_buffers_init_buffer(chan->trace, chan, chan->buf[cpu], chan->subbuf_cnt);
172 if(result == -1)
173 return -1;
174
175 return 0;
176
177 /* FIXME: decrementally destroy on error? */
178 }
179
180 /**
181 * ust_buffers_close_buf - close a channel buffer
182 * @buf: buffer
183 */
184 static void ust_buffers_close_buf(struct ust_buffer *buf)
185 {
186 kref_put(&buf->kref, ust_buffers_remove_buf);
187 }
188
189 int ust_buffers_channel_open(struct ust_channel *chan, size_t subbuf_size, size_t subbuf_cnt)
190 {
191 int i;
192 int result;
193
194 if(subbuf_size == 0 || subbuf_cnt == 0)
195 return -1;
196
197 chan->version = UST_CHANNEL_VERSION;
198 chan->subbuf_cnt = subbuf_cnt;
199 chan->subbuf_size = subbuf_size;
200 chan->subbuf_size_order = get_count_order(subbuf_size);
201 chan->alloc_size = FIX_SIZE(subbuf_size * subbuf_cnt);
202
203 kref_init(&chan->kref);
204
205 mutex_lock(&ust_buffers_channels_mutex);
206 for(i=0; i<chan->n_cpus; i++) {
207 result = ust_buffers_open_buf(chan, i);
208 if (result == -1)
209 goto error;
210 }
211 list_add(&chan->list, &ust_buffers_channels);
212 mutex_unlock(&ust_buffers_channels_mutex);
213
214 return 0;
215
216 /* Jump directly inside the loop to close the buffers that were already
217 * opened. */
218 for(; i>=0; i--) {
219 ust_buffers_close_buf(chan->buf[i]);
220 error:
221 do {} while(0);
222 }
223
224 kref_put(&chan->kref, ust_buffers_destroy_channel);
225 mutex_unlock(&ust_buffers_channels_mutex);
226 return -1;
227 }
228
229 void ust_buffers_channel_close(struct ust_channel *chan)
230 {
231 int i;
232 if(!chan)
233 return;
234
235 mutex_lock(&ust_buffers_channels_mutex);
236 for(i=0; i<chan->n_cpus; i++) {
237 /* FIXME: if we make it here, then all buffers were necessarily allocated. Moreover, we don't
238 * initialize to NULL so we cannot use this check. Should we? */
239 //ust// if (chan->buf[i])
240 ust_buffers_close_buf(chan->buf[i]);
241 }
242
243 list_del(&chan->list);
244 kref_put(&chan->kref, ust_buffers_destroy_channel);
245 mutex_unlock(&ust_buffers_channels_mutex);
246 }
247
248 /* _ust_buffers_write()
249 *
250 * @buf: destination buffer
251 * @offset: offset in destination
252 * @src: source buffer
253 * @len: length of source
254 * @cpy: already copied
255 */
256
257 void _ust_buffers_write(struct ust_buffer *buf, size_t offset,
258 const void *src, size_t len, ssize_t cpy)
259 {
260 do {
261 len -= cpy;
262 src += cpy;
263 offset += cpy;
264
265 WARN_ON(offset >= buf->buf_size);
266
267 cpy = min_t(size_t, len, buf->buf_size - offset);
268 ust_buffers_do_copy(buf->buf_data + offset, src, cpy);
269 } while (unlikely(len != cpy));
270 }
271
272 void *ltt_buffers_offset_address(struct ust_buffer *buf, size_t offset)
273 {
274 return ((char *)buf->buf_data)+offset;
275 }
276
277 /*
278 * -------
279 */
280
281 /*
282 * Last TSC comparison functions. Check if the current TSC overflows
283 * LTT_TSC_BITS bits from the last TSC read. Reads and writes last_tsc
284 * atomically.
285 */
286
287 /* FIXME: does this test work properly? */
288 #if (BITS_PER_LONG == 32)
289 static inline void save_last_tsc(struct ust_buffer *ltt_buf,
290 u64 tsc)
291 {
292 ltt_buf->last_tsc = (unsigned long)(tsc >> LTT_TSC_BITS);
293 }
294
295 static inline int last_tsc_overflow(struct ust_buffer *ltt_buf,
296 u64 tsc)
297 {
298 unsigned long tsc_shifted = (unsigned long)(tsc >> LTT_TSC_BITS);
299
300 if (unlikely((tsc_shifted - ltt_buf->last_tsc)))
301 return 1;
302 else
303 return 0;
304 }
305 #else
306 static inline void save_last_tsc(struct ust_buffer *ltt_buf,
307 u64 tsc)
308 {
309 ltt_buf->last_tsc = (unsigned long)tsc;
310 }
311
312 static inline int last_tsc_overflow(struct ust_buffer *ltt_buf,
313 u64 tsc)
314 {
315 if (unlikely((tsc - ltt_buf->last_tsc) >> LTT_TSC_BITS))
316 return 1;
317 else
318 return 0;
319 }
320 #endif
321
/*
 * How a forced sub-buffer switch is performed: either while tracing is
 * active, or as a final flush after tracing (in which case nothing will be
 * written in the new sub-buffer).
 */
enum force_switch_mode {
	FORCE_ACTIVE,	/* switch done during tracing */
	FORCE_FLUSH,	/* final flush after tracing */
};
327
328 static void ust_buffers_destroy_buffer(struct ust_channel *ltt_chan, int cpu);
329
330 static void ltt_force_switch(struct ust_buffer *buf,
331 enum force_switch_mode mode);
332
333 /*
334 * Trace callbacks
335 */
336 static void ltt_buffer_begin_callback(struct ust_buffer *buf,
337 u64 tsc, unsigned int subbuf_idx)
338 {
339 struct ust_channel *channel = buf->chan;
340 struct ltt_subbuffer_header *header =
341 (struct ltt_subbuffer_header *)
342 ltt_buffers_offset_address(buf,
343 subbuf_idx * buf->chan->subbuf_size);
344
345 header->cycle_count_begin = tsc;
346 header->lost_size = 0xFFFFFFFF; /* for debugging */
347 header->buf_size = buf->chan->subbuf_size;
348 ltt_write_trace_header(channel->trace, header);
349 }
350
351 /*
352 * offset is assumed to never be 0 here : never deliver a completely empty
353 * subbuffer. The lost size is between 0 and subbuf_size-1.
354 */
355 static notrace void ltt_buffer_end_callback(struct ust_buffer *buf,
356 u64 tsc, unsigned int offset, unsigned int subbuf_idx)
357 {
358 struct ltt_subbuffer_header *header =
359 (struct ltt_subbuffer_header *)
360 ltt_buffers_offset_address(buf,
361 subbuf_idx * buf->chan->subbuf_size);
362
363 header->lost_size = SUBBUF_OFFSET((buf->chan->subbuf_size - offset),
364 buf->chan);
365 header->cycle_count_end = tsc;
366 header->events_lost = local_read(&buf->events_lost);
367 header->subbuf_corrupt = local_read(&buf->corrupted_subbuffers);
368
369 }
370
/* Optional hook called by relay_wake_consumer(); NULL when none is set. */
void (*wake_consumer)(void *, int) = NULL;

/* Installs @wake as the consumer wake-up hook (NULL removes it). */
void relay_set_wake_consumer(void (*wake)(void *, int))
{
	wake_consumer = wake;
}

/* Forwards @arg and @finished to the wake-up hook, if one is installed. */
void relay_wake_consumer(void *arg, int finished)
{
	if (!wake_consumer)
		return;

	wake_consumer(arg, finished);
}
383
384 static notrace void ltt_deliver(struct ust_buffer *buf, unsigned int subbuf_idx,
385 long commit_count)
386 {
387 int result;
388
389 //ust// #ifdef CONFIG_LTT_VMCORE
390 local_set(&buf->commit_seq[subbuf_idx], commit_count);
391 //ust// #endif
392
393 /* wakeup consumer */
394 result = write(buf->data_ready_fd_write, "1", 1);
395 if(result == -1) {
396 PERROR("write (in ltt_relay_buffer_flush)");
397 ERR("this should never happen!");
398 }
399 //ust// atomic_set(&ltt_buf->wakeup_readers, 1);
400 }
401
402 /*
403 * This function should not be called from NMI interrupt context
404 */
405 static notrace void ltt_buf_unfull(struct ust_buffer *buf,
406 unsigned int subbuf_idx,
407 long offset)
408 {
409 //ust// struct ltt_channel_struct *ltt_channel =
410 //ust// (struct ltt_channel_struct *)buf->chan->private_data;
411 //ust// struct ltt_channel_buf_struct *ltt_buf = ltt_channel->buf;
412 //ust//
413 //ust// ltt_relay_wake_writers(ltt_buf);
414 }
415
416 int ust_buffers_do_get_subbuf(struct ust_buffer *buf, long *pconsumed_old)
417 {
418 struct ust_channel *channel = buf->chan;
419 long consumed_old, consumed_idx, commit_count, write_offset;
420 consumed_old = atomic_long_read(&buf->consumed);
421 consumed_idx = SUBBUF_INDEX(consumed_old, buf->chan);
422 commit_count = local_read(&buf->commit_count[consumed_idx]);
423 /*
424 * Make sure we read the commit count before reading the buffer
425 * data and the write offset. Correct consumed offset ordering
426 * wrt commit count is insured by the use of cmpxchg to update
427 * the consumed offset.
428 */
429 smp_rmb();
430 write_offset = local_read(&buf->offset);
431 /*
432 * Check that the subbuffer we are trying to consume has been
433 * already fully committed.
434 */
435 if (((commit_count - buf->chan->subbuf_size)
436 & channel->commit_count_mask)
437 - (BUFFER_TRUNC(consumed_old, buf->chan)
438 >> channel->n_subbufs_order)
439 != 0) {
440 return -EAGAIN;
441 }
442 /*
443 * Check that we are not about to read the same subbuffer in
444 * which the writer head is.
445 */
446 if ((SUBBUF_TRUNC(write_offset, buf->chan)
447 - SUBBUF_TRUNC(consumed_old, buf->chan))
448 == 0) {
449 return -EAGAIN;
450 }
451
452 *pconsumed_old = consumed_old;
453 return 0;
454 }
455
456 int ust_buffers_do_put_subbuf(struct ust_buffer *buf, u32 uconsumed_old)
457 {
458 long consumed_new, consumed_old;
459
460 consumed_old = atomic_long_read(&buf->consumed);
461 consumed_old = consumed_old & (~0xFFFFFFFFL);
462 consumed_old = consumed_old | uconsumed_old;
463 consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan);
464
465 //ust// spin_lock(&ltt_buf->full_lock);
466 if (atomic_long_cmpxchg(&buf->consumed, consumed_old,
467 consumed_new)
468 != consumed_old) {
469 /* We have been pushed by the writer : the last
470 * buffer read _is_ corrupted! It can also
471 * happen if this is a buffer we never got. */
472 //ust// spin_unlock(&ltt_buf->full_lock);
473 return -EIO;
474 } else {
475 /* tell the client that buffer is now unfull */
476 int index;
477 long data;
478 index = SUBBUF_INDEX(consumed_old, buf->chan);
479 data = BUFFER_OFFSET(consumed_old, buf->chan);
480 ltt_buf_unfull(buf, index, data);
481 //ust// spin_unlock(&ltt_buf->full_lock);
482 }
483 return 0;
484 }
485
486 static void ltt_relay_print_subbuffer_errors(
487 struct ust_channel *channel,
488 long cons_off, int cpu)
489 {
490 struct ust_buffer *ltt_buf = channel->buf[cpu];
491 long cons_idx, commit_count, write_offset;
492
493 cons_idx = SUBBUF_INDEX(cons_off, channel);
494 commit_count = local_read(&ltt_buf->commit_count[cons_idx]);
495 /*
496 * No need to order commit_count and write_offset reads because we
497 * execute after trace is stopped when there are no readers left.
498 */
499 write_offset = local_read(&ltt_buf->offset);
500 WARN( "LTT : unread channel %s offset is %ld "
501 "and cons_off : %ld\n",
502 channel->channel_name, write_offset, cons_off);
503 /* Check each sub-buffer for non filled commit count */
504 if (((commit_count - channel->subbuf_size) & channel->commit_count_mask)
505 - (BUFFER_TRUNC(cons_off, channel) >> channel->n_subbufs_order) != 0) {
506 ERR("LTT : %s : subbuffer %lu has non filled "
507 "commit count %lu.\n",
508 channel->channel_name, cons_idx, commit_count);
509 }
510 ERR("LTT : %s : commit count : %lu, subbuf size %zd\n",
511 channel->channel_name, commit_count,
512 channel->subbuf_size);
513 }
514
515 static void ltt_relay_print_errors(struct ltt_trace_struct *trace,
516 struct ust_channel *channel, int cpu)
517 {
518 struct ust_buffer *ltt_buf = channel->buf[cpu];
519 long cons_off;
520
521 /*
522 * Can be called in the error path of allocation when
523 * trans_channel_data is not yet set.
524 */
525 if (!channel)
526 return;
527
528 for (cons_off = atomic_long_read(&ltt_buf->consumed);
529 (SUBBUF_TRUNC(local_read(&ltt_buf->offset),
530 channel)
531 - cons_off) > 0;
532 cons_off = SUBBUF_ALIGN(cons_off, channel))
533 ltt_relay_print_subbuffer_errors(channel, cons_off, cpu);
534 }
535
536 static void ltt_relay_print_buffer_errors(struct ust_channel *channel, int cpu)
537 {
538 struct ltt_trace_struct *trace = channel->trace;
539 struct ust_buffer *ltt_buf = channel->buf[cpu];
540
541 if (local_read(&ltt_buf->events_lost))
542 ERR("channel %s: %ld events lost",
543 channel->channel_name,
544 local_read(&ltt_buf->events_lost));
545 if (local_read(&ltt_buf->corrupted_subbuffers))
546 ERR("channel %s : %ld corrupted subbuffers",
547 channel->channel_name,
548 local_read(&ltt_buf->corrupted_subbuffers));
549
550 ltt_relay_print_errors(trace, channel, cpu);
551 }
552
553 static void ltt_relay_release_channel(struct kref *kref)
554 {
555 struct ust_channel *ltt_chan = container_of(kref,
556 struct ust_channel, kref);
557 free(ltt_chan->buf);
558 }
559
560 /*
561 * Create ltt buffer.
562 */
563 //ust// static int ltt_relay_create_buffer(struct ltt_trace_struct *trace,
564 //ust// struct ltt_channel_struct *ltt_chan, struct rchan_buf *buf,
565 //ust// unsigned int cpu, unsigned int n_subbufs)
566 //ust// {
567 //ust// struct ltt_channel_buf_struct *ltt_buf =
568 //ust// percpu_ptr(ltt_chan->buf, cpu);
569 //ust// unsigned int j;
570 //ust//
571 //ust// ltt_buf->commit_count =
572 //ust// kzalloc_node(sizeof(ltt_buf->commit_count) * n_subbufs,
573 //ust// GFP_KERNEL, cpu_to_node(cpu));
574 //ust// if (!ltt_buf->commit_count)
575 //ust// return -ENOMEM;
576 //ust// kref_get(&trace->kref);
577 //ust// kref_get(&trace->ltt_transport_kref);
578 //ust// kref_get(&ltt_chan->kref);
579 //ust// local_set(&ltt_buf->offset, ltt_subbuffer_header_size());
580 //ust// atomic_long_set(&ltt_buf->consumed, 0);
581 //ust// atomic_long_set(&ltt_buf->active_readers, 0);
582 //ust// for (j = 0; j < n_subbufs; j++)
583 //ust// local_set(&ltt_buf->commit_count[j], 0);
584 //ust// init_waitqueue_head(&ltt_buf->write_wait);
585 //ust// atomic_set(&ltt_buf->wakeup_readers, 0);
586 //ust// spin_lock_init(&ltt_buf->full_lock);
587 //ust//
588 //ust// ltt_buffer_begin_callback(buf, trace->start_tsc, 0);
589 //ust// /* atomic_add made on local variable on data that belongs to
590 //ust// * various CPUs : ok because tracing not started (for this cpu). */
591 //ust// local_add(ltt_subbuffer_header_size(), &ltt_buf->commit_count[0]);
592 //ust//
593 //ust// local_set(&ltt_buf->events_lost, 0);
594 //ust// local_set(&ltt_buf->corrupted_subbuffers, 0);
595 //ust//
596 //ust// return 0;
597 //ust// }
598
599 static int ust_buffers_init_buffer(struct ltt_trace_struct *trace,
600 struct ust_channel *ltt_chan, struct ust_buffer *buf,
601 unsigned int n_subbufs)
602 {
603 unsigned int j;
604 int fds[2];
605 int result;
606
607 buf->commit_count =
608 zmalloc(sizeof(buf->commit_count) * n_subbufs);
609 if (!buf->commit_count)
610 return -ENOMEM;
611 kref_get(&trace->kref);
612 kref_get(&trace->ltt_transport_kref);
613 kref_get(&ltt_chan->kref);
614 local_set(&buf->offset, ltt_subbuffer_header_size());
615 atomic_long_set(&buf->consumed, 0);
616 atomic_long_set(&buf->active_readers, 0);
617 for (j = 0; j < n_subbufs; j++)
618 local_set(&buf->commit_count[j], 0);
619 //ust// init_waitqueue_head(&buf->write_wait);
620 //ust// atomic_set(&buf->wakeup_readers, 0);
621 //ust// spin_lock_init(&buf->full_lock);
622
623 ltt_buffer_begin_callback(buf, trace->start_tsc, 0);
624
625 local_add(ltt_subbuffer_header_size(), &buf->commit_count[0]);
626
627 local_set(&buf->events_lost, 0);
628 local_set(&buf->corrupted_subbuffers, 0);
629
630 result = pipe(fds);
631 if(result == -1) {
632 PERROR("pipe");
633 return -1;
634 }
635 buf->data_ready_fd_read = fds[0];
636 buf->data_ready_fd_write = fds[1];
637
638 /* FIXME: do we actually need this? */
639 result = fcntl(fds[0], F_SETFL, O_NONBLOCK);
640 if(result == -1) {
641 PERROR("fcntl");
642 }
643
644 //ust// buf->commit_seq = malloc(sizeof(buf->commit_seq) * n_subbufs);
645 //ust// if(!ltt_buf->commit_seq) {
646 //ust// return -1;
647 //ust// }
648
649 /* FIXME: decrementally destroy on error */
650
651 return 0;
652 }
653
654 /* FIXME: use this function */
655 static void ust_buffers_destroy_buffer(struct ust_channel *ltt_chan, int cpu)
656 {
657 struct ltt_trace_struct *trace = ltt_chan->trace;
658 struct ust_buffer *ltt_buf = ltt_chan->buf[cpu];
659
660 kref_put(&ltt_chan->trace->ltt_transport_kref,
661 ltt_release_transport);
662 ltt_relay_print_buffer_errors(ltt_chan, cpu);
663 //ust// free(ltt_buf->commit_seq);
664 kfree(ltt_buf->commit_count);
665 ltt_buf->commit_count = NULL;
666 kref_put(&ltt_chan->kref, ltt_relay_release_channel);
667 kref_put(&trace->kref, ltt_release_trace);
668 //ust// wake_up_interruptible(&trace->kref_wq);
669 }
670
671 static int ust_buffers_alloc_channel_buf_structs(struct ust_channel *chan)
672 {
673 void *ptr;
674 int result;
675 size_t size;
676 int i;
677
678 size = PAGE_ALIGN(1);
679
680 for(i=0; i<chan->n_cpus; i++) {
681
682 result = chan->buf_struct_shmids[i] = shmget(getpid(), size, IPC_CREAT | IPC_EXCL | 0700);
683 if(result == -1) {
684 PERROR("shmget");
685 goto destroy_previous;
686 }
687
688 /* FIXME: should have matching call to shmdt */
689 ptr = shmat(chan->buf_struct_shmids[i], NULL, 0);
690 if(ptr == (void *) -1) {
691 perror("shmat");
692 goto destroy_shm;
693 }
694
695 /* Already mark the shared memory for destruction. This will occur only
696 * when all users have detached.
697 */
698 result = shmctl(chan->buf_struct_shmids[i], IPC_RMID, NULL);
699 if(result == -1) {
700 perror("shmctl");
701 goto destroy_previous;
702 }
703
704 chan->buf[i] = ptr;
705 }
706
707 return 0;
708
709 /* Jumping inside this loop occurs from within the other loop above with i as
710 * counter, so it unallocates the structures for the cpu = current_i down to
711 * zero. */
712 for(; i>=0; i--) {
713 destroy_shm:
714 result = shmctl(chan->buf_struct_shmids[i], IPC_RMID, NULL);
715 if(result == -1) {
716 perror("shmctl");
717 }
718
719 destroy_previous:
720 continue;
721 }
722
723 return -1;
724 }
725
726 /*
727 * Create channel.
728 */
729 static int ust_buffers_create_channel(const char *trace_name, struct ltt_trace_struct *trace,
730 const char *channel_name, struct ust_channel *ltt_chan,
731 unsigned int subbuf_size, unsigned int n_subbufs, int overwrite)
732 {
733 int result;
734
735 kref_init(&ltt_chan->kref);
736
737 ltt_chan->trace = trace;
738 ltt_chan->buffer_begin = ltt_buffer_begin_callback;
739 ltt_chan->buffer_end = ltt_buffer_end_callback;
740 ltt_chan->overwrite = overwrite;
741 ltt_chan->n_subbufs_order = get_count_order(n_subbufs);
742 ltt_chan->commit_count_mask = (~0UL >> ltt_chan->n_subbufs_order);
743 ltt_chan->n_cpus = get_n_cpus();
744 //ust// ltt_chan->buf = percpu_alloc_mask(sizeof(struct ltt_channel_buf_struct), GFP_KERNEL, cpu_possible_map);
745 ltt_chan->buf = (void *) malloc(ltt_chan->n_cpus * sizeof(void *));
746 if(ltt_chan->buf == NULL) {
747 goto error;
748 }
749 ltt_chan->buf_struct_shmids = (int *) malloc(ltt_chan->n_cpus * sizeof(int));
750 if(ltt_chan->buf_struct_shmids == NULL)
751 goto free_buf;
752
753 result = ust_buffers_alloc_channel_buf_structs(ltt_chan);
754 if(result != 0) {
755 goto free_buf_struct_shmids;
756 }
757
758 result = ust_buffers_channel_open(ltt_chan, subbuf_size, n_subbufs);
759 if (result != 0) {
760 ERR("Cannot open channel for trace %s", trace_name);
761 goto unalloc_buf_structs;
762 }
763
764 return 0;
765
766 unalloc_buf_structs:
767 /* FIXME: put a call here to unalloc the buf structs! */
768
769 free_buf_struct_shmids:
770 free(ltt_chan->buf_struct_shmids);
771
772 free_buf:
773 free(ltt_chan->buf);
774
775 error:
776 return -1;
777 }
778
779 /*
780 * LTTng channel flush function.
781 *
782 * Must be called when no tracing is active in the channel, because of
783 * accesses across CPUs.
784 */
785 static notrace void ltt_relay_buffer_flush(struct ust_buffer *buf)
786 {
787 int result;
788
789 //ust// buf->finalized = 1;
790 ltt_force_switch(buf, FORCE_FLUSH);
791
792 result = write(buf->data_ready_fd_write, "1", 1);
793 if(result == -1) {
794 PERROR("write (in ltt_relay_buffer_flush)");
795 ERR("this should never happen!");
796 }
797 }
798
/*
 * ltt_relay_async_wakeup_chan - asynchronous reader wake-up for a channel
 *
 * Currently a no-op: the per-cpu wake-up loop below is disabled.
 */
static void ltt_relay_async_wakeup_chan(struct ust_channel *ltt_channel)
{
//ust//	unsigned int i;
//ust//	struct rchan *rchan = ltt_channel->trans_channel_data;
//ust//
//ust//	for_each_possible_cpu(i) {
//ust//		struct ltt_channel_buf_struct *ltt_buf =
//ust//			percpu_ptr(ltt_channel->buf, i);
//ust//
//ust//		if (atomic_read(&ltt_buf->wakeup_readers) == 1) {
//ust//			atomic_set(&ltt_buf->wakeup_readers, 0);
//ust//			wake_up_interruptible(&rchan->buf[i]->read_wait);
//ust//		}
//ust//	}
}
814
815 static void ltt_relay_finish_buffer(struct ust_channel *channel, unsigned int cpu)
816 {
817 // int result;
818
819 if (channel->buf[cpu]) {
820 struct ust_buffer *buf = channel->buf[cpu];
821 ltt_relay_buffer_flush(buf);
822 //ust// ltt_relay_wake_writers(ltt_buf);
823 /* closing the pipe tells the consumer the buffer is finished */
824
825 //result = write(ltt_buf->data_ready_fd_write, "D", 1);
826 //if(result == -1) {
827 // PERROR("write (in ltt_relay_finish_buffer)");
828 // ERR("this should never happen!");
829 //}
830 close(buf->data_ready_fd_write);
831 }
832 }
833
834
835 static void ltt_relay_finish_channel(struct ust_channel *channel)
836 {
837 unsigned int i;
838
839 for(i=0; i<channel->n_cpus; i++) {
840 ltt_relay_finish_buffer(channel, i);
841 }
842 }
843
844 static void ltt_relay_remove_channel(struct ust_channel *channel)
845 {
846 ust_buffers_channel_close(channel);
847 kref_put(&channel->kref, ltt_relay_release_channel);
848 }
849
/*
 * Working state shared by the reserve/switch helpers while a slot is being
 * reserved or a sub-buffer switch is being forced.
 */
struct ltt_reserve_switch_offsets {
	/* Write offsets: start and end of the reservation, and the offset
	 * read from the buffer before anything was changed. */
	long begin, end, old;
	/* Flags saying which sub-buffer switches have to be performed. */
	long begin_switch, end_switch_current, end_switch_old;
	/* commit_count is not set by the helpers visible here;
	 * reserve_commit_diff is the gap between the expected and the actual
	 * commit count of the target sub-buffer. */
	long commit_count, reserve_commit_diff;
	/* Padding before the event header, and total size of the slot. */
	size_t before_hdr_pad, size;
};
856
857 /*
858 * Returns :
859 * 0 if ok
860 * !0 if execution must be aborted.
861 */
862 static inline int ltt_relay_try_reserve(
863 struct ust_channel *channel, struct ust_buffer *buf,
864 struct ltt_reserve_switch_offsets *offsets, size_t data_size,
865 u64 *tsc, unsigned int *rflags, int largest_align)
866 {
867 offsets->begin = local_read(&buf->offset);
868 offsets->old = offsets->begin;
869 offsets->begin_switch = 0;
870 offsets->end_switch_current = 0;
871 offsets->end_switch_old = 0;
872
873 *tsc = trace_clock_read64();
874 if (last_tsc_overflow(buf, *tsc))
875 *rflags = LTT_RFLAG_ID_SIZE_TSC;
876
877 if (SUBBUF_OFFSET(offsets->begin, buf->chan) == 0) {
878 offsets->begin_switch = 1; /* For offsets->begin */
879 } else {
880 offsets->size = ust_get_header_size(channel,
881 offsets->begin, data_size,
882 &offsets->before_hdr_pad, *rflags);
883 offsets->size += ltt_align(offsets->begin + offsets->size,
884 largest_align)
885 + data_size;
886 if ((SUBBUF_OFFSET(offsets->begin, buf->chan) + offsets->size)
887 > buf->chan->subbuf_size) {
888 offsets->end_switch_old = 1; /* For offsets->old */
889 offsets->begin_switch = 1; /* For offsets->begin */
890 }
891 }
892 if (offsets->begin_switch) {
893 long subbuf_index;
894
895 if (offsets->end_switch_old)
896 offsets->begin = SUBBUF_ALIGN(offsets->begin,
897 buf->chan);
898 offsets->begin = offsets->begin + ltt_subbuffer_header_size();
899 /* Test new buffer integrity */
900 subbuf_index = SUBBUF_INDEX(offsets->begin, buf->chan);
901 offsets->reserve_commit_diff =
902 (BUFFER_TRUNC(offsets->begin, buf->chan)
903 >> channel->n_subbufs_order)
904 - (local_read(&buf->commit_count[subbuf_index])
905 & channel->commit_count_mask);
906 if (offsets->reserve_commit_diff == 0) {
907 long consumed;
908
909 consumed = atomic_long_read(&buf->consumed);
910
911 /* Next buffer not corrupted. */
912 if (!channel->overwrite &&
913 (SUBBUF_TRUNC(offsets->begin, buf->chan)
914 - SUBBUF_TRUNC(consumed, buf->chan))
915 >= channel->alloc_size) {
916
917 long consumed_idx = SUBBUF_INDEX(consumed, buf->chan);
918 long commit_count = local_read(&buf->commit_count[consumed_idx]);
919 if(((commit_count - buf->chan->subbuf_size) & channel->commit_count_mask) - (BUFFER_TRUNC(consumed, buf->chan) >> channel->n_subbufs_order) != 0) {
920 WARN("Event dropped. Caused by non-committed event.");
921 }
922 else {
923 WARN("Event dropped. Caused by non-consumed buffer.");
924 }
925 /*
926 * We do not overwrite non consumed buffers
927 * and we are full : event is lost.
928 */
929 local_inc(&buf->events_lost);
930 return -1;
931 } else {
932 /*
933 * next buffer not corrupted, we are either in
934 * overwrite mode or the buffer is not full.
935 * It's safe to write in this new subbuffer.
936 */
937 }
938 } else {
939 /*
940 * Next subbuffer corrupted. Force pushing reader even
941 * in normal mode. It's safe to write in this new
942 * subbuffer.
943 */
944 }
945 offsets->size = ust_get_header_size(channel,
946 offsets->begin, data_size,
947 &offsets->before_hdr_pad, *rflags);
948 offsets->size += ltt_align(offsets->begin + offsets->size,
949 largest_align)
950 + data_size;
951 if ((SUBBUF_OFFSET(offsets->begin, buf->chan) + offsets->size)
952 > buf->chan->subbuf_size) {
953 /*
954 * Event too big for subbuffers, report error, don't
955 * complete the sub-buffer switch.
956 */
957 local_inc(&buf->events_lost);
958 return -1;
959 } else {
960 /*
961 * We just made a successful buffer switch and the event
962 * fits in the new subbuffer. Let's write.
963 */
964 }
965 } else {
966 /*
967 * Event fits in the current buffer and we are not on a switch
968 * boundary. It's safe to write.
969 */
970 }
971 offsets->end = offsets->begin + offsets->size;
972
973 if ((SUBBUF_OFFSET(offsets->end, buf->chan)) == 0) {
974 /*
975 * The offset_end will fall at the very beginning of the next
976 * subbuffer.
977 */
978 offsets->end_switch_current = 1; /* For offsets->begin */
979 }
980 return 0;
981 }
982
983 /*
984 * Returns :
985 * 0 if ok
986 * !0 if execution must be aborted.
987 */
988 static inline int ltt_relay_try_switch(
989 enum force_switch_mode mode,
990 struct ust_channel *channel,
991 struct ust_buffer *buf,
992 struct ltt_reserve_switch_offsets *offsets,
993 u64 *tsc)
994 {
995 long subbuf_index;
996
997 offsets->begin = local_read(&buf->offset);
998 offsets->old = offsets->begin;
999 offsets->begin_switch = 0;
1000 offsets->end_switch_old = 0;
1001
1002 *tsc = trace_clock_read64();
1003
1004 if (SUBBUF_OFFSET(offsets->begin, buf->chan) != 0) {
1005 offsets->begin = SUBBUF_ALIGN(offsets->begin, buf->chan);
1006 offsets->end_switch_old = 1;
1007 } else {
1008 /* we do not have to switch : buffer is empty */
1009 return -1;
1010 }
1011 if (mode == FORCE_ACTIVE)
1012 offsets->begin += ltt_subbuffer_header_size();
1013 /*
1014 * Always begin_switch in FORCE_ACTIVE mode.
1015 * Test new buffer integrity
1016 */
1017 subbuf_index = SUBBUF_INDEX(offsets->begin, buf->chan);
1018 offsets->reserve_commit_diff =
1019 (BUFFER_TRUNC(offsets->begin, buf->chan)
1020 >> channel->n_subbufs_order)
1021 - (local_read(&buf->commit_count[subbuf_index])
1022 & channel->commit_count_mask);
1023 if (offsets->reserve_commit_diff == 0) {
1024 /* Next buffer not corrupted. */
1025 if (mode == FORCE_ACTIVE
1026 && !channel->overwrite
1027 && offsets->begin - atomic_long_read(&buf->consumed)
1028 >= channel->alloc_size) {
1029 /*
1030 * We do not overwrite non consumed buffers and we are
1031 * full : ignore switch while tracing is active.
1032 */
1033 return -1;
1034 }
1035 } else {
1036 /*
1037 * Next subbuffer corrupted. Force pushing reader even in normal
1038 * mode
1039 */
1040 }
1041 offsets->end = offsets->begin;
1042 return 0;
1043 }
1044
1045 static inline void ltt_reserve_push_reader(
1046 struct ust_channel *channel,
1047 struct ust_buffer *buf,
1048 struct ltt_reserve_switch_offsets *offsets)
1049 {
1050 long consumed_old, consumed_new;
1051
1052 do {
1053 consumed_old = atomic_long_read(&buf->consumed);
1054 /*
1055 * If buffer is in overwrite mode, push the reader consumed
1056 * count if the write position has reached it and we are not
1057 * at the first iteration (don't push the reader farther than
1058 * the writer). This operation can be done concurrently by many
1059 * writers in the same buffer, the writer being at the farthest
1060 * write position sub-buffer index in the buffer being the one
1061 * which will win this loop.
1062 * If the buffer is not in overwrite mode, pushing the reader
1063 * only happens if a sub-buffer is corrupted.
1064 */
1065 if ((SUBBUF_TRUNC(offsets->end-1, buf->chan)
1066 - SUBBUF_TRUNC(consumed_old, buf->chan))
1067 >= channel->alloc_size)
1068 consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan);
1069 else {
1070 consumed_new = consumed_old;
1071 break;
1072 }
1073 } while (atomic_long_cmpxchg(&buf->consumed, consumed_old,
1074 consumed_new) != consumed_old);
1075
1076 if (consumed_old != consumed_new) {
1077 /*
1078 * Reader pushed : we are the winner of the push, we can
1079 * therefore reequilibrate reserve and commit. Atomic increment
1080 * of the commit count permits other writers to play around
1081 * with this variable before us. We keep track of
1082 * corrupted_subbuffers even in overwrite mode :
1083 * we never want to write over a non completely committed
1084 * sub-buffer : possible causes : the buffer size is too low
1085 * compared to the unordered data input, or there is a writer
1086 * that died between the reserve and the commit.
1087 */
1088 if (offsets->reserve_commit_diff) {
1089 /*
1090 * We have to alter the sub-buffer commit count.
1091 * We do not deliver the previous subbuffer, given it
1092 * was either corrupted or not consumed (overwrite
1093 * mode).
1094 */
1095 local_add(offsets->reserve_commit_diff,
1096 &buf->commit_count[
1097 SUBBUF_INDEX(offsets->begin,
1098 buf->chan)]);
1099 if (!channel->overwrite
1100 || offsets->reserve_commit_diff
1101 != channel->subbuf_size) {
1102 /*
1103 * The reserve commit diff was not subbuf_size :
1104 * it means the subbuffer was partly written to
1105 * and is therefore corrupted. If it is multiple
1106 * of subbuffer size and we are in flight
1107 * recorder mode, we are skipping over a whole
1108 * subbuffer.
1109 */
1110 local_inc(&buf->corrupted_subbuffers);
1111 }
1112 }
1113 }
1114 }
1115
1116
1117 /*
1118 * ltt_reserve_switch_old_subbuf: switch old subbuffer
1119 *
1120 * Concurrency safe because we are the last and only thread to alter this
1121 * sub-buffer. As long as it is not delivered and read, no other thread can
1122 * alter the offset, alter the reserve_count or call the
1123 * client_buffer_end_callback on this sub-buffer.
1124 *
1125 * The only remaining threads could be the ones with pending commits. They will
1126 * have to do the deliver themselves. Not concurrency safe in overwrite mode.
1127 * We detect corrupted subbuffers with commit and reserve counts. We keep a
1128 * corrupted sub-buffers count and push the readers across these sub-buffers.
1129 *
1130 * Not concurrency safe if a writer is stalled in a subbuffer and another writer
1131 * switches in, finding out it's corrupted. The result will be than the old
1132 * (uncommited) subbuffer will be declared corrupted, and that the new subbuffer
1133 * will be declared corrupted too because of the commit count adjustment.
1134 *
1135 * Note : offset_old should never be 0 here.
1136 */
1137 static inline void ltt_reserve_switch_old_subbuf(
1138 struct ust_channel *channel,
1139 struct ust_buffer *buf,
1140 struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
1141 {
1142 long oldidx = SUBBUF_INDEX(offsets->old - 1, channel);
1143
1144 channel->buffer_end(buf, *tsc, offsets->old, oldidx);
1145 /* Must write buffer end before incrementing commit count */
1146 smp_wmb();
1147 offsets->commit_count =
1148 local_add_return(channel->subbuf_size
1149 - (SUBBUF_OFFSET(offsets->old - 1, channel)
1150 + 1),
1151 &buf->commit_count[oldidx]);
1152 if ((BUFFER_TRUNC(offsets->old - 1, channel)
1153 >> channel->n_subbufs_order)
1154 - ((offsets->commit_count - channel->subbuf_size)
1155 & channel->commit_count_mask) == 0)
1156 ltt_deliver(buf, oldidx, offsets->commit_count);
1157 }
1158
1159 /*
1160 * ltt_reserve_switch_new_subbuf: Populate new subbuffer.
1161 *
1162 * This code can be executed unordered : writers may already have written to the
1163 * sub-buffer before this code gets executed, caution. The commit makes sure
1164 * that this code is executed before the deliver of this sub-buffer.
1165 */
1166 static /*inline*/ void ltt_reserve_switch_new_subbuf(
1167 struct ust_channel *channel,
1168 struct ust_buffer *buf,
1169 struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
1170 {
1171 long beginidx = SUBBUF_INDEX(offsets->begin, channel);
1172
1173 channel->buffer_begin(buf, *tsc, beginidx);
1174 /* Must write buffer end before incrementing commit count */
1175 smp_wmb();
1176 offsets->commit_count = local_add_return(ltt_subbuffer_header_size(),
1177 &buf->commit_count[beginidx]);
1178 /* Check if the written buffer has to be delivered */
1179 if ((BUFFER_TRUNC(offsets->begin, channel)
1180 >> channel->n_subbufs_order)
1181 - ((offsets->commit_count - channel->subbuf_size)
1182 & channel->commit_count_mask) == 0)
1183 ltt_deliver(buf, beginidx, offsets->commit_count);
1184 }
1185
1186
1187 /*
1188 * ltt_reserve_end_switch_current: finish switching current subbuffer
1189 *
1190 * Concurrency safe because we are the last and only thread to alter this
1191 * sub-buffer. As long as it is not delivered and read, no other thread can
1192 * alter the offset, alter the reserve_count or call the
1193 * client_buffer_end_callback on this sub-buffer.
1194 *
1195 * The only remaining threads could be the ones with pending commits. They will
1196 * have to do the deliver themselves. Not concurrency safe in overwrite mode.
1197 * We detect corrupted subbuffers with commit and reserve counts. We keep a
1198 * corrupted sub-buffers count and push the readers across these sub-buffers.
1199 *
1200 * Not concurrency safe if a writer is stalled in a subbuffer and another writer
1201 * switches in, finding out it's corrupted. The result will be than the old
1202 * (uncommited) subbuffer will be declared corrupted, and that the new subbuffer
1203 * will be declared corrupted too because of the commit count adjustment.
1204 */
1205 static inline void ltt_reserve_end_switch_current(
1206 struct ust_channel *channel,
1207 struct ust_buffer *buf,
1208 struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
1209 {
1210 long endidx = SUBBUF_INDEX(offsets->end - 1, channel);
1211
1212 channel->buffer_end(buf, *tsc, offsets->end, endidx);
1213 /* Must write buffer begin before incrementing commit count */
1214 smp_wmb();
1215 offsets->commit_count =
1216 local_add_return(channel->subbuf_size
1217 - (SUBBUF_OFFSET(offsets->end - 1, channel)
1218 + 1),
1219 &buf->commit_count[endidx]);
1220 if ((BUFFER_TRUNC(offsets->end - 1, channel)
1221 >> channel->n_subbufs_order)
1222 - ((offsets->commit_count - channel->subbuf_size)
1223 & channel->commit_count_mask) == 0)
1224 ltt_deliver(buf, endidx, offsets->commit_count);
1225 }
1226
1227 /**
1228 * ltt_relay_reserve_slot - Atomic slot reservation in a LTTng buffer.
1229 * @trace: the trace structure to log to.
1230 * @ltt_channel: channel structure
1231 * @transport_data: data structure specific to ltt relay
1232 * @data_size: size of the variable length data to log.
1233 * @slot_size: pointer to total size of the slot (out)
1234 * @buf_offset : pointer to reserved buffer offset (out)
1235 * @tsc: pointer to the tsc at the slot reservation (out)
1236 * @cpu: cpuid
1237 *
1238 * Return : -ENOSPC if not enough space, else returns 0.
1239 * It will take care of sub-buffer switching.
1240 */
1241 static notrace int ltt_relay_reserve_slot(struct ltt_trace_struct *trace,
1242 struct ust_channel *channel, void **transport_data,
1243 size_t data_size, size_t *slot_size, long *buf_offset, u64 *tsc,
1244 unsigned int *rflags, int largest_align, int cpu)
1245 {
1246 struct ust_buffer *buf = *transport_data = channel->buf[cpu];
1247 struct ltt_reserve_switch_offsets offsets;
1248
1249 offsets.reserve_commit_diff = 0;
1250 offsets.size = 0;
1251
1252 /*
1253 * Perform retryable operations.
1254 */
1255 if (ltt_nesting > 4) {
1256 local_inc(&buf->events_lost);
1257 return -EPERM;
1258 }
1259 do {
1260 if (ltt_relay_try_reserve(channel, buf, &offsets, data_size, tsc, rflags,
1261 largest_align))
1262 return -ENOSPC;
1263 } while (local_cmpxchg(&buf->offset, offsets.old,
1264 offsets.end) != offsets.old);
1265
1266 /*
1267 * Atomically update last_tsc. This update races against concurrent
1268 * atomic updates, but the race will always cause supplementary full TSC
1269 * events, never the opposite (missing a full TSC event when it would be
1270 * needed).
1271 */
1272 save_last_tsc(buf, *tsc);
1273
1274 /*
1275 * Push the reader if necessary
1276 */
1277 ltt_reserve_push_reader(channel, buf, &offsets);
1278
1279 /*
1280 * Switch old subbuffer if needed.
1281 */
1282 if (offsets.end_switch_old)
1283 ltt_reserve_switch_old_subbuf(channel, buf, &offsets, tsc);
1284
1285 /*
1286 * Populate new subbuffer.
1287 */
1288 if (offsets.begin_switch)
1289 ltt_reserve_switch_new_subbuf(channel, buf, &offsets, tsc);
1290
1291 if (offsets.end_switch_current)
1292 ltt_reserve_end_switch_current(channel, buf, &offsets, tsc);
1293
1294 *slot_size = offsets.size;
1295 *buf_offset = offsets.begin + offsets.before_hdr_pad;
1296 return 0;
1297 }
1298
1299 /*
1300 * Force a sub-buffer switch for a per-cpu buffer. This operation is
1301 * completely reentrant : can be called while tracing is active with
1302 * absolutely no lock held.
1303 *
1304 * Note, however, that as a local_cmpxchg is used for some atomic
1305 * operations, this function must be called from the CPU which owns the buffer
1306 * for a ACTIVE flush.
1307 */
1308 static notrace void ltt_force_switch(struct ust_buffer *buf,
1309 enum force_switch_mode mode)
1310 {
1311 struct ust_channel *channel = buf->chan;
1312 struct ltt_reserve_switch_offsets offsets;
1313 u64 tsc;
1314
1315 offsets.reserve_commit_diff = 0;
1316 offsets.size = 0;
1317
1318 /*
1319 * Perform retryable operations.
1320 */
1321 do {
1322 if (ltt_relay_try_switch(mode, channel, buf, &offsets, &tsc))
1323 return;
1324 } while (local_cmpxchg(&buf->offset, offsets.old,
1325 offsets.end) != offsets.old);
1326
1327 /*
1328 * Atomically update last_tsc. This update races against concurrent
1329 * atomic updates, but the race will always cause supplementary full TSC
1330 * events, never the opposite (missing a full TSC event when it would be
1331 * needed).
1332 */
1333 save_last_tsc(buf, tsc);
1334
1335 /*
1336 * Push the reader if necessary
1337 */
1338 if (mode == FORCE_ACTIVE)
1339 ltt_reserve_push_reader(channel, buf, &offsets);
1340
1341 /*
1342 * Switch old subbuffer if needed.
1343 */
1344 if (offsets.end_switch_old)
1345 ltt_reserve_switch_old_subbuf(channel, buf, &offsets, &tsc);
1346
1347 /*
1348 * Populate new subbuffer.
1349 */
1350 if (mode == FORCE_ACTIVE)
1351 ltt_reserve_switch_new_subbuf(channel, buf, &offsets, &tsc);
1352 }
1353
1354 static struct ltt_transport ust_relay_transport = {
1355 .name = "ustrelay",
1356 .ops = {
1357 .create_channel = ust_buffers_create_channel,
1358 .finish_channel = ltt_relay_finish_channel,
1359 .remove_channel = ltt_relay_remove_channel,
1360 .wakeup_channel = ltt_relay_async_wakeup_chan,
1361 // .commit_slot = ltt_relay_commit_slot,
1362 .reserve_slot = ltt_relay_reserve_slot,
1363 },
1364 };
1365
1366 /*
1367 * for flight recording. must be called after relay_commit.
1368 * This function decrements de subbuffer's lost_size each time the commit count
1369 * reaches back the reserve offset (module subbuffer size). It is useful for
1370 * crash dump.
1371 */
1372 static /* inline */ void ltt_write_commit_counter(struct ust_buffer *buf,
1373 struct ust_buffer *ltt_buf,
1374 long idx, long buf_offset, long commit_count, size_t data_size)
1375 {
1376 long offset;
1377 long commit_seq_old;
1378
1379 offset = buf_offset + data_size;
1380
1381 /*
1382 * SUBBUF_OFFSET includes commit_count_mask. We can simply
1383 * compare the offsets within the subbuffer without caring about
1384 * buffer full/empty mismatch because offset is never zero here
1385 * (subbuffer header and event headers have non-zero length).
1386 */
1387 if (unlikely(SUBBUF_OFFSET(offset - commit_count, buf->chan)))
1388 return;
1389
1390 commit_seq_old = local_read(&ltt_buf->commit_seq[idx]);
1391 while (commit_seq_old < commit_count)
1392 commit_seq_old = local_cmpxchg(&ltt_buf->commit_seq[idx],
1393 commit_seq_old, commit_count);
1394 }
1395
1396 /*
1397 * Atomic unordered slot commit. Increments the commit count in the
1398 * specified sub-buffer, and delivers it if necessary.
1399 *
1400 * Parameters:
1401 *
1402 * @ltt_channel : channel structure
1403 * @transport_data: transport-specific data
1404 * @buf_offset : offset following the event header.
1405 * @data_size : size of the event data.
1406 * @slot_size : size of the reserved slot.
1407 */
1408 /* FIXME: make this function static inline in the .h! */
1409 /*static*/ /* inline */ notrace void ltt_commit_slot(
1410 struct ust_channel *channel,
1411 void **transport_data, long buf_offset,
1412 size_t data_size, size_t slot_size)
1413 {
1414 struct ust_buffer *buf = *transport_data;
1415 long offset_end = buf_offset;
1416 long endidx = SUBBUF_INDEX(offset_end - 1, channel);
1417 long commit_count;
1418
1419 /* Must write slot data before incrementing commit count */
1420 smp_wmb();
1421 commit_count = local_add_return(slot_size,
1422 &buf->commit_count[endidx]);
1423 /* Check if all commits have been done */
1424 if ((BUFFER_TRUNC(offset_end - 1, channel)
1425 >> channel->n_subbufs_order)
1426 - ((commit_count - channel->subbuf_size)
1427 & channel->commit_count_mask) == 0)
1428 ltt_deliver(buf, endidx, commit_count);
1429 /*
1430 * Update lost_size for each commit. It's needed only for extracting
1431 * ltt buffers from vmcore, after crash.
1432 */
1433 ltt_write_commit_counter(buf, buf, endidx,
1434 buf_offset, commit_count, data_size);
1435 }
1436
1437
1438 static char initialized = 0;
1439
1440 void __attribute__((constructor)) init_ustrelay_transport(void)
1441 {
1442 if(!initialized) {
1443 ltt_transport_register(&ust_relay_transport);
1444 initialized = 1;
1445 }
1446 }
1447
1448 static void __attribute__((destructor)) ltt_relay_exit(void)
1449 {
1450 ltt_transport_unregister(&ust_relay_transport);
1451 }
This page took 0.077802 seconds and 5 git commands to generate.