improve error handling
[ust.git] / ustd / ustd.c
1 /* Copyright (C) 2009 Pierre-Marc Fournier
2 *
3 * This library is free software; you can redistribute it and/or
4 * modify it under the terms of the GNU Lesser General Public
5 * License as published by the Free Software Foundation; either
6 * version 2.1 of the License, or (at your option) any later version.
7 *
8 * This library is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public
14 * License along with this library; if not, write to the Free Software
15 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17
18 #define _GNU_SOURCE
19
20 #include <sys/types.h>
21 #include <sys/stat.h>
22 #include <sys/shm.h>
23 #include <fcntl.h>
24 #include <unistd.h>
25 #include <pthread.h>
26 #include <signal.h>
27
28 #include <stdlib.h>
29 #include <stdio.h>
30 #include <string.h>
31 #include <errno.h>
32 #include <assert.h>
33 #include <getopt.h>
34
35 #include "ustd.h"
36 #include "localerr.h"
37 #include "ustcomm.h"
38
/* Return codes of get_subbuffer(). Any negative value is an error. */
#define GET_SUBBUF_OK 1		/* got a subbuffer successfully */
#define GET_SUBBUF_DONE 0	/* buffer is finished, it won't produce data anymore */
#define GET_SUBBUF_DIED 2	/* consumer_thread() drains the dead buffer and stops */

/* Return codes of put_subbuffer(). Any negative value is an error. */
#define PUT_SUBBUF_OK 1		/* the application answered "OK" */
#define PUT_SUBBUF_DIED 0	/* the request failed with ECONNRESET */
#define PUT_SUBBUF_PUSHED 2	/* the application answered something other than "OK" */

/* Path of the daemon socket, set by the -s option (NULL if not given). */
char *sock_path=NULL;
/* Directory where traces are written, set by the -o option (NULL if not given). */
char *trace_path=NULL;

/* Number of active buffers and the mutex to protect it. */
int active_buffers = 0;
pthread_mutex_t active_buffers_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Whether a request to end the program was received.
 * Written from the SIGTERM handler, so it must be volatile sig_atomic_t.
 */
volatile sig_atomic_t terminate_req = 0;
60
61 int test_sigpipe(void)
62 {
63 sigset_t sigset;
64 int result;
65
66 result = sigemptyset(&sigset);
67 if(result == -1) {
68 PERROR("sigemptyset");
69 return -1;
70 }
71 result = sigaddset(&sigset, SIGPIPE);
72 if(result == -1) {
73 PERROR("sigaddset");
74 return -1;
75 }
76
77 result = sigtimedwait(&sigset, NULL, &(struct timespec){0,0});
78 if(result == -1 && errno == EAGAIN) {
79 /* no signal received */
80 return 0;
81 }
82 else if(result == -1) {
83 PERROR("sigtimedwait");
84 return -1;
85 }
86 else if(result == SIGPIPE) {
87 /* received sigpipe */
88 return 1;
89 }
90 else {
91 assert(0);
92 }
93 }
94
95 int get_subbuffer(struct buffer_info *buf)
96 {
97 char *send_msg=NULL;
98 char *received_msg=NULL;
99 char *rep_code=NULL;
100 int retval;
101 int result;
102
103 asprintf(&send_msg, "get_subbuffer %s", buf->name);
104 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
105 if(test_sigpipe()) {
106 WARN("process %d destroyed before we could connect to it", buf->pid);
107 retval = GET_SUBBUF_DONE;
108 goto end;
109 }
110 else if(result < 0) {
111 ERR("get_subbuffer: ustcomm_send_request failed");
112 retval = -1;
113 goto end;
114 }
115
116 result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old);
117 if(result != 2 && result != 1) {
118 ERR("unable to parse response to get_subbuffer");
119 retval = -1;
120 goto end_rep;
121 }
122
123 DBG("received msg is %s", received_msg);
124
125 if(!strcmp(rep_code, "OK")) {
126 DBG("got subbuffer %s", buf->name);
127 retval = GET_SUBBUF_OK;
128 }
129 else if(nth_token_is(received_msg, "END", 0) == 1) {
130 retval = GET_SUBBUF_DONE;
131 goto end_rep;
132 }
133 else {
134 DBG("error getting subbuffer %s", buf->name);
135 retval = -1;
136 }
137
138 /* FIMXE: free correctly the stuff */
139 end_rep:
140 if(rep_code)
141 free(rep_code);
142 end:
143 if(send_msg)
144 free(send_msg);
145 if(received_msg)
146 free(received_msg);
147
148 return retval;
149 }
150
151 int put_subbuffer(struct buffer_info *buf)
152 {
153 char *send_msg=NULL;
154 char *received_msg=NULL;
155 char *rep_code=NULL;
156 int retval;
157 int result;
158
159 asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old);
160 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
161 if(result < 0 && errno == ECONNRESET) {
162 retval = PUT_SUBBUF_DIED;
163 goto end;
164 }
165 if(result < 0) {
166 ERR("put_subbuffer: send_message failed");
167 retval = -1;
168 goto end;
169 }
170
171 result = sscanf(received_msg, "%as", &rep_code);
172 if(result != 1) {
173 ERR("unable to parse response to put_subbuffer");
174 retval = -1;
175 goto end_rep;
176 }
177
178 if(!strcmp(rep_code, "OK")) {
179 DBG("subbuffer put %s", buf->name);
180 retval = PUT_SUBBUF_OK;
181 }
182 else {
183 DBG("put_subbuffer: received error, we were pushed");
184 retval = PUT_SUBBUF_PUSHED;
185 goto end_rep;
186 }
187
188 end_rep:
189 if(rep_code)
190 free(rep_code);
191
192 end:
193 if(send_msg)
194 free(send_msg);
195 if(received_msg)
196 free(received_msg);
197
198 return retval;
199 }
200
/* This write is patient because it restarts if it was incomplete or if it
 * was interrupted by a signal before anything was written.
 *
 * return value: count if everything was written; otherwise the value of the
 * write() call that failed (-1, errno set) or wrote nothing (0), even if
 * earlier iterations already wrote part of the data.
 */

