/* LTTng user-space "fast" library
 *
 * This daemon is spawned by each traced thread (to share the mmap).
 *
 * Its job is to periodically dump this buffer to disk (when it receives a
 * SIGUSR1 from its parent).
 *
 * It uses the control information in the shared memory area (producer/consumer
 * count).
 *
 * When the parent thread dies (yes, those things may happen) ;) , this daemon
 * will flush the last buffer and write it to disk.
 *
 * Supplementary note for streaming : the daemon is responsible for flushing
 * the buffer periodically if it is streaming data.
 *
 *
 * Notes :
 * shm memory is typically limited to 4096 units (system wide limit SHMMNI in
 * /proc/sys/kernel/shmmni). As it requires computation time upon creation, we
 * do not use it : we use a shared mmap() instead, which is passed through
 * the fork().
 * MAP_SHARED mmap segment. Updated when msync or munmap are called.
 * MAP_ANONYMOUS.
 * Memory mapped by mmap() is preserved across fork(2), with the same
 * attributes.
 *
 * Eventually, there will be two modes :
 * * Slow thread spawn : a fork() is done for each new thread. If the process
 *   dies, the data is not lost.
 * * Fast thread spawn : a pthread_create() is done by the application for each
 *   new thread.
 *
 * We use a timer to check periodically if the parent died. I think it is less
 * intrusive than a ptrace() on the parent, which would get every signal. The
 * side effect of this is that we won't be notified if the parent does an
 * exec(). In this case, we will just sit there until the parent exits.
 *
 *
 * Copyright 2006 Mathieu Desnoyers
 *
 */
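/*
 * Overview sketch (illustrative, derived from the code below) : linking this
 * library into an application is enough for the main thread. The constructor
 * __ltt_usertrace_fast_init() calls ltt_rw_init(), which mmap()s the shared
 * buffers and fork()s the dumper daemon ; the daemon writes the
 * "facilities-*" and "cpu-*" files under LTT_USERTRACE_ROOT. Threads created
 * with pthread_create() call ltt_thread_init() themselves (see the usage
 * sketch near the end of this file).
 */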

#define _GNU_SOURCE
#define LTT_TRACE
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <signal.h>
#include <syscall.h>
#include <features.h>
#include <pthread.h>
#include <malloc.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/param.h>
#include <sys/time.h>
#include <errno.h>

#include <asm/atomic.h>
#include <asm/timex.h> // for get_cycles()

_syscall0(pid_t, gettid)
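/*
 * Note : _syscall0 is a legacy kernel-header macro. On toolchains where it
 * is unavailable, an equivalent definition (sketch, using the raw syscall
 * interface from <syscall.h>) would be :
 *
 *   static inline pid_t gettid(void)
 *   {
 *     return syscall(SYS_gettid);
 *   }
 */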

#include "ltt-usertrace-fast.h"

enum force_switch_mode { FORCE_ACTIVE, FORCE_FLUSH };

/* Writer (the traced application) */

__thread struct ltt_trace_info *thread_trace_info = NULL;

void ltt_usertrace_fast_buffer_switch(void)
{
  struct ltt_trace_info *tmp = thread_trace_info;
  if(tmp)
    kill(tmp->daemon_id, SIGUSR1);
}

/* The cleanup should never be called from a signal handler */
static void ltt_usertrace_fast_cleanup(void *arg)
{
  struct ltt_trace_info *tmp = thread_trace_info;
  if(tmp) {
    thread_trace_info = NULL;
    kill(tmp->daemon_id, SIGUSR2);
    munmap(tmp, sizeof(*tmp));
  }
}

/* Reader (the disk dumper daemon) */

static pid_t traced_pid = 0;
static pid_t traced_tid = 0;
static int parent_exited = 0;

/* signal handling */
static void handler_sigusr1(int signo)
{
  printf("LTT Signal %d received : parent buffer switch.\n", signo);
}

static void handler_sigusr2(int signo)
{
  printf("LTT Signal %d received : parent exited.\n", signo);
  parent_exited = 1;
}

static void handler_sigalarm(int signo)
{
  printf("LTT Signal %d received\n", signo);

  if(getppid() != traced_pid) {
    /* Parent died */
    printf("LTT Parent %d died, cleaning up\n", traced_pid);
    traced_pid = 0;
  }
  alarm(3);
}
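/*
 * The SUBBUF_* helpers used below come from ltt-usertrace-fast.h. As a
 * reading aid, here is a sketch of their presumable semantics (an assumption
 * based on how they are used here ; the authoritative definitions live in
 * the header). Sub-buffer sizes are assumed to be powers of two, so the
 * arithmetic is bitmask based :
 *
 *   SUBBUF_OFFSET(off, buf)  // offset within the current sub-buffer :
 *                            //   (off) & (buf->subbuf_size - 1)
 *   SUBBUF_TRUNC(off, buf)   // round down to the sub-buffer start :
 *                            //   (off) & ~(buf->subbuf_size - 1)
 *   SUBBUF_ALIGN(off, buf)   // round up to the next sub-buffer start :
 *                            //   ((off) + buf->subbuf_size) & ~(buf->subbuf_size - 1)
 *   SUBBUF_INDEX(off, buf)   // index of the sub-buffer in the ring :
 *                            //   ((off) / buf->subbuf_size) % (nr of sub-buffers)
 *
 * Worked example with a hypothetical 4 kB sub-buffer size and 16 kB
 * allocation size : offset 5000 gives SUBBUF_OFFSET = 904,
 * SUBBUF_TRUNC = 4096, SUBBUF_ALIGN = 8192 and SUBBUF_INDEX = 1.
 */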

/* Do a buffer switch. Don't switch if buffer is completely empty */
static void flush_buffer(struct ltt_buf *ltt_buf, enum force_switch_mode mode)
{
  uint64_t tsc;
  int offset_begin, offset_end, offset_old;
  int reserve_commit_diff;
  int consumed_old, consumed_new;
  int commit_count, reserve_count;
  int end_switch_old;

  do {
    offset_old = atomic_read(&ltt_buf->offset);
    offset_begin = offset_old;
    end_switch_old = 0;
    tsc = ltt_get_timestamp();
    if(tsc == 0) {
      /* Error in getting the timestamp : should not happen : it would
       * mean we are called from an NMI during a write seqlock on xtime. */
      return;
    }

    if(SUBBUF_OFFSET(offset_begin, ltt_buf) != 0) {
      offset_begin = SUBBUF_ALIGN(offset_begin, ltt_buf);
      end_switch_old = 1;
    } else {
      /* we do not have to switch : buffer is empty */
      return;
    }
    if(mode == FORCE_ACTIVE)
      offset_begin += ltt_subbuf_header_len(ltt_buf);
    /* Always begin_switch in FORCE_ACTIVE mode */

    /* Test new buffer integrity */
    reserve_commit_diff =
      atomic_read(
          &ltt_buf->reserve_count[SUBBUF_INDEX(offset_begin, ltt_buf)])
      - atomic_read(
          &ltt_buf->commit_count[SUBBUF_INDEX(offset_begin, ltt_buf)]);
    if(reserve_commit_diff == 0) {
      /* Next buffer not corrupted. */
      if(mode == FORCE_ACTIVE
          && (offset_begin - atomic_read(&ltt_buf->consumed))
             >= ltt_buf->alloc_size) {
        /* We do not overwrite non consumed buffers and we are full : ignore
           switch while tracing is active. */
        return;
      }
    } else {
      /* Next subbuffer corrupted. Force pushing reader even in normal mode */
    }

    offset_end = offset_begin;
  } while(atomic_cmpxchg(&ltt_buf->offset, offset_old, offset_end)
      != offset_old);

  if(mode == FORCE_ACTIVE) {
    /* Push the reader if necessary */
    do {
      consumed_old = atomic_read(&ltt_buf->consumed);
      /* If buffer is in overwrite mode, push the reader consumed count if
         the write position has reached it and we are not at the first
         iteration (don't push the reader farther than the writer).
         This operation can be done concurrently by many writers in the
         same buffer ; the writer at the farthest write position sub-buffer
         index in the buffer is the one which will win this loop. */
      /* If the buffer is not in overwrite mode, pushing the reader only
         happens if a sub-buffer is corrupted */
      if((SUBBUF_TRUNC(offset_end, ltt_buf)
          - SUBBUF_TRUNC(consumed_old, ltt_buf))
          >= ltt_buf->alloc_size)
        consumed_new = SUBBUF_ALIGN(consumed_old, ltt_buf);
      else {
        consumed_new = consumed_old;
        break;
      }
    } while(atomic_cmpxchg(&ltt_buf->consumed, consumed_old, consumed_new)
        != consumed_old);

    if(consumed_old != consumed_new) {
      /* Reader pushed : we are the winner of the push, we can therefore
         re-equilibrate reserve and commit. Atomic increment of the commit
         count permits other writers to play around with this variable
         before us. We keep track of corrupted_subbuffers even in overwrite
         mode :
         we never want to write over a not completely committed sub-buffer.
         Possible causes : the buffer size is too low compared to the unordered
         data input, or there is a writer who died between the reserve and the
         commit. */
      if(reserve_commit_diff) {
        /* We have to alter the sub-buffer commit count : a sub-buffer is
           corrupted */
        atomic_add(reserve_commit_diff,
            &ltt_buf->commit_count[SUBBUF_INDEX(offset_begin, ltt_buf)]);
        atomic_inc(&ltt_buf->corrupted_subbuffers);
      }
    }
  }

  /* Always switch */

  if(end_switch_old) {
    /* old subbuffer */
    /* Concurrency safe because we are the last and only thread to alter this
       sub-buffer. As long as it is not delivered and read, no other thread can
       alter the offset, alter the reserve_count or call the
       client_buffer_end_callback on this sub-buffer.
       The only remaining threads could be the ones with pending commits. They
       will have to do the deliver themselves.
       Not concurrency safe in overwrite mode. We detect corrupted subbuffers
       with commit and reserve counts. We keep a corrupted sub-buffers count
       and push the readers across these sub-buffers.
       Not concurrency safe if a writer is stalled in a subbuffer and
       another writer switches in, finding out it's corrupted. The result will
       be that the old (uncommitted) subbuffer will be declared corrupted, and
       that the new subbuffer will be declared corrupted too because of the
       commit count adjustment.
       Offset old should never be 0. */
    ltt_buffer_end_callback(ltt_buf, tsc, offset_old,
        SUBBUF_INDEX((offset_old), ltt_buf));
    /* Setting this reserve_count will allow the sub-buffer to be delivered by
       the last committer. */
    reserve_count = atomic_add_return((SUBBUF_OFFSET((offset_old - 1),
        ltt_buf) + 1),
        &ltt_buf->reserve_count[SUBBUF_INDEX((offset_old), ltt_buf)]);
    if(reserve_count == atomic_read(
        &ltt_buf->commit_count[SUBBUF_INDEX((offset_old), ltt_buf)])) {
      ltt_deliver_callback(ltt_buf, SUBBUF_INDEX((offset_old), ltt_buf), NULL);
    }
  }

  if(mode == FORCE_ACTIVE) {
    /* New sub-buffer */
    /* This code can be executed unordered : writers may already have written
       to the sub-buffer before this code gets executed, caution. */
    /* The commit makes sure that this code is executed before the deliver
       of this sub-buffer */
    ltt_buffer_begin_callback(ltt_buf, tsc,
        SUBBUF_INDEX(offset_begin, ltt_buf));
    commit_count = atomic_add_return(ltt_subbuf_header_len(ltt_buf),
        &ltt_buf->commit_count[SUBBUF_INDEX(offset_begin, ltt_buf)]);
    /* Check if the written buffer has to be delivered */
    if(commit_count == atomic_read(
        &ltt_buf->reserve_count[SUBBUF_INDEX(offset_begin, ltt_buf)])) {
      ltt_deliver_callback(ltt_buf, SUBBUF_INDEX(offset_begin, ltt_buf), NULL);
    }
  }

}

static inline int ltt_buffer_get(struct ltt_buf *ltt_buf,
    unsigned int *offset)
{
  unsigned int consumed_old, consumed_idx;
  consumed_old = atomic_read(&ltt_buf->consumed);
  consumed_idx = SUBBUF_INDEX(consumed_old, ltt_buf);

  if(atomic_read(&ltt_buf->commit_count[consumed_idx])
      != atomic_read(&ltt_buf->reserve_count[consumed_idx])) {
    return -EAGAIN;
  }
  if((SUBBUF_TRUNC(atomic_read(&ltt_buf->offset), ltt_buf)
      - SUBBUF_TRUNC(consumed_old, ltt_buf)) == 0) {
    return -EAGAIN;
  }

  *offset = consumed_old;

  return 0;
}

static inline int ltt_buffer_put(struct ltt_buf *ltt_buf,
    unsigned int offset)
{
  unsigned int consumed_old, consumed_new;
  int ret;

  consumed_old = offset;
  consumed_new = SUBBUF_ALIGN(consumed_old, ltt_buf);
  if(atomic_cmpxchg(&ltt_buf->consumed, consumed_old, consumed_new)
      != consumed_old) {
    /* We have been pushed by the writer : the last buffer read _is_
     * corrupted!
     * It can also happen if this is a buffer we never got. */
    return -EIO;
  } else {
    if(atomic_read(&ltt_buf->full) == 1) {
      /* tell the client that the buffer is no longer full */
      ret = futex((unsigned long)&ltt_buf->full,
          FUTEX_WAKE, 1, 0, 0, 0);
      if(ret != 1) {
        printf("LTT warning : race condition : writer not waiting or too many writers\n");
      }
      atomic_set(&ltt_buf->full, 0);
    }
    return 0;
  }
}
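/*
 * For context, a sketch of the writer-side counterpart of the futex wake
 * above (an assumption : the actual writer fast path lives in
 * ltt-usertrace-fast.h, not in this file). A writer that finds every
 * sub-buffer full would set the flag and block on the same futex word until
 * the reader consumes a sub-buffer :
 *
 *   atomic_set(&ltt_buf->full, 1);
 *   futex((unsigned long)&ltt_buf->full, FUTEX_WAIT, 1, 0, 0, 0);
 */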

static int read_subbuffer(struct ltt_buf *ltt_buf, int fd)
{
  unsigned int consumed_old;
  int err;
  printf("LTT read buffer\n");

  err = ltt_buffer_get(ltt_buf, &consumed_old);
  if(err != 0) {
    if(err != -EAGAIN) printf("LTT Reserving sub buffer failed\n");
    goto get_error;
  }

  err = TEMP_FAILURE_RETRY(write(fd,
      ltt_buf->start
      + (consumed_old & ((ltt_buf->alloc_size) - 1)),
      ltt_buf->subbuf_size));

  if(err < 0) {
    perror("Error in writing to file");
    goto write_error;
  }
#if 0
  err = fsync(pair->trace);
  if(err < 0) {
    ret = errno;
    perror("Error in writing to file");
    goto write_error;
  }
#endif //0
write_error:
  err = ltt_buffer_put(ltt_buf, consumed_old);

  if(err != 0) {
    if(err == -EIO) {
      printf("Reader has been pushed by the writer, last subbuffer corrupted.\n");
      /* FIXME : we may delete the last written buffer if we wish. */
    }
    goto get_error;
  }

get_error:
  return err;
}

/* This function is called by ltt_rw_init which has signals blocked */
static void ltt_usertrace_fast_daemon(struct ltt_trace_info *shared_trace_info,
    sigset_t oldset, pid_t l_traced_pid, pid_t l_traced_tid)
{
  struct sigaction act;
  int ret;
  int fd_fac;
  int fd_cpu;
  char outfile_name[PATH_MAX];
  char identifier_name[PATH_MAX];

  traced_pid = l_traced_pid;
  traced_tid = l_traced_tid;

  printf("LTT ltt_usertrace_fast_daemon : init is %d, pid is %d, traced_pid is %d, traced_tid is %d\n",
      shared_trace_info->init, getpid(), traced_pid, traced_tid);

  act.sa_handler = handler_sigusr1;
  act.sa_flags = 0;
  sigemptyset(&(act.sa_mask));
  sigaddset(&(act.sa_mask), SIGUSR1);
  sigaction(SIGUSR1, &act, NULL);

  act.sa_handler = handler_sigusr2;
  act.sa_flags = 0;
  sigemptyset(&(act.sa_mask));
  sigaddset(&(act.sa_mask), SIGUSR2);
  sigaction(SIGUSR2, &act, NULL);

  act.sa_handler = handler_sigalarm;
  act.sa_flags = 0;
  sigemptyset(&(act.sa_mask));
  sigaddset(&(act.sa_mask), SIGALRM);
  sigaction(SIGALRM, &act, NULL);

  /* Enable signals */
  ret = pthread_sigmask(SIG_SETMASK, &oldset, NULL);
  if(ret) {
    printf("LTT Error in pthread_sigmask\n");
  }

  alarm(3);

  /* Open output files */
  umask(00000);
  ret = mkdir(LTT_USERTRACE_ROOT, 0777);
  if(ret < 0 && errno != EEXIST) {
    perror("LTT Error in creating output (mkdir)");
    exit(-1);
  }
  ret = chdir(LTT_USERTRACE_ROOT);
  if(ret < 0) {
    perror("LTT Error in creating output (chdir)");
    exit(-1);
  }
  snprintf(identifier_name, PATH_MAX - 1, "%d.%d.%llu",
      traced_tid, traced_pid, (unsigned long long)get_cycles());
  snprintf(outfile_name, PATH_MAX - 1, "facilities-%s", identifier_name);
  fd_fac = creat(outfile_name, 0644);
  if(fd_fac < 0) {
    perror("LTT Error in creating output (creat)");
    exit(-1);
  }

  snprintf(outfile_name, PATH_MAX - 1, "cpu-%s", identifier_name);
  fd_cpu = creat(outfile_name, 0644);
  if(fd_cpu < 0) {
    perror("LTT Error in creating output (creat)");
    exit(-1);
  }

  while(1) {
    pause();
    if(traced_pid == 0) break; /* parent died */
    if(parent_exited) break;
    printf("LTT Doing a buffer switch read. pid is : %d\n", getpid());

    do {
      ret = read_subbuffer(&shared_trace_info->channel.cpu, fd_cpu);
    } while(ret == 0);

    do {
      ret = read_subbuffer(&shared_trace_info->channel.facilities, fd_fac);
    } while(ret == 0);
  }

  /* The parent thread is dead and we have finished with the buffer */

  /* Buffer force switch (flush). Using FLUSH instead of ACTIVE because we know
   * there is no writer. */
  flush_buffer(&shared_trace_info->channel.cpu, FORCE_FLUSH);
  do {
    ret = read_subbuffer(&shared_trace_info->channel.cpu, fd_cpu);
  } while(ret == 0);

  flush_buffer(&shared_trace_info->channel.facilities, FORCE_FLUSH);
  do {
    ret = read_subbuffer(&shared_trace_info->channel.facilities, fd_fac);
  } while(ret == 0);

  close(fd_fac);
  close(fd_cpu);

  munmap(shared_trace_info, sizeof(*shared_trace_info));

  exit(0);
}


/* Reader-writer initialization */

static enum ltt_process_role { LTT_ROLE_WRITER, LTT_ROLE_READER }
    role = LTT_ROLE_WRITER;

void ltt_rw_init(void)
{
  pid_t pid;
  struct ltt_trace_info *shared_trace_info;
  int ret;
  sigset_t set, oldset;
  pid_t l_traced_pid = getpid();
  pid_t l_traced_tid = gettid();

  /* parent : create the shared memory map */
  shared_trace_info = mmap(0, sizeof(*shared_trace_info),
      PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
  if(shared_trace_info == MAP_FAILED) {
    perror("LTT Error in mmap");
    return;
  }
  memset(shared_trace_info, 0, sizeof(*shared_trace_info));
  /* Tricky semaphore : is in a shared memory space, so it's ok for a fast
   * mutex (futex). */
  atomic_set(&shared_trace_info->channel.facilities.full, 0);
  shared_trace_info->channel.facilities.alloc_size = LTT_BUF_SIZE_FACILITIES;
  shared_trace_info->channel.facilities.subbuf_size = LTT_SUBBUF_SIZE_FACILITIES;
  shared_trace_info->channel.facilities.start =
      shared_trace_info->channel.facilities_buf;
  ltt_buffer_begin_callback(&shared_trace_info->channel.facilities,
      ltt_get_timestamp(), 0);

  atomic_set(&shared_trace_info->channel.cpu.full, 0);
  shared_trace_info->channel.cpu.alloc_size = LTT_BUF_SIZE_CPU;
  shared_trace_info->channel.cpu.subbuf_size = LTT_SUBBUF_SIZE_CPU;
  shared_trace_info->channel.cpu.start = shared_trace_info->channel.cpu_buf;
  ltt_buffer_begin_callback(&shared_trace_info->channel.cpu,
      ltt_get_timestamp(), 0);

  shared_trace_info->init = 1;

  /* Disable signals */
  ret = sigfillset(&set);
  if(ret) {
    printf("LTT Error in sigfillset\n");
  }

  ret = pthread_sigmask(SIG_BLOCK, &set, &oldset);
  if(ret) {
    printf("LTT Error in pthread_sigmask\n");
  }

  pid = fork();
  if(pid > 0) {
    /* Parent */
    shared_trace_info->daemon_id = pid;
    thread_trace_info = shared_trace_info;

    /* Enable signals */
    ret = pthread_sigmask(SIG_SETMASK, &oldset, NULL);
    if(ret) {
      printf("LTT Error in pthread_sigmask\n");
    }
  } else if(pid == 0) {
    /* Child */
    role = LTT_ROLE_READER;
    ltt_usertrace_fast_daemon(shared_trace_info, oldset, l_traced_pid,
        l_traced_tid);
    /* Should never return */
    exit(-1);
  } else if(pid < 0) {
    /* fork error */
    perror("LTT Error in forking ltt-usertrace-fast");
  }
}

static __thread struct _pthread_cleanup_buffer cleanup_buffer;

void ltt_thread_init(void)
{
  _pthread_cleanup_push(&cleanup_buffer, ltt_usertrace_fast_cleanup, NULL);
  ltt_rw_init();
}
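/*
 * Illustrative use of the "fast thread spawn" mode from an application
 * thread. The function names worker() and do_traced_work() are hypothetical,
 * not part of this library :
 *
 *   void *worker(void *arg)
 *   {
 *     ltt_thread_init();  // registers the cleanup handler and forks this
 *                         // thread's dumper daemon
 *     do_traced_work();   // events go to the shared mmap'd buffers
 *     return NULL;
 *   }
 */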

void __attribute__((constructor)) __ltt_usertrace_fast_init(void)
{
  printf("LTT usertrace-fast init\n");

  ltt_rw_init();
}

void __attribute__((destructor)) __ltt_usertrace_fast_fini(void)
{
  if(role == LTT_ROLE_WRITER) {
    printf("LTT usertrace-fast fini\n");
    ltt_usertrace_fast_cleanup(NULL);
  }
}