Commit: quick and dirty fix for message maximum length bug (libust/buffers.c)
/*
 * buffers.c
 * LTTng userspace tracer buffering system
 *
 * Copyright (C) 2009 - Pierre-Marc Fournier (pierre-marc dot fournier at polymtl dot ca)
 * Copyright (C) 2008 - Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */

#include <unistd.h>
#include <sys/mman.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <fcntl.h>
#include <ust/kernelcompat.h>
#include <kcompat/kref.h>
#include "buffers.h"
#include "channels.h"
#include "tracer.h"
#include "tracercore.h"
#include "usterr.h"

static DEFINE_MUTEX(ust_buffers_channels_mutex);
static LIST_HEAD(ust_buffers_channels);

static int get_n_cpus(void)
{
	int result;
	static int n_cpus = 0;

	if(n_cpus) {
		return n_cpus;
	}

	/* On Linux, when some processors are offline
	 * _SC_NPROCESSORS_CONF counts the offline
	 * processors, whereas _SC_NPROCESSORS_ONLN
	 * does not. If we used _SC_NPROCESSORS_ONLN,
	 * getcpu() could return a value greater than
	 * this sysconf, in which case the arrays
	 * indexed by processor would overflow.
	 */
	result = sysconf(_SC_NPROCESSORS_CONF);
	if(result == -1) {
		return -1;
	}

	n_cpus = result;

	return result;
}

static int ust_buffers_init_buffer(struct ltt_trace_struct *trace,
		struct ust_channel *ltt_chan,
		struct ust_buffer *buf,
		unsigned int n_subbufs);

static int ust_buffers_alloc_buf(struct ust_buffer *buf, size_t *size)
{
	void *ptr;
	int result;

	*size = PAGE_ALIGN(*size);

	result = buf->shmid = shmget(getpid(), *size, IPC_CREAT | IPC_EXCL | 0700);
	if(result == -1 && errno == EINVAL) {
		ERR("shmget() returned EINVAL; maybe /proc/sys/kernel/shmmax should be increased.");
		return -1;
	}
	else if(result == -1) {
		PERROR("shmget");
		return -1;
	}

	/* FIXME: should have matching call to shmdt */
	ptr = shmat(buf->shmid, NULL, 0);
	if(ptr == (void *) -1) {
		perror("shmat");
		goto destroy_shmem;
	}

	/* Already mark the shared memory for destruction. This will occur only
	 * when all users have detached.
	 */
	result = shmctl(buf->shmid, IPC_RMID, NULL);
	if(result == -1) {
		perror("shmctl");
		return -1;
	}

	buf->buf_data = ptr;
	buf->buf_size = *size;

	return 0;

	destroy_shmem:
	result = shmctl(buf->shmid, IPC_RMID, NULL);
	if(result == -1) {
		perror("shmctl");
	}

	return -1;
}
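
/*
 * Note on the allocation scheme above: the segment is keyed on getpid() so
 * that concurrent processes do not collide, and it is marked for removal
 * (IPC_RMID) right after attaching. The kernel only destroys the segment
 * once every process that attached it has detached, so the early removal
 * simply guarantees cleanup even if this process dies unexpectedly.
 */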

int ust_buffers_create_buf(struct ust_channel *channel, int cpu)
{
	int result;
	struct ust_buffer *buf = channel->buf[cpu];

	buf->cpu = cpu;
	result = ust_buffers_alloc_buf(buf, &channel->alloc_size);
	if(result)
		return -1;

	buf->chan = channel;
	kref_get(&channel->kref);
	return 0;
}

static void ust_buffers_destroy_channel(struct kref *kref)
{
	struct ust_channel *chan = container_of(kref, struct ust_channel, kref);
	free(chan);
}

static void ust_buffers_destroy_buf(struct ust_buffer *buf)
{
	struct ust_channel *chan = buf->chan;
	int result;

	result = munmap(buf->buf_data, buf->buf_size);
	if(result == -1) {
		PERROR("munmap");
	}

//ust//	chan->buf[buf->cpu] = NULL;
	free(buf);
	kref_put(&chan->kref, ust_buffers_destroy_channel);
}

/* called from kref_put */
static void ust_buffers_remove_buf(struct kref *kref)
{
	struct ust_buffer *buf = container_of(kref, struct ust_buffer, kref);
	ust_buffers_destroy_buf(buf);
}

int ust_buffers_open_buf(struct ust_channel *chan, int cpu)
{
	int result;

	result = ust_buffers_create_buf(chan, cpu);
	if (result == -1)
		return -1;

	kref_init(&chan->buf[cpu]->kref);

	result = ust_buffers_init_buffer(chan->trace, chan, chan->buf[cpu], chan->subbuf_cnt);
	if(result == -1)
		return -1;

	return 0;

	/* FIXME: decrementally destroy on error? */
}

/**
 * ust_buffers_close_buf - close a channel buffer
 * @buf: buffer
 */
static void ust_buffers_close_buf(struct ust_buffer *buf)
{
	kref_put(&buf->kref, ust_buffers_remove_buf);
}

int ust_buffers_channel_open(struct ust_channel *chan, size_t subbuf_size, size_t subbuf_cnt)
{
	int i;
	int result;

	if(subbuf_size == 0 || subbuf_cnt == 0)
		return -1;

	chan->version = UST_CHANNEL_VERSION;
	chan->subbuf_cnt = subbuf_cnt;
	chan->subbuf_size = subbuf_size;
	chan->subbuf_size_order = get_count_order(subbuf_size);
	chan->alloc_size = FIX_SIZE(subbuf_size * subbuf_cnt);

	kref_init(&chan->kref);

	mutex_lock(&ust_buffers_channels_mutex);
	for(i=0; i<chan->n_cpus; i++) {
		result = ust_buffers_open_buf(chan, i);
		if (result == -1)
			goto error;
	}
	list_add(&chan->list, &ust_buffers_channels);
	mutex_unlock(&ust_buffers_channels_mutex);

	return 0;

	/* Jump directly inside the loop to close the buffers that were already
	 * opened. */
	for(; i>=0; i--) {
		ust_buffers_close_buf(chan->buf[i]);
error:
		do {} while(0);
	}

	kref_put(&chan->kref, ust_buffers_destroy_channel);
	mutex_unlock(&ust_buffers_channels_mutex);
	return -1;
}
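
/*
 * Note: ust_buffers_channel_open() is invoked from ust_buffers_create_channel()
 * below (the transport's create_channel operation). It opens one buffer per
 * possible CPU, as reported by get_n_cpus(), and unwinds the already-opened
 * buffers by jumping into the cleanup loop above if any of them fails.
 */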

void ust_buffers_channel_close(struct ust_channel *chan)
{
	int i;
	if(!chan)
		return;

	mutex_lock(&ust_buffers_channels_mutex);
	for(i=0; i<chan->n_cpus; i++) {
		/* FIXME: if we make it here, then all buffers were necessarily allocated. Moreover, we don't
		 * initialize to NULL so we cannot use this check. Should we? */
//ust//		if (chan->buf[i])
			ust_buffers_close_buf(chan->buf[i]);
	}

	list_del(&chan->list);
	kref_put(&chan->kref, ust_buffers_destroy_channel);
	mutex_unlock(&ust_buffers_channels_mutex);
}

/* _ust_buffers_write()
 *
 * @buf: destination buffer
 * @offset: offset in destination
 * @src: source buffer
 * @len: length of source
 * @cpy: already copied
 */

void _ust_buffers_write(struct ust_buffer *buf, size_t offset,
	const void *src, size_t len, ssize_t cpy)
{
	do {
		len -= cpy;
		src += cpy;
		offset += cpy;

		WARN_ON(offset >= buf->buf_size);

		cpy = min_t(size_t, len, buf->buf_size - offset);
		ust_buffers_do_copy(buf->buf_data + offset, src, cpy);
	} while (unlikely(len != cpy));
}
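
/*
 * Note: _ust_buffers_write() is the out-of-line continuation of a copy that
 * could not be completed in a single contiguous chunk: on entry, @cpy bytes
 * of @src have already been written at @offset, and the loop keeps copying
 * the remainder in chunks bounded by buf_size until everything is written.
 */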

void *ltt_buffers_offset_address(struct ust_buffer *buf, size_t offset)
{
	return ((char *)buf->buf_data)+offset;
}

/*
 * -------
 */

/*
 * Last TSC comparison functions. Check if the current TSC overflows
 * LTT_TSC_BITS bits from the last TSC read. Reads and writes last_tsc
 * atomically.
 */

/* FIXME: does this test work properly? */
#if (BITS_PER_LONG == 32)
static inline void save_last_tsc(struct ust_buffer *ltt_buf,
		u64 tsc)
{
	ltt_buf->last_tsc = (unsigned long)(tsc >> LTT_TSC_BITS);
}

static inline int last_tsc_overflow(struct ust_buffer *ltt_buf,
		u64 tsc)
{
	unsigned long tsc_shifted = (unsigned long)(tsc >> LTT_TSC_BITS);

	if (unlikely((tsc_shifted - ltt_buf->last_tsc)))
		return 1;
	else
		return 0;
}
#else
static inline void save_last_tsc(struct ust_buffer *ltt_buf,
		u64 tsc)
{
	ltt_buf->last_tsc = (unsigned long)tsc;
}

static inline int last_tsc_overflow(struct ust_buffer *ltt_buf,
		u64 tsc)
{
	if (unlikely((tsc - ltt_buf->last_tsc) >> LTT_TSC_BITS))
		return 1;
	else
		return 0;
}
#endif
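
/*
 * Rationale for last_tsc_overflow(): compact event headers only carry the
 * low-order LTT_TSC_BITS bits of the timestamp. last_tsc caches information
 * about the previously recorded TSC; when the current TSC differs by more
 * than what the truncated field can express, the reservation code sets
 * LTT_RFLAG_ID_SIZE_TSC so that a header carrying the full 64-bit TSC is
 * emitted (see ltt_relay_try_reserve() below).
 */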

/*
 * A switch is done during tracing or as a final flush after tracing (so it
 * won't write in the new sub-buffer).
 */
enum force_switch_mode { FORCE_ACTIVE, FORCE_FLUSH };

static void ust_buffers_destroy_buffer(struct ust_channel *ltt_chan, int cpu);

static void ltt_force_switch(struct ust_buffer *buf,
		enum force_switch_mode mode);

/*
 * Trace callbacks
 */
static void ltt_buffer_begin_callback(struct ust_buffer *buf,
			u64 tsc, unsigned int subbuf_idx)
{
	struct ust_channel *channel = buf->chan;
	struct ltt_subbuffer_header *header =
		(struct ltt_subbuffer_header *)
			ltt_buffers_offset_address(buf,
				subbuf_idx * buf->chan->subbuf_size);

	header->cycle_count_begin = tsc;
	header->lost_size = 0xFFFFFFFF; /* for debugging */
	header->buf_size = buf->chan->subbuf_size;
	ltt_write_trace_header(channel->trace, header);
}

/*
 * offset is assumed to never be 0 here : never deliver a completely empty
 * subbuffer. The lost size is between 0 and subbuf_size-1.
 */
static notrace void ltt_buffer_end_callback(struct ust_buffer *buf,
		u64 tsc, unsigned int offset, unsigned int subbuf_idx)
{
	struct ltt_subbuffer_header *header =
		(struct ltt_subbuffer_header *)
			ltt_buffers_offset_address(buf,
				subbuf_idx * buf->chan->subbuf_size);

	header->lost_size = SUBBUF_OFFSET((buf->chan->subbuf_size - offset),
				buf->chan);
	header->cycle_count_end = tsc;
	header->events_lost = local_read(&buf->events_lost);
	header->subbuf_corrupt = local_read(&buf->corrupted_subbuffers);
}

void (*wake_consumer)(void *, int) = NULL;

void relay_set_wake_consumer(void (*wake)(void *, int))
{
	wake_consumer = wake;
}

void relay_wake_consumer(void *arg, int finished)
{
	if(wake_consumer)
		wake_consumer(arg, finished);
}

static notrace void ltt_deliver(struct ust_buffer *buf, unsigned int subbuf_idx,
		long commit_count)
{
	int result;

//ust// #ifdef CONFIG_LTT_VMCORE
	local_set(&buf->commit_seq[subbuf_idx], commit_count);
//ust// #endif

	/* wakeup consumer */
	result = write(buf->data_ready_fd_write, "1", 1);
	if(result == -1) {
		PERROR("write (in ltt_deliver)");
		ERR("this should never happen!");
	}
//ust//	atomic_set(&ltt_buf->wakeup_readers, 1);
}

/*
 * This function should not be called from NMI interrupt context
 */
static notrace void ltt_buf_unfull(struct ust_buffer *buf,
		unsigned int subbuf_idx,
		long offset)
{
//ust//	struct ltt_channel_struct *ltt_channel =
//ust//		(struct ltt_channel_struct *)buf->chan->private_data;
//ust//	struct ltt_channel_buf_struct *ltt_buf = ltt_channel->buf;
//ust//
//ust//	ltt_relay_wake_writers(ltt_buf);
}

int ust_buffers_do_get_subbuf(struct ust_buffer *buf, long *pconsumed_old)
{
	struct ust_channel *channel = buf->chan;
	long consumed_old, consumed_idx, commit_count, write_offset;
	consumed_old = atomic_long_read(&buf->consumed);
	consumed_idx = SUBBUF_INDEX(consumed_old, buf->chan);
	commit_count = local_read(&buf->commit_count[consumed_idx]);
	/*
	 * Make sure we read the commit count before reading the buffer
	 * data and the write offset. Correct consumed offset ordering
	 * wrt commit count is ensured by the use of cmpxchg to update
	 * the consumed offset.
	 */
	smp_rmb();
	write_offset = local_read(&buf->offset);
	/*
	 * Check that the subbuffer we are trying to consume has been
	 * already fully committed.
	 */
	if (((commit_count - buf->chan->subbuf_size)
	     & channel->commit_count_mask)
	    - (BUFFER_TRUNC(consumed_old, buf->chan)
	       >> channel->n_subbufs_order)
	    != 0) {
		return -EAGAIN;
	}
	/*
	 * Check that we are not about to read the same subbuffer in
	 * which the writer head is.
	 */
	if ((SUBBUF_TRUNC(write_offset, buf->chan)
	   - SUBBUF_TRUNC(consumed_old, buf->chan))
	   == 0) {
		return -EAGAIN;
	}

	*pconsumed_old = consumed_old;
	return 0;
}
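
/*
 * About the "fully committed" test above: for the sub-buffer at
 * consumed_old, a complete set of commits brings its commit counter to
 * BUFFER_TRUNC(consumed_old) >> n_subbufs_order plus one subbuf_size for
 * the current pass. Both sides of the comparison are taken modulo the
 * commit counter period (commit_count_mask), so the difference is zero
 * exactly when every writer that reserved space in that sub-buffer has
 * committed it.
 */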

int ust_buffers_do_put_subbuf(struct ust_buffer *buf, u32 uconsumed_old)
{
	long consumed_new, consumed_old;

	consumed_old = atomic_long_read(&buf->consumed);
	consumed_old = consumed_old & (~0xFFFFFFFFL);
	consumed_old = consumed_old | uconsumed_old;
	consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan);

//ust//	spin_lock(&ltt_buf->full_lock);
	if (atomic_long_cmpxchg(&buf->consumed, consumed_old,
				consumed_new)
	    != consumed_old) {
		/* We have been pushed by the writer : the last
		 * buffer read _is_ corrupted! It can also
		 * happen if this is a buffer we never got. */
//ust//		spin_unlock(&ltt_buf->full_lock);
		return -EIO;
	} else {
		/* tell the client that buffer is now unfull */
		int index;
		long data;
		index = SUBBUF_INDEX(consumed_old, buf->chan);
		data = BUFFER_OFFSET(consumed_old, buf->chan);
		ltt_buf_unfull(buf, index, data);
//ust//		spin_unlock(&ltt_buf->full_lock);
	}
	return 0;
}
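
/*
 * Note: the consumer only hands back the low 32 bits of the consumed
 * counter (uconsumed_old); ust_buffers_do_put_subbuf() re-assembles the
 * full value by keeping the current high-order bits, then advances it by
 * one sub-buffer with cmpxchg. A failed cmpxchg means a writer pushed the
 * reader in the meantime, so the data that was just read must be treated
 * as corrupted (-EIO).
 */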

static void ltt_relay_print_subbuffer_errors(
		struct ust_channel *channel,
		long cons_off, int cpu)
{
	struct ust_buffer *ltt_buf = channel->buf[cpu];
	long cons_idx, commit_count, write_offset;

	cons_idx = SUBBUF_INDEX(cons_off, channel);
	commit_count = local_read(&ltt_buf->commit_count[cons_idx]);
	/*
	 * No need to order commit_count and write_offset reads because we
	 * execute after trace is stopped when there are no readers left.
	 */
	write_offset = local_read(&ltt_buf->offset);
	WARN("LTT : unread channel %s offset is %ld "
		"and cons_off : %ld\n",
		channel->channel_name, write_offset, cons_off);
	/* Check each sub-buffer for non filled commit count */
	if (((commit_count - channel->subbuf_size) & channel->commit_count_mask)
	    - (BUFFER_TRUNC(cons_off, channel) >> channel->n_subbufs_order) != 0) {
		ERR("LTT : %s : subbuffer %lu has non filled "
			"commit count %lu.\n",
			channel->channel_name, cons_idx, commit_count);
	}
	ERR("LTT : %s : commit count : %lu, subbuf size %zd\n",
			channel->channel_name, commit_count,
			channel->subbuf_size);
}

static void ltt_relay_print_errors(struct ltt_trace_struct *trace,
		struct ust_channel *channel, int cpu)
{
	struct ust_buffer *ltt_buf;
	long cons_off;

	/*
	 * Can be called in the error path of allocation when
	 * trans_channel_data is not yet set.
	 */
	if (!channel)
		return;

	ltt_buf = channel->buf[cpu];
	for (cons_off = atomic_long_read(&ltt_buf->consumed);
			(SUBBUF_TRUNC(local_read(&ltt_buf->offset),
				      channel)
			 - cons_off) > 0;
			cons_off = SUBBUF_ALIGN(cons_off, channel))
		ltt_relay_print_subbuffer_errors(channel, cons_off, cpu);
}

static void ltt_relay_print_buffer_errors(struct ust_channel *channel, int cpu)
{
	struct ltt_trace_struct *trace = channel->trace;
	struct ust_buffer *ltt_buf = channel->buf[cpu];

	if (local_read(&ltt_buf->events_lost))
		ERR("channel %s: %ld events lost",
			channel->channel_name,
			local_read(&ltt_buf->events_lost));
	if (local_read(&ltt_buf->corrupted_subbuffers))
		ERR("channel %s : %ld corrupted subbuffers",
			channel->channel_name,
			local_read(&ltt_buf->corrupted_subbuffers));

	ltt_relay_print_errors(trace, channel, cpu);
}

static void ltt_relay_release_channel(struct kref *kref)
{
	struct ust_channel *ltt_chan = container_of(kref,
			struct ust_channel, kref);
	free(ltt_chan->buf);
}

/*
 * Create ltt buffer.
 */
//ust// static int ltt_relay_create_buffer(struct ltt_trace_struct *trace,
//ust//		struct ltt_channel_struct *ltt_chan, struct rchan_buf *buf,
//ust//		unsigned int cpu, unsigned int n_subbufs)
//ust// {
//ust//	struct ltt_channel_buf_struct *ltt_buf =
//ust//		percpu_ptr(ltt_chan->buf, cpu);
//ust//	unsigned int j;
//ust//
//ust//	ltt_buf->commit_count =
//ust//		kzalloc_node(sizeof(ltt_buf->commit_count) * n_subbufs,
//ust//			GFP_KERNEL, cpu_to_node(cpu));
//ust//	if (!ltt_buf->commit_count)
//ust//		return -ENOMEM;
//ust//	kref_get(&trace->kref);
//ust//	kref_get(&trace->ltt_transport_kref);
//ust//	kref_get(&ltt_chan->kref);
//ust//	local_set(&ltt_buf->offset, ltt_subbuffer_header_size());
//ust//	atomic_long_set(&ltt_buf->consumed, 0);
//ust//	atomic_long_set(&ltt_buf->active_readers, 0);
//ust//	for (j = 0; j < n_subbufs; j++)
//ust//		local_set(&ltt_buf->commit_count[j], 0);
//ust//	init_waitqueue_head(&ltt_buf->write_wait);
//ust//	atomic_set(&ltt_buf->wakeup_readers, 0);
//ust//	spin_lock_init(&ltt_buf->full_lock);
//ust//
//ust//	ltt_buffer_begin_callback(buf, trace->start_tsc, 0);
//ust//	/* atomic_add made on local variable on data that belongs to
//ust//	 * various CPUs : ok because tracing not started (for this cpu). */
//ust//	local_add(ltt_subbuffer_header_size(), &ltt_buf->commit_count[0]);
//ust//
//ust//	local_set(&ltt_buf->events_lost, 0);
//ust//	local_set(&ltt_buf->corrupted_subbuffers, 0);
//ust//
//ust//	return 0;
//ust// }

static int ust_buffers_init_buffer(struct ltt_trace_struct *trace,
		struct ust_channel *ltt_chan, struct ust_buffer *buf,
		unsigned int n_subbufs)
{
	unsigned int j;
	int fds[2];
	int result;

	buf->commit_count =
		zmalloc(sizeof(buf->commit_count) * n_subbufs);
	if (!buf->commit_count)
		return -ENOMEM;
	kref_get(&trace->kref);
	kref_get(&trace->ltt_transport_kref);
	kref_get(&ltt_chan->kref);
	local_set(&buf->offset, ltt_subbuffer_header_size());
	atomic_long_set(&buf->consumed, 0);
	atomic_long_set(&buf->active_readers, 0);
	for (j = 0; j < n_subbufs; j++)
		local_set(&buf->commit_count[j], 0);
//ust//	init_waitqueue_head(&buf->write_wait);
//ust//	atomic_set(&buf->wakeup_readers, 0);
//ust//	spin_lock_init(&buf->full_lock);

	ltt_buffer_begin_callback(buf, trace->start_tsc, 0);

	local_add(ltt_subbuffer_header_size(), &buf->commit_count[0]);

	local_set(&buf->events_lost, 0);
	local_set(&buf->corrupted_subbuffers, 0);

	result = pipe(fds);
	if(result == -1) {
		PERROR("pipe");
		return -1;
	}
	buf->data_ready_fd_read = fds[0];
	buf->data_ready_fd_write = fds[1];

	/* FIXME: do we actually need this? */
	result = fcntl(fds[0], F_SETFL, O_NONBLOCK);
	if(result == -1) {
		PERROR("fcntl");
	}

//ust//	buf->commit_seq = malloc(sizeof(buf->commit_seq) * n_subbufs);
//ust//	if(!ltt_buf->commit_seq) {
//ust//		return -1;
//ust//	}

	/* FIXME: decrementally destroy on error */

	return 0;
}
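
/*
 * Note: each buffer owns an unnamed pipe. ltt_deliver() writes a byte to
 * data_ready_fd_write whenever a sub-buffer becomes consumable, and the
 * read end (data_ready_fd_read) is meant to be polled by the consumer side;
 * closing the write end in ltt_relay_finish_buffer() below is what signals
 * end-of-trace to that reader.
 */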

/* FIXME: use this function */
static void ust_buffers_destroy_buffer(struct ust_channel *ltt_chan, int cpu)
{
	struct ltt_trace_struct *trace = ltt_chan->trace;
	struct ust_buffer *ltt_buf = ltt_chan->buf[cpu];

	kref_put(&ltt_chan->trace->ltt_transport_kref,
		ltt_release_transport);
	ltt_relay_print_buffer_errors(ltt_chan, cpu);
//ust//	free(ltt_buf->commit_seq);
	kfree(ltt_buf->commit_count);
	ltt_buf->commit_count = NULL;
	kref_put(&ltt_chan->kref, ltt_relay_release_channel);
	kref_put(&trace->kref, ltt_release_trace);
//ust//	wake_up_interruptible(&trace->kref_wq);
}

static int ust_buffers_alloc_channel_buf_structs(struct ust_channel *chan)
{
	void *ptr;
	int result;
	size_t size;
	int i;

	size = PAGE_ALIGN(1);

	for(i=0; i<chan->n_cpus; i++) {

		result = chan->buf_struct_shmids[i] = shmget(getpid(), size, IPC_CREAT | IPC_EXCL | 0700);
		if(result == -1) {
			PERROR("shmget");
			goto destroy_previous;
		}

		/* FIXME: should have matching call to shmdt */
		ptr = shmat(chan->buf_struct_shmids[i], NULL, 0);
		if(ptr == (void *) -1) {
			perror("shmat");
			goto destroy_shm;
		}

		/* Already mark the shared memory for destruction. This will occur only
		 * when all users have detached.
		 */
		result = shmctl(chan->buf_struct_shmids[i], IPC_RMID, NULL);
		if(result == -1) {
			perror("shmctl");
			goto destroy_previous;
		}

		chan->buf[i] = ptr;
	}

	return 0;

	/* Jumping inside this loop occurs from within the other loop above with i as
	 * counter, so it unallocates the structures for the cpu = current_i down to
	 * zero. */
	for(; i>=0; i--) {
		destroy_shm:
		result = shmctl(chan->buf_struct_shmids[i], IPC_RMID, NULL);
		if(result == -1) {
			perror("shmctl");
		}

		destroy_previous:
		continue;
	}

	return -1;
}
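
/*
 * Note: the struct ust_buffer instances themselves (one page per CPU here)
 * are placed in System V shared memory rather than on the heap, presumably
 * so that the consumer process can attach the same segments and read the
 * buffer bookkeeping (offsets, commit counts) directly.
 */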

/*
 * Create channel.
 */
static int ust_buffers_create_channel(const char *trace_name, struct ltt_trace_struct *trace,
	const char *channel_name, struct ust_channel *ltt_chan,
	unsigned int subbuf_size, unsigned int n_subbufs, int overwrite)
{
	int result;

	kref_init(&ltt_chan->kref);

	ltt_chan->trace = trace;
	ltt_chan->buffer_begin = ltt_buffer_begin_callback;
	ltt_chan->buffer_end = ltt_buffer_end_callback;
	ltt_chan->overwrite = overwrite;
	ltt_chan->n_subbufs_order = get_count_order(n_subbufs);
	ltt_chan->commit_count_mask = (~0UL >> ltt_chan->n_subbufs_order);
	ltt_chan->n_cpus = get_n_cpus();
//ust//	ltt_chan->buf = percpu_alloc_mask(sizeof(struct ltt_channel_buf_struct), GFP_KERNEL, cpu_possible_map);
	ltt_chan->buf = (void *) malloc(ltt_chan->n_cpus * sizeof(void *));
	if(ltt_chan->buf == NULL) {
		goto error;
	}
	ltt_chan->buf_struct_shmids = (int *) malloc(ltt_chan->n_cpus * sizeof(int));
	if(ltt_chan->buf_struct_shmids == NULL)
		goto free_buf;

	result = ust_buffers_alloc_channel_buf_structs(ltt_chan);
	if(result != 0) {
		goto free_buf_struct_shmids;
	}

	result = ust_buffers_channel_open(ltt_chan, subbuf_size, n_subbufs);
	if (result != 0) {
		ERR("Cannot open channel for trace %s", trace_name);
		goto unalloc_buf_structs;
	}

	return 0;

unalloc_buf_structs:
	/* FIXME: put a call here to unalloc the buf structs! */

free_buf_struct_shmids:
	free(ltt_chan->buf_struct_shmids);

free_buf:
	free(ltt_chan->buf);

error:
	return -1;
}

/*
 * LTTng channel flush function.
 *
 * Must be called when no tracing is active in the channel, because of
 * accesses across CPUs.
 */
static notrace void ltt_relay_buffer_flush(struct ust_buffer *buf)
{
	int result;

//ust//	buf->finalized = 1;
	ltt_force_switch(buf, FORCE_FLUSH);

	result = write(buf->data_ready_fd_write, "1", 1);
	if(result == -1) {
		PERROR("write (in ltt_relay_buffer_flush)");
		ERR("this should never happen!");
	}
}

static void ltt_relay_async_wakeup_chan(struct ust_channel *ltt_channel)
{
//ust//	unsigned int i;
//ust//	struct rchan *rchan = ltt_channel->trans_channel_data;
//ust//
//ust//	for_each_possible_cpu(i) {
//ust//		struct ltt_channel_buf_struct *ltt_buf =
//ust//			percpu_ptr(ltt_channel->buf, i);
//ust//
//ust//		if (atomic_read(&ltt_buf->wakeup_readers) == 1) {
//ust//			atomic_set(&ltt_buf->wakeup_readers, 0);
//ust//			wake_up_interruptible(&rchan->buf[i]->read_wait);
//ust//		}
//ust//	}
}

static void ltt_relay_finish_buffer(struct ust_channel *channel, unsigned int cpu)
{
//	int result;

	if (channel->buf[cpu]) {
		struct ust_buffer *buf = channel->buf[cpu];
		ltt_relay_buffer_flush(buf);
//ust//		ltt_relay_wake_writers(ltt_buf);
		/* closing the pipe tells the consumer the buffer is finished */

		//result = write(ltt_buf->data_ready_fd_write, "D", 1);
		//if(result == -1) {
		//	PERROR("write (in ltt_relay_finish_buffer)");
		//	ERR("this should never happen!");
		//}
		close(buf->data_ready_fd_write);
	}
}


static void ltt_relay_finish_channel(struct ust_channel *channel)
{
	unsigned int i;

	for(i=0; i<channel->n_cpus; i++) {
		ltt_relay_finish_buffer(channel, i);
	}
}

static void ltt_relay_remove_channel(struct ust_channel *channel)
{
	ust_buffers_channel_close(channel);
	kref_put(&channel->kref, ltt_relay_release_channel);
}

struct ltt_reserve_switch_offsets {
	long begin, end, old;
	long begin_switch, end_switch_current, end_switch_old;
	long commit_count, reserve_commit_diff;
	size_t before_hdr_pad, size;
};

/*
 * Returns :
 * 0 if ok
 * !0 if execution must be aborted.
 */
static inline int ltt_relay_try_reserve(
		struct ust_channel *channel, struct ust_buffer *buf,
		struct ltt_reserve_switch_offsets *offsets, size_t data_size,
		u64 *tsc, unsigned int *rflags, int largest_align)
{
	offsets->begin = local_read(&buf->offset);
	offsets->old = offsets->begin;
	offsets->begin_switch = 0;
	offsets->end_switch_current = 0;
	offsets->end_switch_old = 0;

	*tsc = trace_clock_read64();
	if (last_tsc_overflow(buf, *tsc))
		*rflags = LTT_RFLAG_ID_SIZE_TSC;

	if (SUBBUF_OFFSET(offsets->begin, buf->chan) == 0) {
		offsets->begin_switch = 1;		/* For offsets->begin */
	} else {
		offsets->size = ust_get_header_size(channel,
					offsets->begin, data_size,
					&offsets->before_hdr_pad, *rflags);
		offsets->size += ltt_align(offsets->begin + offsets->size,
					   largest_align)
				 + data_size;
		if ((SUBBUF_OFFSET(offsets->begin, buf->chan) + offsets->size)
				> buf->chan->subbuf_size) {
			offsets->end_switch_old = 1;	/* For offsets->old */
			offsets->begin_switch = 1;	/* For offsets->begin */
		}
	}
	if (offsets->begin_switch) {
		long subbuf_index;

		if (offsets->end_switch_old)
			offsets->begin = SUBBUF_ALIGN(offsets->begin,
						      buf->chan);
		offsets->begin = offsets->begin + ltt_subbuffer_header_size();
		/* Test new buffer integrity */
		subbuf_index = SUBBUF_INDEX(offsets->begin, buf->chan);
		offsets->reserve_commit_diff =
			(BUFFER_TRUNC(offsets->begin, buf->chan)
			 >> channel->n_subbufs_order)
			- (local_read(&buf->commit_count[subbuf_index])
				& channel->commit_count_mask);
		if (offsets->reserve_commit_diff == 0) {
			long consumed;

			consumed = atomic_long_read(&buf->consumed);

			/* Next buffer not corrupted. */
			if (!channel->overwrite &&
				(SUBBUF_TRUNC(offsets->begin, buf->chan)
				 - SUBBUF_TRUNC(consumed, buf->chan))
				>= channel->alloc_size) {

				long consumed_idx = SUBBUF_INDEX(consumed, buf->chan);
				long commit_count = local_read(&buf->commit_count[consumed_idx]);
				if(((commit_count - buf->chan->subbuf_size) & channel->commit_count_mask) - (BUFFER_TRUNC(consumed, buf->chan) >> channel->n_subbufs_order) != 0) {
					WARN("Event dropped. Caused by non-committed event.");
				}
				else {
					WARN("Event dropped. Caused by non-consumed buffer.");
				}
				/*
				 * We do not overwrite non consumed buffers
				 * and we are full : event is lost.
				 */
				local_inc(&buf->events_lost);
				return -1;
			} else {
				/*
				 * next buffer not corrupted, we are either in
				 * overwrite mode or the buffer is not full.
				 * It's safe to write in this new subbuffer.
				 */
			}
		} else {
			/*
			 * Next subbuffer corrupted. Force pushing reader even
			 * in normal mode. It's safe to write in this new
			 * subbuffer.
			 */
		}
		offsets->size = ust_get_header_size(channel,
					offsets->begin, data_size,
					&offsets->before_hdr_pad, *rflags);
		offsets->size += ltt_align(offsets->begin + offsets->size,
					   largest_align)
				 + data_size;
		if ((SUBBUF_OFFSET(offsets->begin, buf->chan) + offsets->size)
				> buf->chan->subbuf_size) {
			/*
			 * Event too big for subbuffers, report error, don't
			 * complete the sub-buffer switch.
			 */
			local_inc(&buf->events_lost);
			return -1;
		} else {
			/*
			 * We just made a successful buffer switch and the event
			 * fits in the new subbuffer. Let's write.
			 */
		}
	} else {
		/*
		 * Event fits in the current buffer and we are not on a switch
		 * boundary. It's safe to write.
		 */
	}
	offsets->end = offsets->begin + offsets->size;

	if ((SUBBUF_OFFSET(offsets->end, buf->chan)) == 0) {
		/*
		 * The offset_end will fall at the very beginning of the next
		 * subbuffer.
		 */
		offsets->end_switch_current = 1;	/* For offsets->begin */
	}
	return 0;
}

/*
 * Returns :
 * 0 if ok
 * !0 if execution must be aborted.
 */
static inline int ltt_relay_try_switch(
		enum force_switch_mode mode,
		struct ust_channel *channel,
		struct ust_buffer *buf,
		struct ltt_reserve_switch_offsets *offsets,
		u64 *tsc)
{
	long subbuf_index;

	offsets->begin = local_read(&buf->offset);
	offsets->old = offsets->begin;
	offsets->begin_switch = 0;
	offsets->end_switch_old = 0;

	*tsc = trace_clock_read64();

	if (SUBBUF_OFFSET(offsets->begin, buf->chan) != 0) {
		offsets->begin = SUBBUF_ALIGN(offsets->begin, buf->chan);
		offsets->end_switch_old = 1;
	} else {
		/* we do not have to switch : buffer is empty */
		return -1;
	}
	if (mode == FORCE_ACTIVE)
		offsets->begin += ltt_subbuffer_header_size();
	/*
	 * Always begin_switch in FORCE_ACTIVE mode.
	 * Test new buffer integrity
	 */
	subbuf_index = SUBBUF_INDEX(offsets->begin, buf->chan);
	offsets->reserve_commit_diff =
		(BUFFER_TRUNC(offsets->begin, buf->chan)
		 >> channel->n_subbufs_order)
		- (local_read(&buf->commit_count[subbuf_index])
			& channel->commit_count_mask);
	if (offsets->reserve_commit_diff == 0) {
		/* Next buffer not corrupted. */
		if (mode == FORCE_ACTIVE
		    && !channel->overwrite
		    && offsets->begin - atomic_long_read(&buf->consumed)
		       >= channel->alloc_size) {
			/*
			 * We do not overwrite non consumed buffers and we are
			 * full : ignore switch while tracing is active.
			 */
			return -1;
		}
	} else {
		/*
		 * Next subbuffer corrupted. Force pushing reader even in normal
		 * mode
		 */
	}
	offsets->end = offsets->begin;
	return 0;
}

static inline void ltt_reserve_push_reader(
		struct ust_channel *channel,
		struct ust_buffer *buf,
		struct ltt_reserve_switch_offsets *offsets)
{
	long consumed_old, consumed_new;

	do {
		consumed_old = atomic_long_read(&buf->consumed);
		/*
		 * If buffer is in overwrite mode, push the reader consumed
		 * count if the write position has reached it and we are not
		 * at the first iteration (don't push the reader farther than
		 * the writer). This operation can be done concurrently by many
		 * writers in the same buffer, the writer being at the farthest
		 * write position sub-buffer index in the buffer being the one
		 * which will win this loop.
		 * If the buffer is not in overwrite mode, pushing the reader
		 * only happens if a sub-buffer is corrupted.
		 */
		if ((SUBBUF_TRUNC(offsets->end-1, buf->chan)
		   - SUBBUF_TRUNC(consumed_old, buf->chan))
		   >= channel->alloc_size)
			consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan);
		else {
			consumed_new = consumed_old;
			break;
		}
	} while (atomic_long_cmpxchg(&buf->consumed, consumed_old,
			consumed_new) != consumed_old);

	if (consumed_old != consumed_new) {
		/*
		 * Reader pushed : we are the winner of the push, we can
		 * therefore reequilibrate reserve and commit. Atomic increment
		 * of the commit count permits other writers to play around
		 * with this variable before us. We keep track of
		 * corrupted_subbuffers even in overwrite mode :
		 * we never want to write over a non completely committed
		 * sub-buffer : possible causes : the buffer size is too low
		 * compared to the unordered data input, or there is a writer
		 * that died between the reserve and the commit.
		 */
		if (offsets->reserve_commit_diff) {
			/*
			 * We have to alter the sub-buffer commit count.
			 * We do not deliver the previous subbuffer, given it
			 * was either corrupted or not consumed (overwrite
			 * mode).
			 */
			local_add(offsets->reserve_commit_diff,
				  &buf->commit_count[
					SUBBUF_INDEX(offsets->begin,
						     buf->chan)]);
			if (!channel->overwrite
			    || offsets->reserve_commit_diff
			       != channel->subbuf_size) {
				/*
				 * The reserve commit diff was not subbuf_size :
				 * it means the subbuffer was partly written to
				 * and is therefore corrupted. If it is multiple
				 * of subbuffer size and we are in flight
				 * recorder mode, we are skipping over a whole
				 * subbuffer.
				 */
				local_inc(&buf->corrupted_subbuffers);
			}
		}
	}
}

/*
 * ltt_reserve_switch_old_subbuf: switch old subbuffer
 *
 * Concurrency safe because we are the last and only thread to alter this
 * sub-buffer. As long as it is not delivered and read, no other thread can
 * alter the offset, alter the reserve_count or call the
 * client_buffer_end_callback on this sub-buffer.
 *
 * The only remaining threads could be the ones with pending commits. They will
 * have to do the deliver themselves. Not concurrency safe in overwrite mode.
 * We detect corrupted subbuffers with commit and reserve counts. We keep a
 * corrupted sub-buffers count and push the readers across these sub-buffers.
 *
 * Not concurrency safe if a writer is stalled in a subbuffer and another writer
 * switches in, finding out it's corrupted. The result will be that the old
 * (uncommitted) subbuffer will be declared corrupted, and that the new subbuffer
 * will be declared corrupted too because of the commit count adjustment.
 *
 * Note : offset_old should never be 0 here.
 */
static inline void ltt_reserve_switch_old_subbuf(
		struct ust_channel *channel,
		struct ust_buffer *buf,
		struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
{
	long oldidx = SUBBUF_INDEX(offsets->old - 1, channel);

	channel->buffer_end(buf, *tsc, offsets->old, oldidx);
	/* Must write buffer end before incrementing commit count */
	smp_wmb();
	offsets->commit_count =
		local_add_return(channel->subbuf_size
				 - (SUBBUF_OFFSET(offsets->old - 1, channel)
				 + 1),
				 &buf->commit_count[oldidx]);
	if ((BUFFER_TRUNC(offsets->old - 1, channel)
			>> channel->n_subbufs_order)
			- ((offsets->commit_count - channel->subbuf_size)
				& channel->commit_count_mask) == 0)
		ltt_deliver(buf, oldidx, offsets->commit_count);
}

/*
 * ltt_reserve_switch_new_subbuf: Populate new subbuffer.
 *
 * This code can be executed unordered : writers may already have written to the
 * sub-buffer before this code gets executed, caution. The commit makes sure
 * that this code is executed before the deliver of this sub-buffer.
 */
static /*inline*/ void ltt_reserve_switch_new_subbuf(
		struct ust_channel *channel,
		struct ust_buffer *buf,
		struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
{
	long beginidx = SUBBUF_INDEX(offsets->begin, channel);

	channel->buffer_begin(buf, *tsc, beginidx);
	/* Must write the sub-buffer header (buffer begin) before incrementing commit count */
	smp_wmb();
	offsets->commit_count = local_add_return(ltt_subbuffer_header_size(),
			&buf->commit_count[beginidx]);
	/* Check if the written buffer has to be delivered */
	if ((BUFFER_TRUNC(offsets->begin, channel)
			>> channel->n_subbufs_order)
			- ((offsets->commit_count - channel->subbuf_size)
				& channel->commit_count_mask) == 0)
		ltt_deliver(buf, beginidx, offsets->commit_count);
}

/*
 * ltt_reserve_end_switch_current: finish switching current subbuffer
 *
 * Concurrency safe because we are the last and only thread to alter this
 * sub-buffer. As long as it is not delivered and read, no other thread can
 * alter the offset, alter the reserve_count or call the
 * client_buffer_end_callback on this sub-buffer.
 *
 * The only remaining threads could be the ones with pending commits. They will
 * have to do the deliver themselves. Not concurrency safe in overwrite mode.
 * We detect corrupted subbuffers with commit and reserve counts. We keep a
 * corrupted sub-buffers count and push the readers across these sub-buffers.
 *
 * Not concurrency safe if a writer is stalled in a subbuffer and another writer
 * switches in, finding out it's corrupted. The result will be that the old
 * (uncommitted) subbuffer will be declared corrupted, and that the new subbuffer
 * will be declared corrupted too because of the commit count adjustment.
 */
static inline void ltt_reserve_end_switch_current(
		struct ust_channel *channel,
		struct ust_buffer *buf,
		struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
{
	long endidx = SUBBUF_INDEX(offsets->end - 1, channel);

	channel->buffer_end(buf, *tsc, offsets->end, endidx);
	/* Must write buffer end before incrementing commit count */
	smp_wmb();
	offsets->commit_count =
		local_add_return(channel->subbuf_size
				 - (SUBBUF_OFFSET(offsets->end - 1, channel)
				 + 1),
				 &buf->commit_count[endidx]);
	if ((BUFFER_TRUNC(offsets->end - 1, channel)
			>> channel->n_subbufs_order)
			- ((offsets->commit_count - channel->subbuf_size)
				& channel->commit_count_mask) == 0)
		ltt_deliver(buf, endidx, offsets->commit_count);
}

/**
 * ltt_relay_reserve_slot - Atomic slot reservation in a LTTng buffer.
 * @trace: the trace structure to log to.
 * @ltt_channel: channel structure
 * @transport_data: data structure specific to ltt relay
 * @data_size: size of the variable length data to log.
 * @slot_size: pointer to total size of the slot (out)
 * @buf_offset : pointer to reserved buffer offset (out)
 * @tsc: pointer to the tsc at the slot reservation (out)
 * @cpu: cpuid
 *
 * Return : -ENOSPC if not enough space, else returns 0.
 * It will take care of sub-buffer switching.
 */
static notrace int ltt_relay_reserve_slot(struct ltt_trace_struct *trace,
		struct ust_channel *channel, void **transport_data,
		size_t data_size, size_t *slot_size, long *buf_offset, u64 *tsc,
		unsigned int *rflags, int largest_align, int cpu)
{
	struct ust_buffer *buf = *transport_data = channel->buf[cpu];
	struct ltt_reserve_switch_offsets offsets;

	offsets.reserve_commit_diff = 0;
	offsets.size = 0;

	/*
	 * Perform retryable operations.
	 */
	if (ltt_nesting > 4) {
		local_inc(&buf->events_lost);
		return -EPERM;
	}
	do {
		if (ltt_relay_try_reserve(channel, buf, &offsets, data_size, tsc, rflags,
				largest_align))
			return -ENOSPC;
	} while (local_cmpxchg(&buf->offset, offsets.old,
			offsets.end) != offsets.old);

	/*
	 * Atomically update last_tsc. This update races against concurrent
	 * atomic updates, but the race will always cause supplementary full TSC
	 * events, never the opposite (missing a full TSC event when it would be
	 * needed).
	 */
	save_last_tsc(buf, *tsc);

	/*
	 * Push the reader if necessary
	 */
	ltt_reserve_push_reader(channel, buf, &offsets);

	/*
	 * Switch old subbuffer if needed.
	 */
	if (offsets.end_switch_old)
		ltt_reserve_switch_old_subbuf(channel, buf, &offsets, tsc);

	/*
	 * Populate new subbuffer.
	 */
	if (offsets.begin_switch)
		ltt_reserve_switch_new_subbuf(channel, buf, &offsets, tsc);

	if (offsets.end_switch_current)
		ltt_reserve_end_switch_current(channel, buf, &offsets, tsc);

	*slot_size = offsets.size;
	*buf_offset = offsets.begin + offsets.before_hdr_pad;
	return 0;
}
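
/*
 * Note on the reservation scheme: slot reservation is lock-free. The writer
 * computes the new offsets speculatively in ltt_relay_try_reserve() and then
 * publishes them with a single cmpxchg on buf->offset, retrying the whole
 * computation if another writer raced in between. Only once the cmpxchg has
 * succeeded are the side effects (reader push, sub-buffer switches) applied.
 */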

/*
 * Force a sub-buffer switch for a per-cpu buffer. This operation is
 * completely reentrant : can be called while tracing is active with
 * absolutely no lock held.
 *
 * Note, however, that as a local_cmpxchg is used for some atomic
 * operations, this function must be called from the CPU which owns the buffer
 * for an ACTIVE flush.
 */
static notrace void ltt_force_switch(struct ust_buffer *buf,
		enum force_switch_mode mode)
{
	struct ust_channel *channel = buf->chan;
	struct ltt_reserve_switch_offsets offsets;
	u64 tsc;

	offsets.reserve_commit_diff = 0;
	offsets.size = 0;

	/*
	 * Perform retryable operations.
	 */
	do {
		if (ltt_relay_try_switch(mode, channel, buf, &offsets, &tsc))
			return;
	} while (local_cmpxchg(&buf->offset, offsets.old,
			offsets.end) != offsets.old);

	/*
	 * Atomically update last_tsc. This update races against concurrent
	 * atomic updates, but the race will always cause supplementary full TSC
	 * events, never the opposite (missing a full TSC event when it would be
	 * needed).
	 */
	save_last_tsc(buf, tsc);

	/*
	 * Push the reader if necessary
	 */
	if (mode == FORCE_ACTIVE)
		ltt_reserve_push_reader(channel, buf, &offsets);

	/*
	 * Switch old subbuffer if needed.
	 */
	if (offsets.end_switch_old)
		ltt_reserve_switch_old_subbuf(channel, buf, &offsets, &tsc);

	/*
	 * Populate new subbuffer.
	 */
	if (mode == FORCE_ACTIVE)
		ltt_reserve_switch_new_subbuf(channel, buf, &offsets, &tsc);
}

static struct ltt_transport ust_relay_transport = {
	.name = "ustrelay",
	.ops = {
		.create_channel = ust_buffers_create_channel,
		.finish_channel = ltt_relay_finish_channel,
		.remove_channel = ltt_relay_remove_channel,
		.wakeup_channel = ltt_relay_async_wakeup_chan,
//		.commit_slot = ltt_relay_commit_slot,
		.reserve_slot = ltt_relay_reserve_slot,
	},
};

/*
 * for flight recording. must be called after relay_commit.
 * This function decrements the subbuffer's lost_size each time the commit count
 * reaches back the reserve offset (modulo subbuffer size). It is useful for
 * crash dump.
 */
static /* inline */ void ltt_write_commit_counter(struct ust_buffer *buf,
		struct ust_buffer *ltt_buf,
		long idx, long buf_offset, long commit_count, size_t data_size)
{
	long offset;
	long commit_seq_old;

	offset = buf_offset + data_size;

	/*
	 * SUBBUF_OFFSET includes commit_count_mask. We can simply
	 * compare the offsets within the subbuffer without caring about
	 * buffer full/empty mismatch because offset is never zero here
	 * (subbuffer header and event headers have non-zero length).
	 */
	if (unlikely(SUBBUF_OFFSET(offset - commit_count, buf->chan)))
		return;

	commit_seq_old = local_read(&ltt_buf->commit_seq[idx]);
	while (commit_seq_old < commit_count)
		commit_seq_old = local_cmpxchg(&ltt_buf->commit_seq[idx],
					 commit_seq_old, commit_count);
}

/*
 * Atomic unordered slot commit. Increments the commit count in the
 * specified sub-buffer, and delivers it if necessary.
 *
 * Parameters:
 *
 * @ltt_channel : channel structure
 * @transport_data: transport-specific data
 * @buf_offset : offset following the event header.
 * @data_size : size of the event data.
 * @slot_size : size of the reserved slot.
 */
/* FIXME: make this function static inline in the .h! */
/*static*/ /* inline */ notrace void ltt_commit_slot(
		struct ust_channel *channel,
		void **transport_data, long buf_offset,
		size_t data_size, size_t slot_size)
{
	struct ust_buffer *buf = *transport_data;
	long offset_end = buf_offset;
	long endidx = SUBBUF_INDEX(offset_end - 1, channel);
	long commit_count;

	/* Must write slot data before incrementing commit count */
	smp_wmb();
	commit_count = local_add_return(slot_size,
		&buf->commit_count[endidx]);
	/* Check if all commits have been done */
	if ((BUFFER_TRUNC(offset_end - 1, channel)
			>> channel->n_subbufs_order)
			- ((commit_count - channel->subbuf_size)
				& channel->commit_count_mask) == 0)
		ltt_deliver(buf, endidx, commit_count);
	/*
	 * Update lost_size for each commit. It's needed only for extracting
	 * ltt buffers from vmcore, after crash.
	 */
	ltt_write_commit_counter(buf, buf, endidx,
		buf_offset, commit_count, data_size);
}
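
/*
 * Illustrative sketch (not part of the original code): a tracing probe is
 * expected to pair the transport's reserve_slot operation with
 * ltt_commit_slot() roughly as follows. The helper name
 * "write_event_header_and_payload" is a placeholder for whatever
 * serialization the caller performs.
 *
 *	size_t slot_size;
 *	long buf_offset;
 *	u64 tsc;
 *	unsigned int rflags = 0;
 *	void *transport_data;
 *
 *	if (ltt_relay_reserve_slot(trace, channel, &transport_data,
 *				   data_size, &slot_size, &buf_offset,
 *				   &tsc, &rflags, largest_align, cpu) < 0)
 *		return;		// no space or nesting too deep: event is lost
 *	// write the event header and payload into the slot; buf_offset is
 *	// then expected to point just past the event header (see above).
 *	write_event_header_and_payload(channel, &buf_offset, data_size);
 *	ltt_commit_slot(channel, &transport_data, buf_offset, data_size,
 *			slot_size);
 */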

static char initialized = 0;

void __attribute__((constructor)) init_ustrelay_transport(void)
{
	if(!initialized) {
		ltt_transport_register(&ust_relay_transport);
		initialized = 1;
	}
}

static void __attribute__((destructor)) ltt_relay_exit(void)
{
	ltt_transport_unregister(&ust_relay_transport);
}