ssize_t patient_write(int fd, const void *buf, size_t count)
{
	const char *bufc = (const char *) buf;
	ssize_t result;

	for(;;) {
		result = write(fd, bufc, count);
		if(result == -1 && errno == EINTR) {
			/* interrupted before any data was written: retry */
			continue;
		}
		if(result <= 0) {
			return result;
		}
		count -= (size_t) result;
		bufc += result;

		if(count == 0) {
			break;
		}
	}

	return bufc-(const char *)buf;
}
224
225 void decrement_active_buffers(void *arg)
226 {
227 pthread_mutex_lock(&active_buffers_mutex);
228 active_buffers--;
229 pthread_mutex_unlock(&active_buffers_mutex);
230 }
231
232 void *consumer_thread(void *arg)
233 {
234 struct buffer_info *buf = (struct buffer_info *) arg;
235 int result;
236
237 pthread_cleanup_push(decrement_active_buffers, NULL);
238
239 for(;;) {
240 /* get the subbuffer */
241 result = get_subbuffer(buf);
242 if(result == -1) {
243 ERR("error getting subbuffer");
244 continue;
245 }
246 else if(result == GET_SUBBUF_DONE) {
247 /* this is done */
248 break;
249 }
250 else if(result == GET_SUBBUF_DIED) {
251 finish_consuming_dead_subbuffer(buf);
252 break;
253 }
254
255 /* write data to file */
256 result = patient_write(buf->file_fd, buf->mem + (buf->consumed_old & (buf->n_subbufs * buf->subbuf_size-1)), buf->subbuf_size);
257 if(result == -1) {
258 PERROR("write");
259 /* FIXME: maybe drop this trace */
260 }
261
262 /* put the subbuffer */
263 result = put_subbuffer(buf);
264 if(result == -1) {
265 ERR("unknown error putting subbuffer (channel=%s)", buf->name);
266 break;
267 }
268 else if(result == PUT_SUBBUF_PUSHED) {
269 ERR("Buffer overflow (channel=%s), reader pushed. This channel will not be usable passed this point.", buf->name);
270 break;
271 }
272 else if(result == PUT_SUBBUF_DIED) {
273 WARN("application died while putting subbuffer");
274 /* FIXME: probably need to skip the first subbuffer in finish_consuming_dead_subbuffer */
275 finish_consuming_dead_subbuffer(buf);
276 break;
277 }
278 else if(result == PUT_SUBBUF_OK) {
279 }
280 }
281
282 DBG("thread for buffer %s is stopping", buf->name);
283
284 /* FIXME: destroy, unalloc... */
285
286 pthread_cleanup_pop(1);
287
288 return NULL;
289 }
290
/* Create directory dir with mode 0777; a directory entry that already
 * exists is not an error.
 *
 * return value: 0 on success or if the entry already exists, -1 otherwise.
 */
int create_dir_if_needed(char *dir)
{
	if(mkdir(dir, 0777) != -1) {
		return 0;
	}

	if(errno == EEXIST) {
		return 0;
	}

	PERROR("mkdir");
	return -1;
}
304
/* Tell whether dir names a directory.
 *
 * return value: 1 if stat() succeeds and reports a directory, 0 otherwise
 * (a stat() failure is also reported through PERROR).
 */
