d2a9712a20c1ca78cbe99cc6be7337be9c602e31
[ust.git] / ustd / ustd.c
1 /* Copyright (C) 2009 Pierre-Marc Fournier
2 *
3 * This library is free software; you can redistribute it and/or
4 * modify it under the terms of the GNU Lesser General Public
5 * License as published by the Free Software Foundation; either
6 * version 2.1 of the License, or (at your option) any later version.
7 *
8 * This library is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public
14 * License along with this library; if not, write to the Free Software
15 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17
18 #define _GNU_SOURCE
19
20 #include <sys/types.h>
21 #include <sys/stat.h>
22 #include <sys/shm.h>
23 #include <fcntl.h>
24 #include <unistd.h>
25 #include <pthread.h>
26 #include <signal.h>
27
28 #include <stdlib.h>
29 #include <stdio.h>
30 #include <string.h>
31 #include <errno.h>
32 #include <assert.h>
33 #include <getopt.h>
34
35 #include "ustd.h"
36 #include "usterr.h"
37 #include "ustcomm.h"
38
39 /* return value: 0 = subbuffer is finished, it won't produce data anymore
40 * 1 = got subbuffer successfully
41 * <0 = error
42 */
43
44 #define GET_SUBBUF_OK 1
45 #define GET_SUBBUF_DONE 0
46 #define GET_SUBBUF_DIED 2
47
48 #define PUT_SUBBUF_OK 1
49 #define PUT_SUBBUF_DIED 0
50 #define PUT_SUBBUF_PUSHED 2
51 #define PUT_SUBBUF_DONE 3
52
53 char *sock_path=NULL;
54 char *trace_path=NULL;
55 int daemon_mode = 0;
56 char *pidfile = NULL;
57
58 /* Number of active buffers and the mutex to protect it. */
59 int active_buffers = 0;
60 pthread_mutex_t active_buffers_mutex = PTHREAD_MUTEX_INITIALIZER;
61 /* Whether a request to end the program was received. */
62 sig_atomic_t terminate_req = 0;
63
64 int get_subbuffer(struct buffer_info *buf)
65 {
66 char *send_msg=NULL;
67 char *received_msg=NULL;
68 char *rep_code=NULL;
69 int retval;
70 int result;
71
72 asprintf(&send_msg, "get_subbuffer %s", buf->name);
73 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
74 if((result == -1 && errno == EPIPE) || result == 0) {
75 DBG("app died while being traced");
76 retval = GET_SUBBUF_DIED;
77 goto end;
78 }
79 else if(result < 0) {
80 ERR("get_subbuffer: ustcomm_send_request failed");
81 retval = -1;
82 goto end;
83 }
84
85 result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old);
86 if(result != 2 && result != 1) {
87 ERR("unable to parse response to get_subbuffer");
88 retval = -1;
89 goto end_rep;
90 }
91
92 DBG("received msg is %s", received_msg);
93
94 if(!strcmp(rep_code, "OK")) {
95 DBG("got subbuffer %s", buf->name);
96 retval = GET_SUBBUF_OK;
97 }
98 else if(nth_token_is(received_msg, "END", 0) == 1) {
99 retval = GET_SUBBUF_DONE;
100 goto end_rep;
101 }
102 else if(!strcmp(received_msg, "NOTFOUND")) {
103 WARN("For buffer %s, the trace was not found. This likely means it was destroyed by the user.", buf->name);
104 retval = GET_SUBBUF_DONE;
105 goto end_rep;
106 }
107 else {
108 DBG("error getting subbuffer %s", buf->name);
109 retval = -1;
110 }
111
112 /* FIMXE: free correctly the stuff */
113 end_rep:
114 if(rep_code)
115 free(rep_code);
116 end:
117 if(send_msg)
118 free(send_msg);
119 if(received_msg)
120 free(received_msg);
121
122 return retval;
123 }
124
125 int put_subbuffer(struct buffer_info *buf)
126 {
127 char *send_msg=NULL;
128 char *received_msg=NULL;
129 char *rep_code=NULL;
130 int retval;
131 int result;
132
133 asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old);
134 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
135 if(result < 0 && errno == ECONNRESET) {
136 retval = PUT_SUBBUF_DIED;
137 goto end;
138 }
139 else if(result < 0) {
140 ERR("put_subbuffer: send_message failed");
141 retval = -1;
142 goto end;
143 }
144 else if(result == 0) {
145 /* Program seems finished. However this might not be
146 * the last subbuffer that has to be collected.
147 */
148 retval = PUT_SUBBUF_DIED;
149 goto end;
150 }
151
152 result = sscanf(received_msg, "%as", &rep_code);
153 if(result != 1) {
154 ERR("unable to parse response to put_subbuffer");
155 retval = -1;
156 goto end_rep;
157 }
158
159 if(!strcmp(rep_code, "OK")) {
160 DBG("subbuffer put %s", buf->name);
161 retval = PUT_SUBBUF_OK;
162 }
163 else if(!strcmp(received_msg, "NOTFOUND")) {
164 WARN("For buffer %s, the trace was not found. This likely means it was destroyed by the user.", buf->name);
165 /* However, maybe this was not the last subbuffer. So
166 * we return the program died.
167 */
168 retval = PUT_SUBBUF_DIED;
169 goto end_rep;
170 }
171 else {
172 DBG("put_subbuffer: received error, we were pushed");
173 retval = PUT_SUBBUF_PUSHED;
174 goto end_rep;
175 }
176
177 end_rep:
178 if(rep_code)
179 free(rep_code);
180
181 end:
182 if(send_msg)
183 free(send_msg);
184 if(received_msg)
185 free(received_msg);
186
187 return retval;
188 }
189
190 void decrement_active_buffers(void *arg)
191 {
192 pthread_mutex_lock(&active_buffers_mutex);
193 active_buffers--;
194 pthread_mutex_unlock(&active_buffers_mutex);
195 }
196
197 int create_dir_if_needed(char *dir)
198 {
199 int result;
200 result = mkdir(dir, 0777);
201 if(result == -1) {
202 if(errno != EEXIST) {
203 PERROR("mkdir");
204 return -1;
205 }
206 }
207
208 return 0;
209 }
210
211 int is_directory(const char *dir)
212 {
213 int result;
214 struct stat st;
215
216 result = stat(dir, &st);
217 if(result == -1) {
218 PERROR("stat");
219 return 0;
220 }
221
222 if(!S_ISDIR(st.st_mode)) {
223 return 0;
224 }
225
226 return 1;
227 }
228
229 struct buffer_info *connect_buffer(pid_t pid, const char *bufname)
230 {
231 struct buffer_info *buf;
232 char *send_msg;
233 char *received_msg;
234 int result;
235 char *tmp;
236 int fd;
237 struct shmid_ds shmds;
238
239 buf = (struct buffer_info *) malloc(sizeof(struct buffer_info));
240 if(buf == NULL) {
241 ERR("add_buffer: insufficient memory");
242 return NULL;
243 }
244
245 buf->name = bufname;
246 buf->pid = pid;
247
248 /* connect to app */
249 result = ustcomm_connect_app(buf->pid, &buf->conn);
250 if(result) {
251 WARN("unable to connect to process, it probably died before we were able to connect");
252 return NULL;
253 }
254
255 /* get pidunique */
256 asprintf(&send_msg, "get_pidunique");
257 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
258 free(send_msg);
259 if(result == -1) {
260 ERR("problem in ustcomm_send_request(get_pidunique)");
261 return NULL;
262 }
263 if(result == 0) {
264 goto error;
265 }
266
267 result = sscanf(received_msg, "%lld", &buf->pidunique);
268 if(result != 1) {
269 ERR("unable to parse response to get_pidunique");
270 return NULL;
271 }
272 free(received_msg);
273 DBG("got pidunique %lld", buf->pidunique);
274
275 /* get shmid */
276 asprintf(&send_msg, "get_shmid %s", buf->name);
277 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
278 free(send_msg);
279 if(result == -1) {
280 ERR("problem in ustcomm_send_request(get_shmid)");
281 return NULL;
282 }
283 if(result == 0) {
284 goto error;
285 }
286
287 result = sscanf(received_msg, "%d %d", &buf->shmid, &buf->bufstruct_shmid);
288 if(result != 2) {
289 ERR("unable to parse response to get_shmid (\"%s\")", received_msg);
290 return NULL;
291 }
292 free(received_msg);
293 DBG("got shmids %d %d", buf->shmid, buf->bufstruct_shmid);
294
295 /* get n_subbufs */
296 asprintf(&send_msg, "get_n_subbufs %s", buf->name);
297 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
298 free(send_msg);
299 if(result == -1) {
300 ERR("problem in ustcomm_send_request(g_n_subbufs)");
301 return NULL;
302 }
303 if(result == 0) {
304 goto error;
305 }
306
307 result = sscanf(received_msg, "%d", &buf->n_subbufs);
308 if(result != 1) {
309 ERR("unable to parse response to get_n_subbufs");
310 return NULL;
311 }
312 free(received_msg);
313 DBG("got n_subbufs %d", buf->n_subbufs);
314
315 /* get subbuf size */
316 asprintf(&send_msg, "get_subbuf_size %s", buf->name);
317 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
318 free(send_msg);
319 if(result == -1) {
320 ERR("problem in ustcomm_send_request(get_subbuf_size)");
321 return NULL;
322 }
323 if(result == 0) {
324 goto error;
325 }
326
327 result = sscanf(received_msg, "%d", &buf->subbuf_size);
328 if(result != 1) {
329 ERR("unable to parse response to get_subbuf_size");
330 return NULL;
331 }
332 free(received_msg);
333 DBG("got subbuf_size %d", buf->subbuf_size);
334
335 /* attach memory */
336 buf->mem = shmat(buf->shmid, NULL, 0);
337 if(buf->mem == (void *) 0) {
338 PERROR("shmat");
339 return NULL;
340 }
341 DBG("successfully attached buffer memory");
342
343 buf->bufstruct_mem = shmat(buf->bufstruct_shmid, NULL, 0);
344 if(buf->bufstruct_mem == (void *) 0) {
345 PERROR("shmat");
346 return NULL;
347 }
348 DBG("successfully attached buffer bufstruct memory");
349
350 /* obtain info on the memory segment */
351 result = shmctl(buf->shmid, IPC_STAT, &shmds);
352 if(result == -1) {
353 PERROR("shmctl");
354 return NULL;
355 }
356 buf->memlen = shmds.shm_segsz;
357
358 /* open file for output */
359 if(!trace_path) {
360 /* Only create the directory if using the default path, because
361 * of the risk of typo when using trace path override. We don't
362 * want to risk creating plenty of useless directories in that case.
363 */
364 result = create_dir_if_needed(USTD_DEFAULT_TRACE_PATH);
365 if(result == -1) {
366 ERR("could not create directory %s", USTD_DEFAULT_TRACE_PATH);
367 return NULL;
368 }
369
370 trace_path = USTD_DEFAULT_TRACE_PATH;
371 }
372
373 asprintf(&tmp, "%s/%u_%lld", trace_path, buf->pid, buf->pidunique);
374 result = create_dir_if_needed(tmp);
375 if(result == -1) {
376 ERR("could not create directory %s", tmp);
377 free(tmp);
378 return NULL;
379 }
380 free(tmp);
381
382 asprintf(&tmp, "%s/%u_%lld/%s", trace_path, buf->pid, buf->pidunique, buf->name);
383 result = fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC | O_EXCL, 00600);
384 if(result == -1) {
385 PERROR("open");
386 ERR("failed opening trace file %s", tmp);
387 return NULL;
388 }
389 buf->file_fd = fd;
390 free(tmp);
391
392 pthread_mutex_lock(&active_buffers_mutex);
393 active_buffers++;
394 pthread_mutex_unlock(&active_buffers_mutex);
395
396 return buf;
397
398 error:
399 free(buf);
400 return NULL;
401 }
402
403 int write_current_subbuffer(struct buffer_info *buf)
404 {
405 int result;
406
407 void *subbuf_mem = buf->mem + (buf->consumed_old & (buf->n_subbufs * buf->subbuf_size-1));
408
409 size_t cur_sb_size = subbuffer_data_size(subbuf_mem);
410
411 result = patient_write(buf->file_fd, subbuf_mem, cur_sb_size);
412 if(result == -1) {
413 PERROR("write");
414 /* FIXME: maybe drop this trace */
415 return 0;
416 }
417
418 return 0;
419 }
420
421 int consumer_loop(struct buffer_info *buf)
422 {
423 int result;
424
425 pthread_cleanup_push(decrement_active_buffers, NULL);
426
427 for(;;) {
428 /* get the subbuffer */
429 result = get_subbuffer(buf);
430 if(result == -1) {
431 ERR("error getting subbuffer");
432 continue;
433 }
434 else if(result == GET_SUBBUF_DONE) {
435 /* this is done */
436 break;
437 }
438 else if(result == GET_SUBBUF_DIED) {
439 finish_consuming_dead_subbuffer(buf);
440 break;
441 }
442
443 /* write data to file */
444 write_current_subbuffer(buf);
445 /* FIXME: handle return value? */
446
447 /* put the subbuffer */
448 /* FIXME: we actually should unput the buffer before consuming... */
449 result = put_subbuffer(buf);
450 if(result == -1) {
451 ERR("unknown error putting subbuffer (channel=%s)", buf->name);
452 break;
453 }
454 else if(result == PUT_SUBBUF_PUSHED) {
455 ERR("Buffer overflow (channel=%s), reader pushed. This channel will not be usable passed this point.", buf->name);
456 break;
457 }
458 else if(result == PUT_SUBBUF_DIED) {
459 WARN("application died while putting subbuffer");
460 /* FIXME: probably need to skip the first subbuffer in finish_consuming_dead_subbuffer */
461 finish_consuming_dead_subbuffer(buf);
462 break;
463 }
464 else if(result == PUT_SUBBUF_DONE) {
465 /* Done with this subbuffer */
466 /* FIXME: add a case where this branch is used? Upon
467 * normal trace termination, at put_subbuf time, a
468 * special last-subbuffer code could be returned by
469 * the listener.
470 */
471 break;
472 }
473 else if(result == PUT_SUBBUF_OK) {
474 }
475 }
476
477 DBG("thread for buffer %s is stopping", buf->name);
478
479 /* FIXME: destroy, unalloc... */
480
481 pthread_cleanup_pop(1);
482
483 return 0;
484 }
485
486 void free_buffer(struct buffer_info *buf)
487 {
488 }
489
490 struct consumer_thread_args {
491 pid_t pid;
492 const char *bufname;
493 };
494
495 void *consumer_thread(void *arg)
496 {
497 struct buffer_info *buf = (struct buffer_info *) arg;
498 struct consumer_thread_args *args = (struct consumer_thread_args *) arg;
499
500 DBG("GOT ARGS: pid %d bufname %s", args->pid, args->bufname);
501
502 buf = connect_buffer(args->pid, args->bufname);
503 if(buf == NULL) {
504 ERR("failed to connect to buffer");
505 goto end;
506 }
507
508 consumer_loop(buf);
509
510 free_buffer(buf);
511
512 end:
513 /* bufname is free'd in free_buffer() */
514 free(args);
515 return NULL;
516 }
517
518 int start_consuming_buffer(pid_t pid, const char *bufname)
519 {
520 pthread_t thr;
521 struct consumer_thread_args *args;
522
523 DBG("beginning of start_consuming_buffer: args: pid %d bufname %s", pid, bufname);
524
525 args = (struct consumer_thread_args *) malloc(sizeof(struct consumer_thread_args));
526
527 args->pid = pid;
528 args->bufname = strdup(bufname);
529 DBG("beginning2 of start_consuming_buffer: args: pid %d bufname %s", args->pid, args->bufname);
530
531 pthread_create(&thr, NULL, consumer_thread, args);
532 DBG("end of start_consuming_buffer: args: pid %d bufname %s", args->pid, args->bufname);
533
534 return 0;
535 }
536
537 void usage(void)
538 {
539 fprintf(stderr, "Usage:\nustd OPTIONS\n\nOptions:\n"
540 "\t-h\t\tDisplay this usage.\n"
541 "\t-o DIR\t\tSpecify the directory where to output the traces.\n"
542 "\t-s PATH\t\tSpecify the path to use for the daemon socket.\n"
543 "\t-d\t\tStart as a daemon.\n"
544 "\t--pidfile FILE\tWrite the PID in this file (when using -d).\n");
545 }
546
547 int parse_args(int argc, char **argv)
548 {
549 int c;
550
551 while (1) {
552 int option_index = 0;
553 static struct option long_options[] = {
554 {"pidfile", 1, 0, 'p'},
555 {"help", 0, 0, 'h'},
556 {"version", 0, 0, 'V'},
557 {0, 0, 0, 0}
558 };
559
560 c = getopt_long(argc, argv, "hs:o:d", long_options, &option_index);
561 if (c == -1)
562 break;
563
564 switch (c) {
565 case 0:
566 printf("option %s", long_options[option_index].name);
567 if (optarg)
568 printf(" with arg %s", optarg);
569 printf("\n");
570 break;
571 case 's':
572 sock_path = optarg;
573 break;
574 case 'o':
575 trace_path = optarg;
576 if(!is_directory(trace_path)) {
577 ERR("Not a valid directory. (%s)", trace_path);
578 return -1;
579 }
580 break;
581 case 'd':
582 daemon_mode = 1;
583 break;
584 case 'p':
585 pidfile = strdup(optarg);
586 break;
587 case 'h':
588 usage();
589 exit(0);
590 case 'V':
591 printf("Version 0.0\n");
592 break;
593
594 default:
595 /* unknown option or other error; error is
596 printed by getopt, just return */
597 return -1;
598 }
599 }
600
601 return 0;
602 }
603
604 void sigterm_handler(int sig)
605 {
606 terminate_req = 1;
607 }
608
609 static int write_pidfile(const char *file_name, pid_t pid)
610 {
611 FILE *pidfp;
612
613 pidfp = fopen(file_name, "w");
614 if(!pidfp) {
615 PERROR("fopen (%s)", pidfile);
616 WARN("killing child process");
617 return -1;
618 }
619
620 fprintf(pidfp, "%d\n", pid);
621
622 fclose(pidfp);
623
624 return 0;
625 }
626
627 int start_ustd(int fd)
628 {
629 struct ustcomm_ustd ustd;
630 int result;
631 sigset_t sigset;
632 struct sigaction sa;
633
634 result = sigemptyset(&sigset);
635 if(result == -1) {
636 PERROR("sigemptyset");
637 return 1;
638 }
639 sa.sa_handler = sigterm_handler;
640 sa.sa_mask = sigset;
641 sa.sa_flags = SA_RESTART;
642 result = sigaction(SIGTERM, &sa, NULL);
643 if(result == -1) {
644 PERROR("sigaction");
645 return 1;
646 }
647
648 result = ustcomm_init_ustd(&ustd, sock_path);
649 if(result == -1) {
650 ERR("failed to initialize socket");
651 return 1;
652 }
653
654 /* setup handler for SIGPIPE */
655 result = sigemptyset(&sigset);
656 if(result == -1) {
657 PERROR("sigemptyset");
658 return 1;
659 }
660 result = sigaddset(&sigset, SIGPIPE);
661 if(result == -1) {
662 PERROR("sigaddset");
663 return 1;
664 }
665 result = sigprocmask(SIG_BLOCK, &sigset, NULL);
666 if(result == -1) {
667 PERROR("sigprocmask");
668 return 1;
669 }
670
671 /* Write pidfile */
672 if(pidfile) {
673 result = write_pidfile(pidfile, getpid());
674 if(result == -1) {
675 ERR("failed to write pidfile");
676 return 1;
677 }
678 }
679
680 /* Notify parent that we are successfully started. */
681 if(fd != -1) {
682 /* write any one character */
683 result = write(fd, "!", 1);
684 if(result == -1) {
685 PERROR("write");
686 return -1;
687 }
688 if(result != 1) {
689 ERR("Problem sending confirmation of daemon start to parent");
690 return -1;
691 }
692 result = close(fd);
693 if(result == -1) {
694 PERROR("close");
695 }
696 }
697
698 /* app loop */
699 for(;;) {
700 char *recvbuf;
701
702 /* check for requests on our public socket */
703 result = ustcomm_ustd_recv_message(&ustd, &recvbuf, NULL, 100);
704 if(result == -1) {
705 ERR("error in ustcomm_ustd_recv_message");
706 goto loop_end;
707 }
708 if(result > 0) {
709 if(!strncmp(recvbuf, "collect", 7)) {
710 pid_t pid;
711 char *bufname;
712 int result;
713
714 result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
715 if(result != 2) {
716 ERR("parsing error: %s", recvbuf);
717 goto free_bufname;
718 }
719
720 result = start_consuming_buffer(pid, bufname);
721 if(result < 0) {
722 ERR("error in add_buffer");
723 goto free_bufname;
724 }
725
726 free_bufname:
727 free(bufname);
728 }
729
730 free(recvbuf);
731 }
732
733 loop_end:
734
735 if(terminate_req) {
736 pthread_mutex_lock(&active_buffers_mutex);
737 if(active_buffers == 0) {
738 pthread_mutex_unlock(&active_buffers_mutex);
739 break;
740 }
741 pthread_mutex_unlock(&active_buffers_mutex);
742 }
743 }
744
745 return 0;
746 }
747
748 int start_ustd_daemon()
749 {
750 int result;
751 int fd[2];
752 pid_t child_pid;
753
754 result = pipe(fd);
755
756 result = child_pid = fork();
757 if(result == -1) {
758 PERROR("fork");
759 return -1;
760 }
761 else if(result == 0) {
762 return start_ustd(fd[1]);
763 }
764 else {
765 char buf;
766
767 result = read(fd[0], &buf, 1);
768 if(result == -1) {
769 PERROR("read");
770 return -1;
771 }
772 if(result != 1) {
773 ERR("did not receive valid confirmation that the daemon is started");
774 return -1;
775 }
776
777 result = close(fd[0]);
778 if(result == -1) {
779 PERROR("close");
780 }
781
782 DBG("The daemon is now successfully started");
783 }
784
785 /* Wait for confirmation that the server is ready. */
786
787
788 return 0;
789 }
790
791 int main(int argc, char **argv)
792 {
793 int result;
794
795 result = parse_args(argc, argv);
796 if(result == -1) {
797 exit(1);
798 }
799
800 if(daemon_mode) {
801 result = start_ustd_daemon();
802 }
803 else {
804 result = start_ustd(-1);
805 }
806
807 return result;
808 }
This page took 0.045091 seconds and 3 git commands to generate.