tracectl cleanup v3
[ust.git] / libust / buffers.c
CommitLineData
b5b073e2
PMF
1/*
2 * buffers.c
3 * LTTng userspace tracer buffering system
4 *
5 * Copyright (C) 2009 - Pierre-Marc Fournier (pierre-marc dot fournier at polymtl dot ca)
6 * Copyright (C) 2008 - Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
7 *
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License as published by the Free Software Foundation; either
11 * version 2.1 of the License, or (at your option) any later version.
12 *
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Lesser General Public License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General Public
19 * License along with this library; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21 */
22
204141ee 23#include <unistd.h>
b5b073e2
PMF
24#include <sys/mman.h>
25#include <sys/ipc.h>
26#include <sys/shm.h>
27#include <fcntl.h>
909bc43f 28#include <stdlib.h>
518d7abb
PMF
29
30#include <ust/clock.h>
31
b5b073e2
PMF
32#include "buffers.h"
33#include "channels.h"
34#include "tracer.h"
35#include "tracercore.h"
36#include "usterr.h"
37
b73a4c47
PMF
38struct ltt_reserve_switch_offsets {
39 long begin, end, old;
40 long begin_switch, end_switch_current, end_switch_old;
41 size_t before_hdr_pad, size;
42};
43
44
b5b073e2
PMF
45static DEFINE_MUTEX(ust_buffers_channels_mutex);
46static LIST_HEAD(ust_buffers_channels);
47
204141ee
PMF
48static int get_n_cpus(void)
49{
50 int result;
51 static int n_cpus = 0;
52
c7dc133c
PMF
53 if(!n_cpus) {
54 /* On Linux, when some processors are offline
55 * _SC_NPROCESSORS_CONF counts the offline
56 * processors, whereas _SC_NPROCESSORS_ONLN
57 * does not. If we used _SC_NPROCESSORS_ONLN,
58 * getcpu() could return a value greater than
59 * this sysconf, in which case the arrays
60 * indexed by processor would overflow.
61 */
62 result = sysconf(_SC_NPROCESSORS_CONF);
63 if(result == -1) {
64 return -1;
65 }
66
67 n_cpus = result;
204141ee
PMF
68 }
69
c7dc133c 70 return n_cpus;
204141ee
PMF
71}
72
bb3132c8
MD
73/**
74 * _ust_buffers_strncpy_fixup - Fix an incomplete string in a ltt_relay buffer.
75 * @buf : buffer
76 * @offset : offset within the buffer
77 * @len : length to write
78 * @copied: string actually copied
79 * @terminated: does string end with \0
b73a4c47 80 *
bb3132c8 81 * Fills string with "X" if incomplete.
b73a4c47 82 */
bb3132c8
MD
83void _ust_buffers_strncpy_fixup(struct ust_buffer *buf, size_t offset,
84 size_t len, size_t copied, int terminated)
b73a4c47 85{
bb3132c8
MD
86 size_t buf_offset, cpy;
87
88 if (copied == len) {
89 /*
90 * Deal with non-terminated string.
91 */
92 assert(!terminated);
93 offset += copied - 1;
94 buf_offset = BUFFER_OFFSET(offset, buf->chan);
95 /*
96 * Underlying layer should never ask for writes across
97 * subbuffers.
98 */
99 assert(buf_offset
100 < buf->chan->subbuf_size*buf->chan->subbuf_cnt);
101 ust_buffers_do_memset(buf->buf_data + buf_offset, '\0', 1);
102 return;
103 }
104
105 /*
106 * Deal with incomplete string.
107 * Overwrite string's \0 with X too.
108 */
109 cpy = copied - 1;
110 assert(terminated);
111 len -= cpy;
112 offset += cpy;
113 buf_offset = BUFFER_OFFSET(offset, buf->chan);
114
115 /*
116 * Underlying layer should never ask for writes across subbuffers.
117 */
118 assert(buf_offset
119 < buf->chan->subbuf_size*buf->chan->subbuf_cnt);
b73a4c47 120
bb3132c8
MD
121 ust_buffers_do_memset(buf->buf_data + buf_offset,
122 'X', len);
b73a4c47 123
bb3132c8
MD
124 /*
125 * Overwrite last 'X' with '\0'.
126 */
127 offset += len - 1;
128 buf_offset = BUFFER_OFFSET(offset, buf->chan);
129 /*
130 * Underlying layer should never ask for writes across subbuffers.
131 */
132 assert(buf_offset
133 < buf->chan->subbuf_size*buf->chan->subbuf_cnt);
134 ust_buffers_do_memset(buf->buf_data + buf_offset, '\0', 1);
b73a4c47
PMF
135}
136
137static int ust_buffers_init_buffer(struct ust_trace *trace,
b5b073e2
PMF
138 struct ust_channel *ltt_chan,
139 struct ust_buffer *buf,
140 unsigned int n_subbufs);
141
142static int ust_buffers_alloc_buf(struct ust_buffer *buf, size_t *size)
143{
144 void *ptr;
145 int result;
146
147 *size = PAGE_ALIGN(*size);
148
149 result = buf->shmid = shmget(getpid(), *size, IPC_CREAT | IPC_EXCL | 0700);
150 if(result == -1 && errno == EINVAL) {
151 ERR("shmget() returned EINVAL; maybe /proc/sys/kernel/shmmax should be increased.");
152 return -1;
153 }
154 else if(result == -1) {
155 PERROR("shmget");
156 return -1;
157 }
158
204141ee 159 /* FIXME: should have matching call to shmdt */
b5b073e2
PMF
160 ptr = shmat(buf->shmid, NULL, 0);
161 if(ptr == (void *) -1) {
162 perror("shmat");
163 goto destroy_shmem;
164 }
165
166 /* Already mark the shared memory for destruction. This will occur only
167 * when all users have detached.
168 */
169 result = shmctl(buf->shmid, IPC_RMID, NULL);
170 if(result == -1) {
171 perror("shmctl");
172 return -1;
173 }
174
175 buf->buf_data = ptr;
176 buf->buf_size = *size;
177
178 return 0;
179
180 destroy_shmem:
181 result = shmctl(buf->shmid, IPC_RMID, NULL);
182 if(result == -1) {
183 perror("shmctl");
184 }
185
186 return -1;
187}
188
204141ee 189int ust_buffers_create_buf(struct ust_channel *channel, int cpu)
b5b073e2
PMF
190{
191 int result;
204141ee 192 struct ust_buffer *buf = channel->buf[cpu];
b5b073e2 193
204141ee
PMF
194 buf->cpu = cpu;
195 result = ust_buffers_alloc_buf(buf, &channel->alloc_size);
b5b073e2 196 if(result)
204141ee 197 return -1;
b5b073e2 198
204141ee 199 buf->chan = channel;
b5b073e2 200 kref_get(&channel->kref);
204141ee 201 return 0;
b5b073e2
PMF
202}
203
204static void ust_buffers_destroy_channel(struct kref *kref)
205{
206 struct ust_channel *chan = container_of(kref, struct ust_channel, kref);
207 free(chan);
208}
209
210static void ust_buffers_destroy_buf(struct ust_buffer *buf)
211{
212 struct ust_channel *chan = buf->chan;
213 int result;
214
215 result = munmap(buf->buf_data, buf->buf_size);
216 if(result == -1) {
217 PERROR("munmap");
218 }
219
204141ee 220//ust// chan->buf[buf->cpu] = NULL;
b5b073e2
PMF
221 free(buf);
222 kref_put(&chan->kref, ust_buffers_destroy_channel);
223}
224
225/* called from kref_put */
226static void ust_buffers_remove_buf(struct kref *kref)
227{
228 struct ust_buffer *buf = container_of(kref, struct ust_buffer, kref);
229 ust_buffers_destroy_buf(buf);
230}
231
204141ee 232int ust_buffers_open_buf(struct ust_channel *chan, int cpu)
b5b073e2 233{
204141ee 234 int result;
b5b073e2 235
204141ee
PMF
236 result = ust_buffers_create_buf(chan, cpu);
237 if (result == -1)
238 return -1;
b5b073e2 239
204141ee 240 kref_init(&chan->buf[cpu]->kref);
b5b073e2 241
204141ee
PMF
242 result = ust_buffers_init_buffer(chan->trace, chan, chan->buf[cpu], chan->subbuf_cnt);
243 if(result == -1)
244 return -1;
b5b073e2 245
204141ee 246 return 0;
b5b073e2
PMF
247
248 /* FIXME: decrementally destroy on error? */
249}
250
251/**
252 * ust_buffers_close_buf - close a channel buffer
253 * @buf: buffer
254 */
255static void ust_buffers_close_buf(struct ust_buffer *buf)
256{
257 kref_put(&buf->kref, ust_buffers_remove_buf);
258}
259
260int ust_buffers_channel_open(struct ust_channel *chan, size_t subbuf_size, size_t subbuf_cnt)
261{
204141ee
PMF
262 int i;
263 int result;
264
b5b073e2
PMF
265 if(subbuf_size == 0 || subbuf_cnt == 0)
266 return -1;
267
b73a4c47
PMF
268 /* Check that the subbuffer size is larger than a page. */
269 WARN_ON_ONCE(subbuf_size < PAGE_SIZE);
270
271 /*
272 * Make sure the number of subbuffers and subbuffer size are power of 2.
273 */
274 WARN_ON_ONCE(hweight32(subbuf_size) != 1);
275 WARN_ON(hweight32(subbuf_cnt) != 1);
276
b5b073e2
PMF
277 chan->version = UST_CHANNEL_VERSION;
278 chan->subbuf_cnt = subbuf_cnt;
279 chan->subbuf_size = subbuf_size;
280 chan->subbuf_size_order = get_count_order(subbuf_size);
b73a4c47 281 chan->alloc_size = subbuf_size * subbuf_cnt;
204141ee 282
b5b073e2
PMF
283 kref_init(&chan->kref);
284
f7b16408 285 pthread_mutex_lock(&ust_buffers_channels_mutex);
204141ee
PMF
286 for(i=0; i<chan->n_cpus; i++) {
287 result = ust_buffers_open_buf(chan, i);
288 if (result == -1)
289 goto error;
290 }
b5b073e2 291 list_add(&chan->list, &ust_buffers_channels);
f7b16408 292 pthread_mutex_unlock(&ust_buffers_channels_mutex);
b5b073e2
PMF
293
294 return 0;
295
204141ee
PMF
296 /* Jump directly inside the loop to close the buffers that were already
297 * opened. */
298 for(; i>=0; i--) {
299 ust_buffers_close_buf(chan->buf[i]);
300error:
120b0ec3 301 do {} while(0);
204141ee
PMF
302 }
303
b5b073e2 304 kref_put(&chan->kref, ust_buffers_destroy_channel);
f7b16408 305 pthread_mutex_unlock(&ust_buffers_channels_mutex);
b5b073e2
PMF
306 return -1;
307}
308
309void ust_buffers_channel_close(struct ust_channel *chan)
310{
204141ee
PMF
311 int i;
312 if(!chan)
b5b073e2
PMF
313 return;
314
f7b16408 315 pthread_mutex_lock(&ust_buffers_channels_mutex);
204141ee
PMF
316 for(i=0; i<chan->n_cpus; i++) {
317 /* FIXME: if we make it here, then all buffers were necessarily allocated. Moreover, we don't
318 * initialize to NULL so we cannot use this check. Should we? */
319//ust// if (chan->buf[i])
320 ust_buffers_close_buf(chan->buf[i]);
321 }
b5b073e2
PMF
322
323 list_del(&chan->list);
324 kref_put(&chan->kref, ust_buffers_destroy_channel);
f7b16408 325 pthread_mutex_unlock(&ust_buffers_channels_mutex);
b5b073e2
PMF
326}
327
b5b073e2
PMF
328/*
329 * -------
330 */
331
204141ee 332static void ust_buffers_destroy_buffer(struct ust_channel *ltt_chan, int cpu);
b5b073e2
PMF
333
334static void ltt_force_switch(struct ust_buffer *buf,
335 enum force_switch_mode mode);
336
337/*
338 * Trace callbacks
339 */
b73a4c47 340static void ltt_buffer_begin(struct ust_buffer *buf,
b5b073e2
PMF
341 u64 tsc, unsigned int subbuf_idx)
342{
343 struct ust_channel *channel = buf->chan;
344 struct ltt_subbuffer_header *header =
345 (struct ltt_subbuffer_header *)
b73a4c47 346 ust_buffers_offset_address(buf,
b5b073e2
PMF
347 subbuf_idx * buf->chan->subbuf_size);
348
349 header->cycle_count_begin = tsc;
02af3e60
PMF
350 header->data_size = 0xFFFFFFFF; /* for recognizing crashed buffers */
351 header->sb_size = 0xFFFFFFFF; /* for recognizing crashed buffers */
352 /* FIXME: add memory barrier? */
b5b073e2
PMF
353 ltt_write_trace_header(channel->trace, header);
354}
355
356/*
357 * offset is assumed to never be 0 here : never deliver a completely empty
358 * subbuffer. The lost size is between 0 and subbuf_size-1.
359 */
b73a4c47 360static notrace void ltt_buffer_end(struct ust_buffer *buf,
b5b073e2
PMF
361 u64 tsc, unsigned int offset, unsigned int subbuf_idx)
362{
363 struct ltt_subbuffer_header *header =
364 (struct ltt_subbuffer_header *)
b73a4c47 365 ust_buffers_offset_address(buf,
b5b073e2 366 subbuf_idx * buf->chan->subbuf_size);
8c36d1ee 367 u32 data_size = SUBBUF_OFFSET(offset - 1, buf->chan) + 1;
b5b073e2 368
8c36d1ee
PMF
369 header->data_size = data_size;
370 header->sb_size = PAGE_ALIGN(data_size);
b5b073e2 371 header->cycle_count_end = tsc;
b102c2b0
PMF
372 header->events_lost = uatomic_read(&buf->events_lost);
373 header->subbuf_corrupt = uatomic_read(&buf->corrupted_subbuffers);
719569e4
PMF
374 if(unlikely(header->events_lost > 0)) {
375 DBG("Some events (%d) were lost in %s_%d", header->events_lost, buf->chan->channel_name, buf->cpu);
376 }
b5b073e2
PMF
377}
378
379/*
380 * This function should not be called from NMI interrupt context
381 */
382static notrace void ltt_buf_unfull(struct ust_buffer *buf,
383 unsigned int subbuf_idx,
384 long offset)
385{
b5b073e2
PMF
386}
387
b73a4c47
PMF
388/*
389 * Promote compiler barrier to a smp_mb().
390 * For the specific LTTng case, this IPI call should be removed if the
391 * architecture does not reorder writes. This should eventually be provided by
392 * a separate architecture-specific infrastructure.
393 */
e17571a5
PMF
394//ust// static void remote_mb(void *info)
395//ust// {
396//ust// smp_mb();
397//ust// }
b73a4c47
PMF
398
399int ust_buffers_get_subbuf(struct ust_buffer *buf, long *consumed)
b5b073e2
PMF
400{
401 struct ust_channel *channel = buf->chan;
402 long consumed_old, consumed_idx, commit_count, write_offset;
b73a4c47
PMF
403//ust// int retval;
404
b102c2b0 405 consumed_old = uatomic_read(&buf->consumed);
b5b073e2 406 consumed_idx = SUBBUF_INDEX(consumed_old, buf->chan);
b102c2b0 407 commit_count = uatomic_read(&buf->commit_count[consumed_idx].cc_sb);
b5b073e2
PMF
408 /*
409 * Make sure we read the commit count before reading the buffer
410 * data and the write offset. Correct consumed offset ordering
411 * wrt commit count is insured by the use of cmpxchg to update
412 * the consumed offset.
b73a4c47
PMF
413 * smp_call_function_single can fail if the remote CPU is offline,
414 * this is OK because then there is no wmb to execute there.
415 * If our thread is executing on the same CPU as the on the buffers
416 * belongs to, we don't have to synchronize it at all. If we are
417 * migrated, the scheduler will take care of the memory barriers.
418 * Normally, smp_call_function_single() should ensure program order when
419 * executing the remote function, which implies that it surrounds the
420 * function execution with :
421 * smp_mb()
422 * send IPI
423 * csd_lock_wait
424 * recv IPI
425 * smp_mb()
426 * exec. function
427 * smp_mb()
428 * csd unlock
429 * smp_mb()
430 *
431 * However, smp_call_function_single() does not seem to clearly execute
432 * such barriers. It depends on spinlock semantic to provide the barrier
433 * before executing the IPI and, when busy-looping, csd_lock_wait only
434 * executes smp_mb() when it has to wait for the other CPU.
435 *
436 * I don't trust this code. Therefore, let's add the smp_mb() sequence
437 * required ourself, even if duplicated. It has no performance impact
438 * anyway.
439 *
440 * smp_mb() is needed because smp_rmb() and smp_wmb() only order read vs
441 * read and write vs write. They do not ensure core synchronization. We
442 * really have to ensure total order between the 3 barriers running on
443 * the 2 CPUs.
444 */
445//ust// #ifdef LTT_NO_IPI_BARRIER
446 /*
447 * Local rmb to match the remote wmb to read the commit count before the
448 * buffer data and the write offset.
b5b073e2
PMF
449 */
450 smp_rmb();
b73a4c47
PMF
451//ust// #else
452//ust// if (raw_smp_processor_id() != buf->cpu) {
453//ust// smp_mb(); /* Total order with IPI handler smp_mb() */
454//ust// smp_call_function_single(buf->cpu, remote_mb, NULL, 1);
455//ust// smp_mb(); /* Total order with IPI handler smp_mb() */
456//ust// }
457//ust// #endif
458
b102c2b0 459 write_offset = uatomic_read(&buf->offset);
b5b073e2
PMF
460 /*
461 * Check that the subbuffer we are trying to consume has been
462 * already fully committed.
463 */
464 if (((commit_count - buf->chan->subbuf_size)
465 & channel->commit_count_mask)
466 - (BUFFER_TRUNC(consumed_old, buf->chan)
467 >> channel->n_subbufs_order)
468 != 0) {
469 return -EAGAIN;
470 }
471 /*
472 * Check that we are not about to read the same subbuffer in
473 * which the writer head is.
474 */
475 if ((SUBBUF_TRUNC(write_offset, buf->chan)
476 - SUBBUF_TRUNC(consumed_old, buf->chan))
477 == 0) {
478 return -EAGAIN;
479 }
480
b73a4c47
PMF
481 /* FIXME: is this ok to disable the reading feature? */
482//ust// retval = update_read_sb_index(buf, consumed_idx);
483//ust// if (retval)
484//ust// return retval;
485
486 *consumed = consumed_old;
487
b5b073e2
PMF
488 return 0;
489}
490
b73a4c47 491int ust_buffers_put_subbuf(struct ust_buffer *buf, unsigned long uconsumed_old)
b5b073e2
PMF
492{
493 long consumed_new, consumed_old;
494
b102c2b0 495 consumed_old = uatomic_read(&buf->consumed);
b5b073e2
PMF
496 consumed_old = consumed_old & (~0xFFFFFFFFL);
497 consumed_old = consumed_old | uconsumed_old;
498 consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan);
499
500//ust// spin_lock(&ltt_buf->full_lock);
b102c2b0 501 if (uatomic_cmpxchg(&buf->consumed, consumed_old,
b5b073e2
PMF
502 consumed_new)
503 != consumed_old) {
504 /* We have been pushed by the writer : the last
505 * buffer read _is_ corrupted! It can also
506 * happen if this is a buffer we never got. */
507//ust// spin_unlock(&ltt_buf->full_lock);
508 return -EIO;
509 } else {
510 /* tell the client that buffer is now unfull */
511 int index;
512 long data;
513 index = SUBBUF_INDEX(consumed_old, buf->chan);
514 data = BUFFER_OFFSET(consumed_old, buf->chan);
515 ltt_buf_unfull(buf, index, data);
516//ust// spin_unlock(&ltt_buf->full_lock);
517 }
518 return 0;
519}
520
521static void ltt_relay_print_subbuffer_errors(
522 struct ust_channel *channel,
204141ee 523 long cons_off, int cpu)
b5b073e2 524{
204141ee 525 struct ust_buffer *ltt_buf = channel->buf[cpu];
b73a4c47 526 long cons_idx, commit_count, commit_count_sb, write_offset;
b5b073e2
PMF
527
528 cons_idx = SUBBUF_INDEX(cons_off, channel);
b102c2b0
PMF
529 commit_count = uatomic_read(&ltt_buf->commit_count[cons_idx].cc);
530 commit_count_sb = uatomic_read(&ltt_buf->commit_count[cons_idx].cc_sb);
b73a4c47 531
b5b073e2
PMF
532 /*
533 * No need to order commit_count and write_offset reads because we
534 * execute after trace is stopped when there are no readers left.
535 */
b102c2b0 536 write_offset = uatomic_read(&ltt_buf->offset);
b5b073e2 537 WARN( "LTT : unread channel %s offset is %ld "
b73a4c47
PMF
538 "and cons_off : %ld (cpu %d)\n",
539 channel->channel_name, write_offset, cons_off, cpu);
b5b073e2
PMF
540 /* Check each sub-buffer for non filled commit count */
541 if (((commit_count - channel->subbuf_size) & channel->commit_count_mask)
542 - (BUFFER_TRUNC(cons_off, channel) >> channel->n_subbufs_order) != 0) {
543 ERR("LTT : %s : subbuffer %lu has non filled "
b73a4c47
PMF
544 "commit count [cc, cc_sb] [%lu,%lu].\n",
545 channel->channel_name, cons_idx, commit_count, commit_count_sb);
b5b073e2
PMF
546 }
547 ERR("LTT : %s : commit count : %lu, subbuf size %zd\n",
548 channel->channel_name, commit_count,
549 channel->subbuf_size);
550}
551
b73a4c47 552static void ltt_relay_print_errors(struct ust_trace *trace,
204141ee 553 struct ust_channel *channel, int cpu)
b5b073e2 554{
204141ee 555 struct ust_buffer *ltt_buf = channel->buf[cpu];
b5b073e2
PMF
556 long cons_off;
557
4292ed8a
PMF
558 /*
559 * Can be called in the error path of allocation when
560 * trans_channel_data is not yet set.
561 */
562 if (!channel)
563 return;
564
e17571a5
PMF
565//ust// for (cons_off = 0; cons_off < rchan->alloc_size;
566//ust// cons_off = SUBBUF_ALIGN(cons_off, rchan))
567//ust// ust_buffers_print_written(ltt_chan, cons_off, cpu);
b102c2b0
PMF
568 for (cons_off = uatomic_read(&ltt_buf->consumed);
569 (SUBBUF_TRUNC(uatomic_read(&ltt_buf->offset),
b5b073e2
PMF
570 channel)
571 - cons_off) > 0;
572 cons_off = SUBBUF_ALIGN(cons_off, channel))
204141ee 573 ltt_relay_print_subbuffer_errors(channel, cons_off, cpu);
b5b073e2
PMF
574}
575
204141ee 576static void ltt_relay_print_buffer_errors(struct ust_channel *channel, int cpu)
b5b073e2 577{
b73a4c47 578 struct ust_trace *trace = channel->trace;
204141ee 579 struct ust_buffer *ltt_buf = channel->buf[cpu];
b5b073e2 580
b102c2b0 581 if (uatomic_read(&ltt_buf->events_lost))
b73a4c47 582 ERR("channel %s: %ld events lost (cpu %d)",
b5b073e2 583 channel->channel_name,
b102c2b0
PMF
584 uatomic_read(&ltt_buf->events_lost), cpu);
585 if (uatomic_read(&ltt_buf->corrupted_subbuffers))
b73a4c47 586 ERR("channel %s : %ld corrupted subbuffers (cpu %d)",
b5b073e2 587 channel->channel_name,
b102c2b0 588 uatomic_read(&ltt_buf->corrupted_subbuffers), cpu);
b5b073e2 589
204141ee 590 ltt_relay_print_errors(trace, channel, cpu);
b5b073e2
PMF
591}
592
593static void ltt_relay_release_channel(struct kref *kref)
594{
595 struct ust_channel *ltt_chan = container_of(kref,
596 struct ust_channel, kref);
597 free(ltt_chan->buf);
598}
599
600/*
601 * Create ltt buffer.
602 */
b73a4c47 603//ust// static int ltt_relay_create_buffer(struct ust_trace *trace,
b5b073e2
PMF
604//ust// struct ltt_channel_struct *ltt_chan, struct rchan_buf *buf,
605//ust// unsigned int cpu, unsigned int n_subbufs)
606//ust// {
607//ust// struct ltt_channel_buf_struct *ltt_buf =
608//ust// percpu_ptr(ltt_chan->buf, cpu);
609//ust// unsigned int j;
b73a4c47 610//ust//
b5b073e2
PMF
611//ust// ltt_buf->commit_count =
612//ust// kzalloc_node(sizeof(ltt_buf->commit_count) * n_subbufs,
613//ust// GFP_KERNEL, cpu_to_node(cpu));
614//ust// if (!ltt_buf->commit_count)
615//ust// return -ENOMEM;
616//ust// kref_get(&trace->kref);
617//ust// kref_get(&trace->ltt_transport_kref);
618//ust// kref_get(&ltt_chan->kref);
b102c2b0
PMF
619//ust// uatomic_set(&ltt_buf->offset, ltt_subbuffer_header_size());
620//ust// uatomic_set(&ltt_buf->consumed, 0);
621//ust// uatomic_set(&ltt_buf->active_readers, 0);
b5b073e2 622//ust// for (j = 0; j < n_subbufs; j++)
b102c2b0 623//ust// uatomic_set(&ltt_buf->commit_count[j], 0);
b5b073e2 624//ust// init_waitqueue_head(&ltt_buf->write_wait);
b102c2b0 625//ust// uatomic_set(&ltt_buf->wakeup_readers, 0);
b5b073e2 626//ust// spin_lock_init(&ltt_buf->full_lock);
b73a4c47 627//ust//
b5b073e2
PMF
628//ust// ltt_buffer_begin_callback(buf, trace->start_tsc, 0);
629//ust// /* atomic_add made on local variable on data that belongs to
630//ust// * various CPUs : ok because tracing not started (for this cpu). */
b102c2b0 631//ust// uatomic_add(&ltt_buf->commit_count[0], ltt_subbuffer_header_size());
b73a4c47 632//ust//
b102c2b0
PMF
633//ust// uatomic_set(&ltt_buf->events_lost, 0);
634//ust// uatomic_set(&ltt_buf->corrupted_subbuffers, 0);
b73a4c47 635//ust//
b5b073e2
PMF
636//ust// return 0;
637//ust// }
638
b73a4c47 639static int ust_buffers_init_buffer(struct ust_trace *trace,
b5b073e2
PMF
640 struct ust_channel *ltt_chan, struct ust_buffer *buf,
641 unsigned int n_subbufs)
642{
643 unsigned int j;
644 int fds[2];
645 int result;
646
647 buf->commit_count =
b73a4c47 648 zmalloc(sizeof(*buf->commit_count) * n_subbufs);
b5b073e2
PMF
649 if (!buf->commit_count)
650 return -ENOMEM;
651 kref_get(&trace->kref);
652 kref_get(&trace->ltt_transport_kref);
653 kref_get(&ltt_chan->kref);
b102c2b0
PMF
654 uatomic_set(&buf->offset, ltt_subbuffer_header_size());
655 uatomic_set(&buf->consumed, 0);
656 uatomic_set(&buf->active_readers, 0);
b73a4c47 657 for (j = 0; j < n_subbufs; j++) {
b102c2b0
PMF
658 uatomic_set(&buf->commit_count[j].cc, 0);
659 uatomic_set(&buf->commit_count[j].cc_sb, 0);
b73a4c47 660 }
b5b073e2 661//ust// init_waitqueue_head(&buf->write_wait);
b102c2b0 662//ust// uatomic_set(&buf->wakeup_readers, 0);
b5b073e2
PMF
663//ust// spin_lock_init(&buf->full_lock);
664
b73a4c47 665 ltt_buffer_begin(buf, trace->start_tsc, 0);
b5b073e2 666
b102c2b0 667 uatomic_add(&buf->commit_count[0].cc, ltt_subbuffer_header_size());
b5b073e2 668
b102c2b0
PMF
669 uatomic_set(&buf->events_lost, 0);
670 uatomic_set(&buf->corrupted_subbuffers, 0);
b5b073e2
PMF
671
672 result = pipe(fds);
673 if(result == -1) {
674 PERROR("pipe");
675 return -1;
676 }
677 buf->data_ready_fd_read = fds[0];
678 buf->data_ready_fd_write = fds[1];
679
b5b073e2
PMF
680//ust// buf->commit_seq = malloc(sizeof(buf->commit_seq) * n_subbufs);
681//ust// if(!ltt_buf->commit_seq) {
682//ust// return -1;
683//ust// }
37315729 684 memset(buf->commit_seq, 0, sizeof(buf->commit_seq[0]) * n_subbufs);
b5b073e2
PMF
685
686 /* FIXME: decrementally destroy on error */
687
688 return 0;
689}
690
691/* FIXME: use this function */
204141ee 692static void ust_buffers_destroy_buffer(struct ust_channel *ltt_chan, int cpu)
b5b073e2 693{
b73a4c47 694 struct ust_trace *trace = ltt_chan->trace;
204141ee 695 struct ust_buffer *ltt_buf = ltt_chan->buf[cpu];
b5b073e2
PMF
696
697 kref_put(&ltt_chan->trace->ltt_transport_kref,
698 ltt_release_transport);
204141ee 699 ltt_relay_print_buffer_errors(ltt_chan, cpu);
b5b073e2 700//ust// free(ltt_buf->commit_seq);
909bc43f 701 free(ltt_buf->commit_count);
b5b073e2
PMF
702 ltt_buf->commit_count = NULL;
703 kref_put(&ltt_chan->kref, ltt_relay_release_channel);
704 kref_put(&trace->kref, ltt_release_trace);
705//ust// wake_up_interruptible(&trace->kref_wq);
706}
707
204141ee 708static int ust_buffers_alloc_channel_buf_structs(struct ust_channel *chan)
b5b073e2
PMF
709{
710 void *ptr;
711 int result;
204141ee
PMF
712 size_t size;
713 int i;
b5b073e2 714
204141ee 715 size = PAGE_ALIGN(1);
b5b073e2 716
204141ee 717 for(i=0; i<chan->n_cpus; i++) {
b5b073e2 718
204141ee
PMF
719 result = chan->buf_struct_shmids[i] = shmget(getpid(), size, IPC_CREAT | IPC_EXCL | 0700);
720 if(result == -1) {
721 PERROR("shmget");
722 goto destroy_previous;
723 }
b5b073e2 724
204141ee
PMF
725 /* FIXME: should have matching call to shmdt */
726 ptr = shmat(chan->buf_struct_shmids[i], NULL, 0);
727 if(ptr == (void *) -1) {
728 perror("shmat");
729 goto destroy_shm;
730 }
731
732 /* Already mark the shared memory for destruction. This will occur only
733 * when all users have detached.
734 */
735 result = shmctl(chan->buf_struct_shmids[i], IPC_RMID, NULL);
736 if(result == -1) {
737 perror("shmctl");
738 goto destroy_previous;
739 }
740
741 chan->buf[i] = ptr;
b5b073e2
PMF
742 }
743
204141ee 744 return 0;
b5b073e2 745
204141ee
PMF
746 /* Jumping inside this loop occurs from within the other loop above with i as
747 * counter, so it unallocates the structures for the cpu = current_i down to
748 * zero. */
749 for(; i>=0; i--) {
750 destroy_shm:
751 result = shmctl(chan->buf_struct_shmids[i], IPC_RMID, NULL);
752 if(result == -1) {
753 perror("shmctl");
754 }
b5b073e2 755
204141ee
PMF
756 destroy_previous:
757 continue;
b5b073e2
PMF
758 }
759
204141ee 760 return -1;
b5b073e2
PMF
761}
762
763/*
764 * Create channel.
765 */
b73a4c47 766static int ust_buffers_create_channel(const char *trace_name, struct ust_trace *trace,
b5b073e2
PMF
767 const char *channel_name, struct ust_channel *ltt_chan,
768 unsigned int subbuf_size, unsigned int n_subbufs, int overwrite)
769{
b5b073e2
PMF
770 int result;
771
772 kref_init(&ltt_chan->kref);
773
774 ltt_chan->trace = trace;
b5b073e2
PMF
775 ltt_chan->overwrite = overwrite;
776 ltt_chan->n_subbufs_order = get_count_order(n_subbufs);
777 ltt_chan->commit_count_mask = (~0UL >> ltt_chan->n_subbufs_order);
204141ee 778 ltt_chan->n_cpus = get_n_cpus();
b5b073e2 779//ust// ltt_chan->buf = percpu_alloc_mask(sizeof(struct ltt_channel_buf_struct), GFP_KERNEL, cpu_possible_map);
1dba3e6c 780 ltt_chan->buf = (void *) zmalloc(ltt_chan->n_cpus * sizeof(void *));
204141ee
PMF
781 if(ltt_chan->buf == NULL) {
782 goto error;
783 }
1dba3e6c 784 ltt_chan->buf_struct_shmids = (int *) zmalloc(ltt_chan->n_cpus * sizeof(int));
204141ee
PMF
785 if(ltt_chan->buf_struct_shmids == NULL)
786 goto free_buf;
b5b073e2 787
204141ee
PMF
788 result = ust_buffers_alloc_channel_buf_structs(ltt_chan);
789 if(result != 0) {
790 goto free_buf_struct_shmids;
791 }
b5b073e2 792
b5b073e2 793 result = ust_buffers_channel_open(ltt_chan, subbuf_size, n_subbufs);
204141ee 794 if (result != 0) {
c1f20530 795 ERR("Cannot open channel for trace %s", trace_name);
204141ee 796 goto unalloc_buf_structs;
b5b073e2
PMF
797 }
798
204141ee
PMF
799 return 0;
800
801unalloc_buf_structs:
802 /* FIXME: put a call here to unalloc the buf structs! */
803
804free_buf_struct_shmids:
805 free(ltt_chan->buf_struct_shmids);
b5b073e2 806
204141ee
PMF
807free_buf:
808 free(ltt_chan->buf);
809
810error:
811 return -1;
b5b073e2
PMF
812}
813
b5b073e2
PMF
814static void ltt_relay_async_wakeup_chan(struct ust_channel *ltt_channel)
815{
816//ust// unsigned int i;
817//ust// struct rchan *rchan = ltt_channel->trans_channel_data;
818//ust//
819//ust// for_each_possible_cpu(i) {
820//ust// struct ltt_channel_buf_struct *ltt_buf =
821//ust// percpu_ptr(ltt_channel->buf, i);
822//ust//
b102c2b0
PMF
823//ust// if (uatomic_read(&ltt_buf->wakeup_readers) == 1) {
824//ust// uatomic_set(&ltt_buf->wakeup_readers, 0);
b5b073e2
PMF
825//ust// wake_up_interruptible(&rchan->buf[i]->read_wait);
826//ust// }
827//ust// }
828}
829
204141ee 830static void ltt_relay_finish_buffer(struct ust_channel *channel, unsigned int cpu)
b5b073e2
PMF
831{
832// int result;
833
204141ee
PMF
834 if (channel->buf[cpu]) {
835 struct ust_buffer *buf = channel->buf[cpu];
97c10252 836 ltt_force_switch(buf, FORCE_FLUSH);
b5b073e2
PMF
837//ust// ltt_relay_wake_writers(ltt_buf);
838 /* closing the pipe tells the consumer the buffer is finished */
839
840 //result = write(ltt_buf->data_ready_fd_write, "D", 1);
841 //if(result == -1) {
842 // PERROR("write (in ltt_relay_finish_buffer)");
843 // ERR("this should never happen!");
844 //}
845 close(buf->data_ready_fd_write);
846 }
847}
848
849
850static void ltt_relay_finish_channel(struct ust_channel *channel)
851{
204141ee 852 unsigned int i;
b5b073e2 853
204141ee
PMF
854 for(i=0; i<channel->n_cpus; i++) {
855 ltt_relay_finish_buffer(channel, i);
856 }
b5b073e2
PMF
857}
858
859static void ltt_relay_remove_channel(struct ust_channel *channel)
860{
861 ust_buffers_channel_close(channel);
862 kref_put(&channel->kref, ltt_relay_release_channel);
863}
864
b5b073e2 865/*
b73a4c47
PMF
866 * ltt_reserve_switch_old_subbuf: switch old subbuffer
867 *
868 * Concurrency safe because we are the last and only thread to alter this
869 * sub-buffer. As long as it is not delivered and read, no other thread can
870 * alter the offset, alter the reserve_count or call the
871 * client_buffer_end_callback on this sub-buffer.
872 *
873 * The only remaining threads could be the ones with pending commits. They will
874 * have to do the deliver themselves. Not concurrency safe in overwrite mode.
875 * We detect corrupted subbuffers with commit and reserve counts. We keep a
876 * corrupted sub-buffers count and push the readers across these sub-buffers.
877 *
878 * Not concurrency safe if a writer is stalled in a subbuffer and another writer
879 * switches in, finding out it's corrupted. The result will be than the old
880 * (uncommited) subbuffer will be declared corrupted, and that the new subbuffer
881 * will be declared corrupted too because of the commit count adjustment.
882 *
883 * Note : offset_old should never be 0 here.
b5b073e2 884 */
b73a4c47
PMF
885static void ltt_reserve_switch_old_subbuf(
886 struct ust_channel *chan, struct ust_buffer *buf,
887 struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
b5b073e2 888{
b73a4c47
PMF
889 long oldidx = SUBBUF_INDEX(offsets->old - 1, chan);
890 long commit_count, padding_size;
b5b073e2 891
b73a4c47
PMF
892 padding_size = chan->subbuf_size
893 - (SUBBUF_OFFSET(offsets->old - 1, chan) + 1);
894 ltt_buffer_end(buf, *tsc, offsets->old, oldidx);
b5b073e2 895
b73a4c47
PMF
896 /*
897 * Must write slot data before incrementing commit count.
898 * This compiler barrier is upgraded into a smp_wmb() by the IPI
899 * sent by get_subbuf() when it does its smp_rmb().
900 */
ee4c3467 901 smp_wmb();
b102c2b0
PMF
902 uatomic_add(&buf->commit_count[oldidx].cc, padding_size);
903 commit_count = uatomic_read(&buf->commit_count[oldidx].cc);
b73a4c47 904 ltt_check_deliver(chan, buf, offsets->old - 1, commit_count, oldidx);
1e8c9e7b 905 ltt_write_commit_counter(chan, buf, oldidx,
b73a4c47
PMF
906 offsets->old, commit_count, padding_size);
907}
b5b073e2 908
b73a4c47
PMF
909/*
910 * ltt_reserve_switch_new_subbuf: Populate new subbuffer.
911 *
912 * This code can be executed unordered : writers may already have written to the
913 * sub-buffer before this code gets executed, caution. The commit makes sure
914 * that this code is executed before the deliver of this sub-buffer.
915 */
916static void ltt_reserve_switch_new_subbuf(
917 struct ust_channel *chan, struct ust_buffer *buf,
918 struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
919{
920 long beginidx = SUBBUF_INDEX(offsets->begin, chan);
921 long commit_count;
b5b073e2 922
b73a4c47 923 ltt_buffer_begin(buf, *tsc, beginidx);
b5b073e2 924
b73a4c47
PMF
925 /*
926 * Must write slot data before incrementing commit count.
927 * This compiler barrier is upgraded into a smp_wmb() by the IPI
928 * sent by get_subbuf() when it does its smp_rmb().
929 */
ee4c3467 930 smp_wmb();
b102c2b0
PMF
931 uatomic_add(&buf->commit_count[beginidx].cc, ltt_subbuffer_header_size());
932 commit_count = uatomic_read(&buf->commit_count[beginidx].cc);
b73a4c47
PMF
933 /* Check if the written buffer has to be delivered */
934 ltt_check_deliver(chan, buf, offsets->begin, commit_count, beginidx);
1e8c9e7b 935 ltt_write_commit_counter(chan, buf, beginidx,
b73a4c47
PMF
936 offsets->begin, commit_count, ltt_subbuffer_header_size());
937}
b5b073e2 938
b73a4c47
PMF
939/*
940 * ltt_reserve_end_switch_current: finish switching current subbuffer
941 *
942 * Concurrency safe because we are the last and only thread to alter this
943 * sub-buffer. As long as it is not delivered and read, no other thread can
944 * alter the offset, alter the reserve_count or call the
945 * client_buffer_end_callback on this sub-buffer.
946 *
947 * The only remaining threads could be the ones with pending commits. They will
948 * have to do the deliver themselves. Not concurrency safe in overwrite mode.
949 * We detect corrupted subbuffers with commit and reserve counts. We keep a
950 * corrupted sub-buffers count and push the readers across these sub-buffers.
951 *
952 * Not concurrency safe if a writer is stalled in a subbuffer and another writer
953 * switches in, finding out it's corrupted. The result will be than the old
954 * (uncommited) subbuffer will be declared corrupted, and that the new subbuffer
955 * will be declared corrupted too because of the commit count adjustment.
956 */
957static void ltt_reserve_end_switch_current(
958 struct ust_channel *chan,
959 struct ust_buffer *buf,
960 struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
961{
962 long endidx = SUBBUF_INDEX(offsets->end - 1, chan);
963 long commit_count, padding_size;
964
965 padding_size = chan->subbuf_size
966 - (SUBBUF_OFFSET(offsets->end - 1, chan) + 1);
967
968 ltt_buffer_end(buf, *tsc, offsets->end, endidx);
969
970 /*
971 * Must write slot data before incrementing commit count.
972 * This compiler barrier is upgraded into a smp_wmb() by the IPI
973 * sent by get_subbuf() when it does its smp_rmb().
974 */
ee4c3467 975 smp_wmb();
b102c2b0
PMF
976 uatomic_add(&buf->commit_count[endidx].cc, padding_size);
977 commit_count = uatomic_read(&buf->commit_count[endidx].cc);
b73a4c47
PMF
978 ltt_check_deliver(chan, buf,
979 offsets->end - 1, commit_count, endidx);
1e8c9e7b 980 ltt_write_commit_counter(chan, buf, endidx,
b73a4c47 981 offsets->end, commit_count, padding_size);
b5b073e2
PMF
982}
983
984/*
985 * Returns :
986 * 0 if ok
987 * !0 if execution must be aborted.
988 */
b73a4c47 989static int ltt_relay_try_switch_slow(
b5b073e2 990 enum force_switch_mode mode,
b73a4c47 991 struct ust_channel *chan,
b5b073e2
PMF
992 struct ust_buffer *buf,
993 struct ltt_reserve_switch_offsets *offsets,
994 u64 *tsc)
995{
996 long subbuf_index;
b73a4c47 997 long reserve_commit_diff;
b5b073e2 998
b102c2b0 999 offsets->begin = uatomic_read(&buf->offset);
b5b073e2
PMF
1000 offsets->old = offsets->begin;
1001 offsets->begin_switch = 0;
1002 offsets->end_switch_old = 0;
1003
1004 *tsc = trace_clock_read64();
1005
1006 if (SUBBUF_OFFSET(offsets->begin, buf->chan) != 0) {
1007 offsets->begin = SUBBUF_ALIGN(offsets->begin, buf->chan);
1008 offsets->end_switch_old = 1;
1009 } else {
1010 /* we do not have to switch : buffer is empty */
1011 return -1;
1012 }
1013 if (mode == FORCE_ACTIVE)
1014 offsets->begin += ltt_subbuffer_header_size();
1015 /*
1016 * Always begin_switch in FORCE_ACTIVE mode.
1017 * Test new buffer integrity
1018 */
1019 subbuf_index = SUBBUF_INDEX(offsets->begin, buf->chan);
b73a4c47 1020 reserve_commit_diff =
b5b073e2 1021 (BUFFER_TRUNC(offsets->begin, buf->chan)
b73a4c47 1022 >> chan->n_subbufs_order)
b102c2b0 1023 - (uatomic_read(&buf->commit_count[subbuf_index].cc_sb)
b73a4c47
PMF
1024 & chan->commit_count_mask);
1025 if (reserve_commit_diff == 0) {
b5b073e2
PMF
1026 /* Next buffer not corrupted. */
1027 if (mode == FORCE_ACTIVE
b73a4c47 1028 && !chan->overwrite
b102c2b0 1029 && offsets->begin - uatomic_read(&buf->consumed)
b73a4c47 1030 >= chan->alloc_size) {
b5b073e2
PMF
1031 /*
1032 * We do not overwrite non consumed buffers and we are
1033 * full : ignore switch while tracing is active.
1034 */
1035 return -1;
1036 }
1037 } else {
1038 /*
1039 * Next subbuffer corrupted. Force pushing reader even in normal
1040 * mode
1041 */
1042 }
1043 offsets->end = offsets->begin;
1044 return 0;
1045}
1046
b5b073e2 1047/*
b73a4c47
PMF
1048 * Force a sub-buffer switch for a per-cpu buffer. This operation is
1049 * completely reentrant : can be called while tracing is active with
1050 * absolutely no lock held.
b5b073e2 1051 */
b73a4c47
PMF
1052void ltt_force_switch_lockless_slow(struct ust_buffer *buf,
1053 enum force_switch_mode mode)
b5b073e2 1054{
b73a4c47 1055 struct ust_channel *chan = buf->chan;
b5b073e2 1056 struct ltt_reserve_switch_offsets offsets;
b73a4c47 1057 u64 tsc;
b5b073e2 1058
b5b073e2
PMF
1059 offsets.size = 0;
1060
10dd3941 1061 DBG("Switching (forced) %s_%d", chan->channel_name, buf->cpu);
b5b073e2
PMF
1062 /*
1063 * Perform retryable operations.
1064 */
b5b073e2 1065 do {
b73a4c47
PMF
1066 if (ltt_relay_try_switch_slow(mode, chan, buf,
1067 &offsets, &tsc))
1068 return;
b102c2b0 1069 } while (uatomic_cmpxchg(&buf->offset, offsets.old,
b5b073e2
PMF
1070 offsets.end) != offsets.old);
1071
1072 /*
1073 * Atomically update last_tsc. This update races against concurrent
1074 * atomic updates, but the race will always cause supplementary full TSC
1075 * events, never the opposite (missing a full TSC event when it would be
1076 * needed).
1077 */
b73a4c47 1078 save_last_tsc(buf, tsc);
b5b073e2
PMF
1079
1080 /*
1081 * Push the reader if necessary
1082 */
b73a4c47
PMF
1083 if (mode == FORCE_ACTIVE) {
1084 ltt_reserve_push_reader(chan, buf, offsets.end - 1);
1085//ust// ltt_clear_noref_flag(chan, buf, SUBBUF_INDEX(offsets.end - 1, chan));
1086 }
b5b073e2
PMF
1087
1088 /*
1089 * Switch old subbuffer if needed.
1090 */
b73a4c47
PMF
1091 if (offsets.end_switch_old) {
1092//ust// ltt_clear_noref_flag(rchan, buf, SUBBUF_INDEX(offsets.old - 1, rchan));
1093 ltt_reserve_switch_old_subbuf(chan, buf, &offsets, &tsc);
1094 }
b5b073e2
PMF
1095
1096 /*
1097 * Populate new subbuffer.
1098 */
b73a4c47
PMF
1099 if (mode == FORCE_ACTIVE)
1100 ltt_reserve_switch_new_subbuf(chan, buf, &offsets, &tsc);
1101}
b5b073e2 1102
b73a4c47
PMF
1103/*
1104 * Returns :
1105 * 0 if ok
1106 * !0 if execution must be aborted.
1107 */
1108static int ltt_relay_try_reserve_slow(struct ust_channel *chan, struct ust_buffer *buf,
1109 struct ltt_reserve_switch_offsets *offsets, size_t data_size,
1110 u64 *tsc, unsigned int *rflags, int largest_align)
1111{
1112 long reserve_commit_diff;
b5b073e2 1113
b102c2b0 1114 offsets->begin = uatomic_read(&buf->offset);
b73a4c47
PMF
1115 offsets->old = offsets->begin;
1116 offsets->begin_switch = 0;
1117 offsets->end_switch_current = 0;
1118 offsets->end_switch_old = 0;
1119
1120 *tsc = trace_clock_read64();
1121 if (last_tsc_overflow(buf, *tsc))
1122 *rflags = LTT_RFLAG_ID_SIZE_TSC;
1123
1124 if (unlikely(SUBBUF_OFFSET(offsets->begin, buf->chan) == 0)) {
1125 offsets->begin_switch = 1; /* For offsets->begin */
1126 } else {
1127 offsets->size = ust_get_header_size(chan,
1128 offsets->begin, data_size,
1129 &offsets->before_hdr_pad, *rflags);
1130 offsets->size += ltt_align(offsets->begin + offsets->size,
1131 largest_align)
1132 + data_size;
1133 if (unlikely((SUBBUF_OFFSET(offsets->begin, buf->chan) +
1134 offsets->size) > buf->chan->subbuf_size)) {
1135 offsets->end_switch_old = 1; /* For offsets->old */
1136 offsets->begin_switch = 1; /* For offsets->begin */
1137 }
1138 }
1139 if (unlikely(offsets->begin_switch)) {
1140 long subbuf_index;
1141
1142 /*
1143 * We are typically not filling the previous buffer completely.
1144 */
1145 if (likely(offsets->end_switch_old))
1146 offsets->begin = SUBBUF_ALIGN(offsets->begin,
1147 buf->chan);
1148 offsets->begin = offsets->begin + ltt_subbuffer_header_size();
1149 /* Test new buffer integrity */
1150 subbuf_index = SUBBUF_INDEX(offsets->begin, buf->chan);
1151 reserve_commit_diff =
1152 (BUFFER_TRUNC(offsets->begin, buf->chan)
1153 >> chan->n_subbufs_order)
b102c2b0 1154 - (uatomic_read(&buf->commit_count[subbuf_index].cc_sb)
b73a4c47
PMF
1155 & chan->commit_count_mask);
1156 if (likely(reserve_commit_diff == 0)) {
1157 /* Next buffer not corrupted. */
1158 if (unlikely(!chan->overwrite &&
1159 (SUBBUF_TRUNC(offsets->begin, buf->chan)
b102c2b0 1160 - SUBBUF_TRUNC(uatomic_read(
b73a4c47
PMF
1161 &buf->consumed),
1162 buf->chan))
1163 >= chan->alloc_size)) {
1164 /*
1165 * We do not overwrite non consumed buffers
1166 * and we are full : event is lost.
1167 */
b102c2b0 1168 uatomic_inc(&buf->events_lost);
b73a4c47
PMF
1169 return -1;
1170 } else {
1171 /*
1172 * next buffer not corrupted, we are either in
1173 * overwrite mode or the buffer is not full.
1174 * It's safe to write in this new subbuffer.
1175 */
1176 }
1177 } else {
1178 /*
1179 * Next subbuffer corrupted. Drop event in normal and
1180 * overwrite mode. Caused by either a writer OOPS or
1181 * too many nested writes over a reserve/commit pair.
1182 */
b102c2b0 1183 uatomic_inc(&buf->events_lost);
b73a4c47
PMF
1184 return -1;
1185 }
1186 offsets->size = ust_get_header_size(chan,
1187 offsets->begin, data_size,
1188 &offsets->before_hdr_pad, *rflags);
1189 offsets->size += ltt_align(offsets->begin + offsets->size,
1190 largest_align)
1191 + data_size;
1192 if (unlikely((SUBBUF_OFFSET(offsets->begin, buf->chan)
1193 + offsets->size) > buf->chan->subbuf_size)) {
1194 /*
1195 * Event too big for subbuffers, report error, don't
1196 * complete the sub-buffer switch.
1197 */
b102c2b0 1198 uatomic_inc(&buf->events_lost);
b73a4c47
PMF
1199 return -1;
1200 } else {
1201 /*
1202 * We just made a successful buffer switch and the event
1203 * fits in the new subbuffer. Let's write.
1204 */
1205 }
1206 } else {
1207 /*
1208 * Event fits in the current buffer and we are not on a switch
1209 * boundary. It's safe to write.
1210 */
1211 }
1212 offsets->end = offsets->begin + offsets->size;
1213
1214 if (unlikely((SUBBUF_OFFSET(offsets->end, buf->chan)) == 0)) {
1215 /*
1216 * The offset_end will fall at the very beginning of the next
1217 * subbuffer.
1218 */
1219 offsets->end_switch_current = 1; /* For offsets->begin */
1220 }
b5b073e2
PMF
1221 return 0;
1222}
1223
b73a4c47
PMF
1224/**
1225 * ltt_relay_reserve_slot_lockless_slow - Atomic slot reservation in a buffer.
1226 * @trace: the trace structure to log to.
1227 * @ltt_channel: channel structure
1228 * @transport_data: data structure specific to ltt relay
1229 * @data_size: size of the variable length data to log.
1230 * @slot_size: pointer to total size of the slot (out)
1231 * @buf_offset : pointer to reserved buffer offset (out)
1232 * @tsc: pointer to the tsc at the slot reservation (out)
1233 * @cpu: cpuid
b5b073e2 1234 *
b73a4c47
PMF
1235 * Return : -ENOSPC if not enough space, else returns 0.
1236 * It will take care of sub-buffer switching.
b5b073e2 1237 */
12e81b07
PMF
1238int ltt_reserve_slot_lockless_slow(struct ust_channel *chan,
1239 struct ust_trace *trace, size_t data_size,
1240 int largest_align, int cpu,
1241 struct ust_buffer **ret_buf,
1242 size_t *slot_size, long *buf_offset,
1243 u64 *tsc, unsigned int *rflags)
b5b073e2 1244{
12e81b07 1245 struct ust_buffer *buf = *ret_buf = chan->buf[cpu];
b5b073e2 1246 struct ltt_reserve_switch_offsets offsets;
b5b073e2 1247
b5b073e2
PMF
1248 offsets.size = 0;
1249
b5b073e2 1250 do {
b73a4c47
PMF
1251 if (unlikely(ltt_relay_try_reserve_slow(chan, buf, &offsets,
1252 data_size, tsc, rflags, largest_align)))
1253 return -ENOSPC;
b102c2b0 1254 } while (unlikely(uatomic_cmpxchg(&buf->offset, offsets.old,
b73a4c47 1255 offsets.end) != offsets.old));
b5b073e2
PMF
1256
1257 /*
1258 * Atomically update last_tsc. This update races against concurrent
1259 * atomic updates, but the race will always cause supplementary full TSC
1260 * events, never the opposite (missing a full TSC event when it would be
1261 * needed).
1262 */
b73a4c47 1263 save_last_tsc(buf, *tsc);
b5b073e2
PMF
1264
1265 /*
1266 * Push the reader if necessary
1267 */
b73a4c47
PMF
1268 ltt_reserve_push_reader(chan, buf, offsets.end - 1);
1269
1270 /*
1271 * Clear noref flag for this subbuffer.
1272 */
1273//ust// ltt_clear_noref_flag(chan, buf, SUBBUF_INDEX(offsets.end - 1, chan));
b5b073e2
PMF
1274
1275 /*
1276 * Switch old subbuffer if needed.
1277 */
b73a4c47
PMF
1278 if (unlikely(offsets.end_switch_old)) {
1279//ust// ltt_clear_noref_flag(chan, buf, SUBBUF_INDEX(offsets.old - 1, chan));
1280 ltt_reserve_switch_old_subbuf(chan, buf, &offsets, tsc);
10dd3941 1281 DBG("Switching %s_%d", chan->channel_name, cpu);
b73a4c47 1282 }
b5b073e2
PMF
1283
1284 /*
1285 * Populate new subbuffer.
1286 */
b73a4c47
PMF
1287 if (unlikely(offsets.begin_switch))
1288 ltt_reserve_switch_new_subbuf(chan, buf, &offsets, tsc);
1289
1290 if (unlikely(offsets.end_switch_current))
1291 ltt_reserve_end_switch_current(chan, buf, &offsets, tsc);
1292
1293 *slot_size = offsets.size;
1294 *buf_offset = offsets.begin + offsets.before_hdr_pad;
1295 return 0;
b5b073e2
PMF
1296}
1297
b5b073e2
PMF
1298static struct ltt_transport ust_relay_transport = {
1299 .name = "ustrelay",
1300 .ops = {
1301 .create_channel = ust_buffers_create_channel,
1302 .finish_channel = ltt_relay_finish_channel,
1303 .remove_channel = ltt_relay_remove_channel,
1304 .wakeup_channel = ltt_relay_async_wakeup_chan,
b5b073e2
PMF
1305 },
1306};
1307
b5b073e2
PMF
1308static char initialized = 0;
1309
1310void __attribute__((constructor)) init_ustrelay_transport(void)
1311{
1312 if(!initialized) {
1313 ltt_transport_register(&ust_relay_transport);
1314 initialized = 1;
1315 }
1316}
1317
b73a4c47 1318static void __attribute__((destructor)) ust_buffers_exit(void)
b5b073e2
PMF
1319{
1320 ltt_transport_unregister(&ust_relay_transport);
1321}
b73a4c47 1322
12e81b07 1323size_t ltt_write_event_header_slow(struct ust_channel *channel,
b73a4c47
PMF
1324 struct ust_buffer *buf, long buf_offset,
1325 u16 eID, u32 event_size,
1326 u64 tsc, unsigned int rflags)
1327{
1328 struct ltt_event_header header;
1329 u16 small_size;
1330
1331 switch (rflags) {
1332 case LTT_RFLAG_ID_SIZE_TSC:
1333 header.id_time = 29 << LTT_TSC_BITS;
1334 break;
1335 case LTT_RFLAG_ID_SIZE:
1336 header.id_time = 30 << LTT_TSC_BITS;
1337 break;
1338 case LTT_RFLAG_ID:
1339 header.id_time = 31 << LTT_TSC_BITS;
1340 break;
1341 }
1342
1343 header.id_time |= (u32)tsc & LTT_TSC_MASK;
1344 ust_buffers_write(buf, buf_offset, &header, sizeof(header));
1345 buf_offset += sizeof(header);
1346
1347 switch (rflags) {
1348 case LTT_RFLAG_ID_SIZE_TSC:
1349 small_size = (u16)min_t(u32, event_size, LTT_MAX_SMALL_SIZE);
1350 ust_buffers_write(buf, buf_offset,
1351 &eID, sizeof(u16));
1352 buf_offset += sizeof(u16);
1353 ust_buffers_write(buf, buf_offset,
1354 &small_size, sizeof(u16));
1355 buf_offset += sizeof(u16);
1356 if (small_size == LTT_MAX_SMALL_SIZE) {
1357 ust_buffers_write(buf, buf_offset,
1358 &event_size, sizeof(u32));
1359 buf_offset += sizeof(u32);
1360 }
1361 buf_offset += ltt_align(buf_offset, sizeof(u64));
1362 ust_buffers_write(buf, buf_offset,
1363 &tsc, sizeof(u64));
1364 buf_offset += sizeof(u64);
1365 break;
1366 case LTT_RFLAG_ID_SIZE:
1367 small_size = (u16)min_t(u32, event_size, LTT_MAX_SMALL_SIZE);
1368 ust_buffers_write(buf, buf_offset,
1369 &eID, sizeof(u16));
1370 buf_offset += sizeof(u16);
1371 ust_buffers_write(buf, buf_offset,
1372 &small_size, sizeof(u16));
1373 buf_offset += sizeof(u16);
1374 if (small_size == LTT_MAX_SMALL_SIZE) {
1375 ust_buffers_write(buf, buf_offset,
1376 &event_size, sizeof(u32));
1377 buf_offset += sizeof(u32);
1378 }
1379 break;
1380 case LTT_RFLAG_ID:
1381 ust_buffers_write(buf, buf_offset,
1382 &eID, sizeof(u16));
1383 buf_offset += sizeof(u16);
1384 break;
1385 }
1386
1387 return buf_offset;
1388}
This page took 0.094086 seconds and 4 git commands to generate.