int is_directory(const char *dir)
{
	struct stat info;

	if(stat(dir, &info) == -1) {
		PERROR("stat");
		return 0;
	}

	return S_ISDIR(info.st_mode) ? 1 : 0;
}
322
323 int add_buffer(pid_t pid, char *bufname)
324 {
325 struct buffer_info *buf;
326 char *send_msg;
327 char *received_msg;
328 int result;
329 char *tmp;
330 int fd;
331 pthread_t thr;
332 struct shmid_ds shmds;
333
334 buf = (struct buffer_info *) malloc(sizeof(struct buffer_info));
335 if(buf == NULL) {
336 ERR("add_buffer: insufficient memory");
337 return -1;
338 }
339
340 buf->name = bufname;
341 buf->pid = pid;
342
343 /* connect to app */
344 result = ustcomm_connect_app(buf->pid, &buf->conn);
345 if(result) {
346 WARN("unable to connect to process, it probably died before we were able to connect");
347 return -1;
348 }
349
350 /* get pidunique */
351 asprintf(&send_msg, "get_pidunique");
352 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
353 free(send_msg);
354 if(result == -1) {
355 ERR("problem in ustcomm_send_request(get_pidunique)");
356 return -1;
357 }
358
359 result = sscanf(received_msg, "%lld", &buf->pidunique);
360 if(result != 1) {
361 ERR("unable to parse response to get_pidunique");
362 return -1;
363 }
364 free(received_msg);
365 DBG("got pidunique %lld", buf->pidunique);
366
367 /* get shmid */
368 asprintf(&send_msg, "get_shmid %s", buf->name);
369 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
370 free(send_msg);
371 if(result == -1) {
372 ERR("problem in ustcomm_send_request(get_shmid)");
373 return -1;
374 }
375
376 result = sscanf(received_msg, "%d %d", &buf->shmid, &buf->bufstruct_shmid);
377 if(result != 2) {
378 ERR("unable to parse response to get_shmid");
379 return -1;
380 }
381 free(received_msg);
382 DBG("got shmids %d %d", buf->shmid, buf->bufstruct_shmid);
383
384 /* get n_subbufs */
385 asprintf(&send_msg, "get_n_subbufs %s", buf->name);
386 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
387 free(send_msg);
388 if(result == -1) {
389 ERR("problem in ustcomm_send_request(g_n_subbufs)");
390 return -1;
391 }
392
393 result = sscanf(received_msg, "%d", &buf->n_subbufs);
394 if(result != 1) {
395 ERR("unable to parse response to get_n_subbufs");
396 return -1;
397 }
398 free(received_msg);
399 DBG("got n_subbufs %d", buf->n_subbufs);
400
401 /* get subbuf size */
402 asprintf(&send_msg, "get_subbuf_size %s", buf->name);
403 ustcomm_send_request(&buf->conn, send_msg, &received_msg);
404 free(send_msg);
405
406 result = sscanf(received_msg, "%d", &buf->subbuf_size);
407 if(result != 1) {
408 ERR("unable to parse response to get_subbuf_size");
409 return -1;
410 }
411 free(received_msg);
412 DBG("got subbuf_size %d", buf->subbuf_size);
413
414 /* attach memory */
415 buf->mem = shmat(buf->shmid, NULL, 0);
416 if(buf->mem == (void *) 0) {
417 PERROR("shmat");
418 return -1;
419 }
420 DBG("successfully attached buffer memory");
421
422 buf->bufstruct_mem = shmat(buf->bufstruct_shmid, NULL, 0);
423 if(buf->bufstruct_mem == (void *) 0) {
424 PERROR("shmat");
425 return -1;
426 }
427 DBG("successfully attached buffer bufstruct memory");
428
429 /* obtain info on the memory segment */
430 result = shmctl(buf->shmid, IPC_STAT, &shmds);
431 if(result == -1) {
432 PERROR("shmctl");
433 return -1;
434 }
435 buf->memlen = shmds.shm_segsz;
436
437 /* open file for output */
438 if(!trace_path) {
439 /* Only create the directory if using the default path, because
440 * of the risk of typo when using trace path override. We don't
441 * want to risk creating plenty of useless directories in that case.
442 */
443 result = create_dir_if_needed(USTD_DEFAULT_TRACE_PATH);
444 if(result == -1) {
445 ERR("could not create directory %s", USTD_DEFAULT_TRACE_PATH);
446 return -1;
447 }
448
449 trace_path = USTD_DEFAULT_TRACE_PATH;
450 }
451
452 asprintf(&tmp, "%s/%u_%lld", trace_path, buf->pid, buf->pidunique);
453 result = create_dir_if_needed(tmp);
454 if(result == -1) {
455 ERR("could not create directory %s", tmp);
456 free(tmp);
457 return -1;
458 }
459 free(tmp);
460
461 asprintf(&tmp, "%s/%u_%lld/%s_0", trace_path, buf->pid, buf->pidunique, buf->name);
462 result = fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC | O_EXCL, 00600);
463 if(result == -1) {
464 PERROR("open");
465 ERR("failed opening trace file %s", tmp);
466 return -1;
467 }
468 buf->file_fd = fd;
469 free(tmp);
470
471 pthread_mutex_lock(&active_buffers_mutex);
472 active_buffers++;
473 pthread_mutex_unlock(&active_buffers_mutex);
474
475 pthread_create(&thr, NULL, consumer_thread, buf);
476
477 return 0;
478 }
479
/* Print the command line help on stderr. */
void usage(void)
{
	static const char help_text[] =
		"Usage:\nustd OPTIONS\n\nOptions:\n"
		"\t-h\t\tDisplay this usage.\n"
		"\t-o DIR\t\tSpecify the directory where to output the traces.\n"
		"\t-s PATH\t\tSpecify the path to use for the daemon socket.\n";

	fputs(help_text, stderr);
}
487
488 int parse_args(int argc, char **argv)
489 {
490 int c;
491
492 while (1) {
493 int option_index = 0;
494 static struct option long_options[] = {
495 {"help", 0, 0, 'h'},
496 {"version", 0, 0, 'V'},
497 {0, 0, 0, 0}
498 };
499
500 c = getopt_long(argc, argv, "hs:o:", long_options, &option_index);
501 if (c == -1)
502 break;
503
504 switch (c) {
505 case 0:
506 printf("option %s", long_options[option_index].name);
507 if (optarg)
508 printf(" with arg %s", optarg);
509 printf("\n");
510 break;
511 case 's':
512 sock_path = optarg;
513 break;
514 case 'o':
515 trace_path = optarg;
516 if(!is_directory(trace_path)) {
517 ERR("Not a valid directory. (%s)", trace_path);
518 return -1;
519 }
520 break;
521 case 'h':
522 usage();
523 exit(0);
524 case 'V':
525 printf("Version 0.0\n");
526 break;
527
528 default:
529 /* unknown option or other error; error is
530 printed by getopt, just return */
531 return -1;
532 }
533 }
534
535 return 0;
536 }
537
538 void sigterm_handler(int sig)
539 {
540 terminate_req = 1;
541 }
542
543 int main(int argc, char **argv)
544 {
545 struct ustcomm_ustd ustd;
546 int result;
547 sigset_t sigset;
548 struct sigaction sa;
549
550 result = sigemptyset(&sigset);
551 if(result == -1) {
552 PERROR("sigemptyset");
553 return 1;
554 }
555 sa.sa_handler = sigterm_handler;
556 sa.sa_mask = sigset;
557 sa.sa_flags = SA_RESTART;
558 result = sigaction(SIGTERM, &sa, NULL);
559 if(result == -1) {
560 PERROR("sigaction");
561 return 1;
562 }
563
564 result = parse_args(argc, argv);
565 if(result == -1) {
566 exit(1);
567 }
568
569 result = ustcomm_init_ustd(&ustd, sock_path);
570 if(result == -1) {
571 ERR("failed to initialize socket");
572 return 1;
573 }
574
575 /* setup handler for SIGPIPE */
576 result = sigemptyset(&sigset);
577 if(result == -1) {
578 PERROR("sigemptyset");
579 return 1;
580 }
581 result = sigaddset(&sigset, SIGPIPE);
582 if(result == -1) {
583 PERROR("sigaddset");
584 return 1;
585 }
586 result = sigprocmask(SIG_BLOCK, &sigset, NULL);
587 if(result == -1) {
588 PERROR("sigprocmask");
589 return 1;
590 }
591
592 /* app loop */
593 for(;;) {
594 char *recvbuf;
595
596 /* check for requests on our public socket */
597 result = ustcomm_ustd_recv_message(&ustd, &recvbuf, NULL, 100);
598 if(result == -1) {
599 ERR("error in ustcomm_ustd_recv_message");
600 continue;
601 }
602 if(result > 0) {
603 if(!strncmp(recvbuf, "collect", 7)) {
604 pid_t pid;
605 char *bufname;
606 int result;
607
608 result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
609 if(result != 2) {
610 fprintf(stderr, "parsing error: %s\n", recvbuf);
611 }
612
613 result = add_buffer(pid, bufname);
614 if(result < 0) {
615 ERR("error in add_buffer");
616 continue;
617 }
618 }
619
620 free(recvbuf);
621 }
622
623 if(terminate_req) {
624 pthread_mutex_lock(&active_buffers_mutex);
625 if(active_buffers == 0) {
626 pthread_mutex_unlock(&active_buffers_mutex);
627 break;
628 }
629 pthread_mutex_unlock(&active_buffers_mutex);
630 }
631 }
632
633 return 0;
634 }
This page took 0.040544 seconds and 4 git commands to generate.