/* LTTng user-space "fast" library
 *
 * This daemon is spawned by each traced thread (to share the mmap).
 *
 * Its job is to periodically dump this buffer to disk (when it receives a
 * SIGUSR1 from its parent).
 *
 * It uses the control information in the shared memory area (producer/consumer
 * count).
 *
 * When the parent thread dies (yes, those things may happen) ;) , this daemon
 * will flush the last buffer and write it to disk.
 *
 * Supplementary note for streaming : the daemon is responsible for
 * periodically flushing the buffer if it is streaming data.
 *
 *
 * Notes :
 * shm memory is typically limited to 4096 units (system-wide limit SHMMNI in
 * /proc/sys/kernel/shmmni). As it requires computation time upon creation, we
 * do not use it : we use a shared anonymous mmap() instead, which is passed
 * through the fork().
 * MAP_SHARED mmap segment, updated when msync or munmap are called.
 * MAP_ANONYMOUS.
 * Memory mapped by mmap() is preserved across fork(2), with the same
 * attributes.
 *
 * Eventually, there will be two modes :
 * * Slow thread spawn : a fork() is done for each new thread. If the process
 *   dies, the data is not lost.
 * * Fast thread spawn : a pthread_create() is done by the application for
 *   each new thread.
 *
 * We use a timer to check periodically if the parent died. I think it is less
 * intrusive than a ptrace() on the parent, which would get every signal. The
 * side effect of this is that we won't be notified if the parent does an
 * exec(). In this case, we will just sit there until the parent exits.
 *
 *
 * Copyright 2006 Mathieu Desnoyers
 *
 */
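
/*
 * Illustration only (not part of the original flow) : a minimal sketch of
 * the mmap()-across-fork() idea described in the notes above. The real setup
 * is done in ltt_rw_init() below ; "size" is a hypothetical placeholder.
 */
#if 0
	void *buf = mmap(NULL, size, PROT_READ|PROT_WRITE,
			MAP_SHARED|MAP_ANONYMOUS, -1, 0);
	if(fork() == 0) {
		/* Child (daemon) : sees the very same pages as the parent, so
		 * producer/consumer counts updated by either side are visible to
		 * both, without any copy. */
	}
#endif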

#define _GNU_SOURCE
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <signal.h>
#include <syscall.h>
#include <features.h>
#include <pthread.h>
#include <malloc.h>
#include <string.h>
#include <errno.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/param.h>
#include <sys/time.h>

#include <asm/atomic.h>
#include <asm/timex.h> /* for get_cycles() */

#include "ltt-usertrace-fast.h"
enum force_switch_mode { FORCE_ACTIVE, FORCE_FLUSH };

/* Writer (the traced application) */

__thread struct ltt_trace_info *thread_trace_info = NULL;

void ltt_usertrace_fast_buffer_switch(void)
{
	struct ltt_trace_info *tmp = thread_trace_info;
	if(tmp)
		kill(tmp->daemon_id, SIGUSR1);
}

/* The cleanup should never be called from a signal handler */
static void ltt_usertrace_fast_cleanup(void *arg)
{
	struct ltt_trace_info *tmp = thread_trace_info;
	if(tmp) {
		thread_trace_info = NULL;
		kill(tmp->daemon_id, SIGUSR2);
		munmap(tmp, sizeof(*tmp));
	}
}

/* Reader (the disk dumper daemon) */

static pid_t traced_pid = 0;
static pthread_t traced_thread = 0;
static int parent_exited = 0;

/* signal handling */
static void handler_sigusr1(int signo)
{
	printf("LTT Signal %d received : parent buffer switch.\n", signo);
}

static void handler_sigusr2(int signo)
{
	printf("LTT Signal %d received : parent exited.\n", signo);
	parent_exited = 1;
}

static void handler_sigalarm(int signo)
{
	printf("LTT Signal %d received\n", signo);

	if(getppid() != traced_pid) {
		/* Parent died */
		printf("LTT Parent %ld died, cleaning up\n", (long)traced_pid);
		traced_pid = 0;
	}
	alarm(3);
}

/* Do a buffer switch. Don't switch if buffer is completely empty */
static void flush_buffer(struct ltt_buf *ltt_buf, enum force_switch_mode mode)
{
	uint64_t tsc;
	int offset_begin, offset_end, offset_old;
	int reserve_commit_diff;
	int consumed_old, consumed_new;
	int commit_count, reserve_count;
	int end_switch_old;

	do {
		offset_old = atomic_read(&ltt_buf->offset);
		offset_begin = offset_old;
		end_switch_old = 0;
		tsc = ltt_get_timestamp();
		if(tsc == 0) {
			/* Error in getting the timestamp : should not happen : it would
			 * mean we are called from an NMI during a write seqlock on xtime. */
			return;
		}

		if(SUBBUF_OFFSET(offset_begin, ltt_buf) != 0) {
			offset_begin = SUBBUF_ALIGN(offset_begin, ltt_buf);
			end_switch_old = 1;
		} else {
			/* we do not have to switch : buffer is empty */
			return;
		}
		if(mode == FORCE_ACTIVE)
			offset_begin += ltt_subbuf_header_len(ltt_buf);
		/* Always begin_switch in FORCE_ACTIVE mode */

		/* Test new buffer integrity */
		reserve_commit_diff =
			atomic_read(
				&ltt_buf->reserve_count[SUBBUF_INDEX(offset_begin, ltt_buf)])
			- atomic_read(
				&ltt_buf->commit_count[SUBBUF_INDEX(offset_begin, ltt_buf)]);
		if(reserve_commit_diff == 0) {
			/* Next buffer not corrupted. */
			if(mode == FORCE_ACTIVE
					&& (offset_begin-atomic_read(&ltt_buf->consumed))
							>= ltt_buf->alloc_size) {
				/* We do not overwrite non-consumed buffers and we are full :
				   ignore the switch while tracing is active. */
				return;
			}
		} else {
			/* Next subbuffer corrupted. Force pushing reader even in normal mode */
		}

		offset_end = offset_begin;
	} while(atomic_cmpxchg(&ltt_buf->offset, offset_old, offset_end)
			!= offset_old);


	if(mode == FORCE_ACTIVE) {
		/* Push the reader if necessary */
		do {
			consumed_old = atomic_read(&ltt_buf->consumed);
			/* If the buffer is in overwrite mode, push the reader consumed
			   count if the write position has reached it and we are not at the
			   first iteration (don't push the reader farther than the writer).
			   This operation can be done concurrently by many writers in the
			   same buffer ; the writer at the farthest write position
			   sub-buffer index in the buffer is the one which will win this
			   loop. */
			/* If the buffer is not in overwrite mode, pushing the reader only
			   happens if a sub-buffer is corrupted */
			if((SUBBUF_TRUNC(offset_end, ltt_buf)
					- SUBBUF_TRUNC(consumed_old, ltt_buf))
							>= ltt_buf->alloc_size)
				consumed_new = SUBBUF_ALIGN(consumed_old, ltt_buf);
			else {
				consumed_new = consumed_old;
				break;
			}
		} while(atomic_cmpxchg(&ltt_buf->consumed, consumed_old, consumed_new)
				!= consumed_old);

		if(consumed_old != consumed_new) {
			/* Reader pushed : we are the winner of the push, so we can
			   re-equilibrate reserve and commit. Atomic increment of the
			   commit count permits other writers to play around with this
			   variable before us. We keep track of corrupted_subbuffers even
			   in overwrite mode :
			   we never want to write over a sub-buffer that is not completely
			   committed. Possible causes : the buffer size is too low compared
			   to the unordered data input, or a writer died between the
			   reserve and the commit. */
			if(reserve_commit_diff) {
				/* We have to alter the sub-buffer commit count : a sub-buffer
				   is corrupted */
				atomic_add(reserve_commit_diff,
					&ltt_buf->commit_count[SUBBUF_INDEX(offset_begin, ltt_buf)]);
				atomic_inc(&ltt_buf->corrupted_subbuffers);
			}
		}
	}

	/* Always switch */

	if(end_switch_old) {
		/* old subbuffer */
		/* Concurrency safe because we are the last and only thread to alter
		   this sub-buffer. As long as it is not delivered and read, no other
		   thread can alter the offset, alter the reserve_count or call the
		   client_buffer_end_callback on this sub-buffer.
		   The only remaining threads could be the ones with pending commits.
		   They will have to do the deliver themselves.
		   Not concurrency safe in overwrite mode. We detect corrupted
		   subbuffers with commit and reserve counts. We keep a corrupted
		   sub-buffers count and push the readers across these sub-buffers.
		   Not concurrency safe if a writer is stalled in a subbuffer and
		   another writer switches in, finding out it's corrupted. The result
		   will be that the old (uncommitted) subbuffer will be declared
		   corrupted, and that the new subbuffer will be declared corrupted too
		   because of the commit count adjustment.
		   Offset old should never be 0. */
		ltt_buffer_end_callback(ltt_buf, tsc, offset_old,
				SUBBUF_INDEX((offset_old), ltt_buf));
		/* Setting this reserve_count will allow the sub-buffer to be delivered
		   by the last committer. */
		reserve_count = atomic_add_return((SUBBUF_OFFSET((offset_old-1),
				ltt_buf) + 1),
				&ltt_buf->reserve_count[SUBBUF_INDEX((offset_old),
				ltt_buf)]);
		if(reserve_count == atomic_read(
				&ltt_buf->commit_count[SUBBUF_INDEX((offset_old), ltt_buf)])) {
			ltt_deliver_callback(ltt_buf, SUBBUF_INDEX((offset_old), ltt_buf), NULL);
		}
	}

	if(mode == FORCE_ACTIVE) {
		/* New sub-buffer */
		/* This code can be executed unordered : writers may already have
		   written to the sub-buffer before this code gets executed, caution. */
		/* The commit makes sure that this code is executed before the deliver
		   of this sub-buffer */
		ltt_buffer_begin_callback(ltt_buf, tsc, SUBBUF_INDEX(offset_begin, ltt_buf));
		commit_count = atomic_add_return(ltt_subbuf_header_len(ltt_buf),
				&ltt_buf->commit_count[SUBBUF_INDEX(offset_begin, ltt_buf)]);
		/* Check if the written buffer has to be delivered */
		if(commit_count == atomic_read(
				&ltt_buf->reserve_count[SUBBUF_INDEX(offset_begin, ltt_buf)])) {
			ltt_deliver_callback(ltt_buf, SUBBUF_INDEX(offset_begin, ltt_buf), NULL);
		}
	}

}

static inline int ltt_buffer_get(struct ltt_buf *ltt_buf,
		unsigned int *offset)
{
	unsigned int consumed_old, consumed_idx;
	consumed_old = atomic_read(&ltt_buf->consumed);
	consumed_idx = SUBBUF_INDEX(consumed_old, ltt_buf);

	if(atomic_read(&ltt_buf->commit_count[consumed_idx])
			!= atomic_read(&ltt_buf->reserve_count[consumed_idx])) {
		return -EAGAIN;
	}
	if((SUBBUF_TRUNC(atomic_read(&ltt_buf->offset), ltt_buf)
			- SUBBUF_TRUNC(consumed_old, ltt_buf)) == 0) {
		return -EAGAIN;
	}

	*offset = consumed_old;

	return 0;
}

static inline int ltt_buffer_put(struct ltt_buf *ltt_buf,
		unsigned int offset)
{
	unsigned int consumed_old, consumed_new;
	int ret;

	consumed_old = offset;
	consumed_new = SUBBUF_ALIGN(consumed_old, ltt_buf);
	if(atomic_cmpxchg(&ltt_buf->consumed, consumed_old, consumed_new)
			!= consumed_old) {
		/* We have been pushed by the writer : the last buffer read _is_
		 * corrupted!
		 * It can also happen if this is a buffer we never got. */
		return -EIO;
	} else {
		if(atomic_read(&ltt_buf->full) == 1) {
			/* tell the client that the buffer is no longer full */
			ret = futex((unsigned long)&ltt_buf->full,
					FUTEX_WAKE, 1, 0, 0, 0);
			if(ret != 1) {
				printf("LTT warning : race condition : writer not waiting or too many writers\n");
			}
			atomic_set(&ltt_buf->full, 0);
		}
		return 0;
	}
}

/* In the writer, the buffer-full case would look like :
 *
 * if(buffer_is_full) {
 *	put myself in the wait queue :
 *	atomic_set(&ltt_buf->full, 1);
 *	ret = futex((unsigned long)&ltt_buf->full, FUTEX_WAIT, 1, 0, 0, 0);
 * }
 */
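
/*
 * Illustration only : a compilable version of the writer-side wait sketched
 * in the comment above. It assumes the futex() wrapper used by
 * ltt_buffer_put() takes the raw syscall argument order
 * (addr, op, val, timeout, addr2, val3) and that FUTEX_WAIT is available ;
 * the function name is hypothetical.
 */
#if 0
static void ltt_writer_wait_unfull(struct ltt_buf *ltt_buf)
{
	atomic_set(&ltt_buf->full, 1);
	/* Sleep until the reader sees full == 1 in ltt_buffer_put() and wakes
	 * us with FUTEX_WAKE. A spurious wakeup would simply retry the write. */
	futex((unsigned long)&ltt_buf->full, FUTEX_WAIT, 1, 0, 0, 0);
}
#endif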

static int read_subbuffer(struct ltt_buf *ltt_buf, int fd)
{
	unsigned int consumed_old;
	int err;
	printf("LTT read buffer\n");


	err = ltt_buffer_get(ltt_buf, &consumed_old);
	if(err != 0) {
		/* -EAGAIN simply means there is no full sub-buffer to read yet */
		if(err != -EAGAIN)
			printf("LTT Reserving sub buffer failed\n");
		goto get_error;
	}

	err = TEMP_FAILURE_RETRY(write(fd,
			ltt_buf->start
				+ (consumed_old & ((ltt_buf->alloc_size)-1)),
			ltt_buf->subbuf_size));

	if(err < 0) {
		perror("Error in writing to file");
		goto write_error;
	}
#if 0
	err = fsync(fd);
	if(err < 0) {
		perror("Error in writing to file");
		goto write_error;
	}
#endif //0
write_error:
	err = ltt_buffer_put(ltt_buf, consumed_old);

	if(err != 0) {
		if(err == -EIO) {
			printf("Reader has been pushed by the writer, last subbuffer corrupted.\n");
			/* FIXME : we may delete the last written buffer if we wish. */
		}
		goto get_error;
	}

get_error:
	return err;
}

/* This function is called by ltt_rw_init which has signals blocked */
static void ltt_usertrace_fast_daemon(struct ltt_trace_info *shared_trace_info,
		sigset_t oldset, pid_t l_traced_pid, pthread_t l_traced_thread)
{
	struct sigaction act;
	int ret;
	int fd_fac;
	int fd_cpu;
	char outfile_name[PATH_MAX];
	char identifier_name[PATH_MAX];


	traced_pid = l_traced_pid;
	traced_thread = l_traced_thread;

	printf("LTT ltt_usertrace_fast_daemon : init is %d, pid is %ld, traced_pid is %ld\n",
			shared_trace_info->init, (long)getpid(), (long)traced_pid);

	act.sa_handler = handler_sigusr1;
	act.sa_flags = 0;
	sigemptyset(&(act.sa_mask));
	sigaddset(&(act.sa_mask), SIGUSR1);
	sigaction(SIGUSR1, &act, NULL);

	act.sa_handler = handler_sigusr2;
	act.sa_flags = 0;
	sigemptyset(&(act.sa_mask));
	sigaddset(&(act.sa_mask), SIGUSR2);
	sigaction(SIGUSR2, &act, NULL);

	act.sa_handler = handler_sigalarm;
	act.sa_flags = 0;
	sigemptyset(&(act.sa_mask));
	sigaddset(&(act.sa_mask), SIGALRM);
	sigaction(SIGALRM, &act, NULL);

	/* Enable signals */
	ret = pthread_sigmask(SIG_SETMASK, &oldset, NULL);
	if(ret) {
		printf("LTT Error in pthread_sigmask\n");
	}

	alarm(3);

	/* Open output files */
	umask(00000);
	ret = mkdir(LTT_USERTRACE_ROOT, 0777);
	if(ret < 0 && errno != EEXIST) {
		perror("LTT Error in creating output (mkdir)");
		exit(-1);
	}
	ret = chdir(LTT_USERTRACE_ROOT);
	if(ret < 0) {
		perror("LTT Error in creating output (chdir)");
		exit(-1);
	}
	snprintf(identifier_name, PATH_MAX-1, "%lu.%lu.%llu",
			(unsigned long)traced_pid, (unsigned long)traced_thread,
			(unsigned long long)get_cycles());
	snprintf(outfile_name, PATH_MAX-1, "facilities-%s", identifier_name);
	fd_fac = creat(outfile_name, 0644);
	if(fd_fac < 0) {
		perror("LTT Error in creating output (creat facilities)");
		exit(-1);
	}

	snprintf(outfile_name, PATH_MAX-1, "cpu-%s", identifier_name);
	fd_cpu = creat(outfile_name, 0644);
	if(fd_cpu < 0) {
		perror("LTT Error in creating output (creat cpu)");
		exit(-1);
	}


	while(1) {
		pause();
		if(traced_pid == 0) break; /* parent died */
		if(parent_exited) break;
		printf("LTT Doing a buffer switch read. pid is : %ld\n",
				(long)getpid());

		do {
			ret = read_subbuffer(&shared_trace_info->channel.cpu, fd_cpu);
		} while(ret == 0);

		do {
			ret = read_subbuffer(&shared_trace_info->channel.facilities, fd_fac);
		} while(ret == 0);
	}

	/* The parent thread is dead and we have finished with the buffer */

	/* Buffer force switch (flush). Using FLUSH instead of ACTIVE because we
	 * know there is no writer left. */
	flush_buffer(&shared_trace_info->channel.cpu, FORCE_FLUSH);
	do {
		ret = read_subbuffer(&shared_trace_info->channel.cpu, fd_cpu);
	} while(ret == 0);


	flush_buffer(&shared_trace_info->channel.facilities, FORCE_FLUSH);
	do {
		ret = read_subbuffer(&shared_trace_info->channel.facilities, fd_fac);
	} while(ret == 0);

	close(fd_fac);
	close(fd_cpu);

	munmap(shared_trace_info, sizeof(*shared_trace_info));

	exit(0);
}


/* Reader-writer initialization */

static enum ltt_process_role { LTT_ROLE_WRITER, LTT_ROLE_READER }
	role = LTT_ROLE_WRITER;


void ltt_rw_init(void)
{
	pid_t pid;
	struct ltt_trace_info *shared_trace_info;
	int ret;
	sigset_t set, oldset;
	pid_t l_traced_pid = getpid();
	pthread_t l_traced_thread = pthread_self();

	/* parent : create the shared memory map */
	shared_trace_info = mmap(0, sizeof(*shared_trace_info),
			PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
	if(shared_trace_info == MAP_FAILED) {
		perror("LTT Error in mmap of the shared trace buffer");
		return;
	}
	memset(shared_trace_info, 0, sizeof(*shared_trace_info));
	/* Tricky semaphore : it lives in a shared memory space, so it's ok for a
	 * fast mutex (futex). */
	atomic_set(&shared_trace_info->channel.facilities.full, 0);
	shared_trace_info->channel.facilities.alloc_size = LTT_BUF_SIZE_FACILITIES;
	shared_trace_info->channel.facilities.subbuf_size = LTT_SUBBUF_SIZE_FACILITIES;
	shared_trace_info->channel.facilities.start =
			shared_trace_info->channel.facilities_buf;

	atomic_set(&shared_trace_info->channel.cpu.full, 0);
	shared_trace_info->channel.cpu.alloc_size = LTT_BUF_SIZE_CPU;
	shared_trace_info->channel.cpu.subbuf_size = LTT_SUBBUF_SIZE_CPU;
	shared_trace_info->channel.cpu.start = shared_trace_info->channel.cpu_buf;
	shared_trace_info->init = 1;

	/* Disable signals */
	ret = sigfillset(&set);
	if(ret) {
		printf("LTT Error in sigfillset\n");
	}


	ret = pthread_sigmask(SIG_BLOCK, &set, &oldset);
	if(ret) {
		printf("LTT Error in pthread_sigmask\n");
	}

	pid = fork();
	if(pid > 0) {
		/* Parent */
		shared_trace_info->daemon_id = pid;
		thread_trace_info = shared_trace_info;

		/* Enable signals */
		ret = pthread_sigmask(SIG_SETMASK, &oldset, NULL);
		if(ret) {
			printf("LTT Error in pthread_sigmask\n");
		}
	} else if(pid == 0) {
		/* Child */
		role = LTT_ROLE_READER;
		ltt_usertrace_fast_daemon(shared_trace_info, oldset, l_traced_pid,
				l_traced_thread);
		/* Should never return */
		exit(-1);
	} else if(pid < 0) {
		/* fork error */
		perror("LTT Error in forking ltt-usertrace-fast");
	}
}

static __thread struct _pthread_cleanup_buffer cleanup_buffer;

void ltt_thread_init(void)
{
	_pthread_cleanup_push(&cleanup_buffer, ltt_usertrace_fast_cleanup, NULL);
	ltt_rw_init();
}
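
/*
 * Illustration only : expected usage from an application thread (hypothetical
 * example, not part of the library). Each new thread calls ltt_thread_init()
 * so that its cleanup handler is registered and its dumper daemon is forked.
 */
#if 0
static void *worker_thread(void *arg)
{
	ltt_thread_init();
	/* ... traced work happens here ... */
	return NULL;
}
#endif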

void __attribute__((constructor)) __ltt_usertrace_fast_init(void)
{
	printf("LTT usertrace-fast init\n");

	ltt_rw_init();
}

void __attribute__((destructor)) __ltt_usertrace_fast_fini(void)
{
	if(role == LTT_ROLE_WRITER) {
		printf("LTT usertrace-fast fini\n");
		ltt_usertrace_fast_cleanup(NULL);
	}
}
