improve error handling
[ust.git] / ustd / ustd.c
CommitLineData
c39c72ee 1/* Copyright (C) 2009 Pierre-Marc Fournier
1f8b0dff 2 *
c39c72ee
PMF
3 * This library is free software; you can redistribute it and/or
4 * modify it under the terms of the GNU Lesser General Public
5 * License as published by the Free Software Foundation; either
6 * version 2.1 of the License, or (at your option) any later version.
1f8b0dff 7 *
c39c72ee 8 * This library is distributed in the hope that it will be useful,
1f8b0dff 9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
c39c72ee
PMF
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * Lesser General Public License for more details.
1f8b0dff 12 *
c39c72ee
PMF
13 * You should have received a copy of the GNU Lesser General Public
14 * License along with this library; if not, write to the Free Software
15 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
1f8b0dff
PMF
16 */
17
3796af9b
PMF
18#define _GNU_SOURCE
19
20#include <sys/types.h>
cd226f25 21#include <sys/stat.h>
3796af9b 22#include <sys/shm.h>
688760ef
PMF
23#include <fcntl.h>
24#include <unistd.h>
3a7b90de 25#include <pthread.h>
a3cdd4a7 26#include <signal.h>
3796af9b
PMF
27
28#include <stdlib.h>
29#include <stdio.h>
30#include <string.h>
a3cdd4a7
PMF
31#include <errno.h>
32#include <assert.h>
cd226f25 33#include <getopt.h>
3796af9b 34
0b0cd937 35#include "ustd.h"
3796af9b
PMF
36#include "localerr.h"
37#include "ustcomm.h"
38
3a7b90de
PMF
/* Return values of get_subbuffer():
 *   GET_SUBBUF_OK   (1) = got a subbuffer successfully
 *   GET_SUBBUF_DONE (0) = the subbuffer is finished, it won't produce data anymore
 *   GET_SUBBUF_DIED (2) = the application died; consumer_thread() then calls
 *                         finish_consuming_dead_subbuffer()
 *   <0                  = error
 */
#define GET_SUBBUF_OK 1
#define GET_SUBBUF_DONE 0
#define GET_SUBBUF_DIED 2

/* Return values of put_subbuffer():
 *   PUT_SUBBUF_OK     (1) = the application accepted the put
 *   PUT_SUBBUF_DIED   (0) = the connection to the application was lost
 *   PUT_SUBBUF_PUSHED (2) = the application refused the put (reader was pushed)
 *   <0                    = error
 */
#define PUT_SUBBUF_OK 1
#define PUT_SUBBUF_DIED 0
#define PUT_SUBBUF_PUSHED 2

char *sock_path=NULL;
char *trace_path=NULL;

/* Number of active buffers and the mutex to protect it. */
int active_buffers = 0;
pthread_mutex_t active_buffers_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Whether a request to end the program was received. It is written by the
 * SIGTERM handler, so C requires the type volatile sig_atomic_t. */
volatile sig_atomic_t terminate_req = 0;
60
a3cdd4a7
PMF
61int test_sigpipe(void)
62{
63 sigset_t sigset;
64 int result;
65
66 result = sigemptyset(&sigset);
67 if(result == -1) {
4d70f833 68 PERROR("sigemptyset");
a3cdd4a7
PMF
69 return -1;
70 }
71 result = sigaddset(&sigset, SIGPIPE);
72 if(result == -1) {
4d70f833 73 PERROR("sigaddset");
a3cdd4a7
PMF
74 return -1;
75 }
76
77 result = sigtimedwait(&sigset, NULL, &(struct timespec){0,0});
78 if(result == -1 && errno == EAGAIN) {
79 /* no signal received */
80 return 0;
81 }
82 else if(result == -1) {
4d70f833 83 PERROR("sigtimedwait");
a3cdd4a7
PMF
84 return -1;
85 }
86 else if(result == SIGPIPE) {
87 /* received sigpipe */
88 return 1;
89 }
90 else {
91 assert(0);
92 }
93}
94
688760ef
PMF
95int get_subbuffer(struct buffer_info *buf)
96{
ab805ccd
PMF
97 char *send_msg=NULL;
98 char *received_msg=NULL;
99 char *rep_code=NULL;
688760ef
PMF
100 int retval;
101 int result;
102
103 asprintf(&send_msg, "get_subbuffer %s", buf->name);
3bb56863 104 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
a3cdd4a7
PMF
105 if(test_sigpipe()) {
106 WARN("process %d destroyed before we could connect to it", buf->pid);
ab805ccd
PMF
107 retval = GET_SUBBUF_DONE;
108 goto end;
a3cdd4a7
PMF
109 }
110 else if(result < 0) {
3bb56863 111 ERR("get_subbuffer: ustcomm_send_request failed");
ab805ccd
PMF
112 retval = -1;
113 goto end;
688760ef 114 }
688760ef
PMF
115
116 result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old);
3a7b90de 117 if(result != 2 && result != 1) {
688760ef 118 ERR("unable to parse response to get_subbuffer");
ab805ccd
PMF
119 retval = -1;
120 goto end_rep;
688760ef 121 }
3a7b90de
PMF
122
123 DBG("received msg is %s", received_msg);
688760ef
PMF
124
125 if(!strcmp(rep_code, "OK")) {
126 DBG("got subbuffer %s", buf->name);
8cefc145 127 retval = GET_SUBBUF_OK;
688760ef 128 }
3a7b90de 129 else if(nth_token_is(received_msg, "END", 0) == 1) {
ab805ccd
PMF
130 retval = GET_SUBBUF_DONE;
131 goto end_rep;
3a7b90de 132 }
688760ef 133 else {
3a7b90de
PMF
134 DBG("error getting subbuffer %s", buf->name);
135 retval = -1;
688760ef
PMF
136 }
137
3a7b90de 138 /* FIMXE: free correctly the stuff */
ab805ccd
PMF
139end_rep:
140 if(rep_code)
141 free(rep_code);
142end:
143 if(send_msg)
144 free(send_msg);
145 if(received_msg)
146 free(received_msg);
147
688760ef
PMF
148 return retval;
149}
150
151int put_subbuffer(struct buffer_info *buf)
152{
ab805ccd
PMF
153 char *send_msg=NULL;
154 char *received_msg=NULL;
155 char *rep_code=NULL;
688760ef
PMF
156 int retval;
157 int result;
158
159 asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old);
3bb56863 160 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
ab805ccd
PMF
161 if(result < 0 && errno == ECONNRESET) {
162 retval = PUT_SUBBUF_DIED;
163 goto end;
164 }
688760ef
PMF
165 if(result < 0) {
166 ERR("put_subbuffer: send_message failed");
ab805ccd
PMF
167 retval = -1;
168 goto end;
688760ef 169 }
688760ef
PMF
170
171 result = sscanf(received_msg, "%as", &rep_code);
172 if(result != 1) {
173 ERR("unable to parse response to put_subbuffer");
ab805ccd
PMF
174 retval = -1;
175 goto end_rep;
688760ef 176 }
688760ef
PMF
177
178 if(!strcmp(rep_code, "OK")) {
179 DBG("subbuffer put %s", buf->name);
a3cdd4a7 180 retval = PUT_SUBBUF_OK;
688760ef
PMF
181 }
182 else {
a3cdd4a7 183 DBG("put_subbuffer: received error, we were pushed");
ab805ccd
PMF
184 retval = PUT_SUBBUF_PUSHED;
185 goto end_rep;
688760ef
PMF
186 }
187
ab805ccd
PMF
188end_rep:
189 if(rep_code)
190 free(rep_code);
191
192end:
193 if(send_msg)
194 free(send_msg);
195 if(received_msg)
196 free(received_msg);
197
688760ef
PMF
198 return retval;
199}
200
a3cdd4a7
PMF
/* Write all count bytes of buf to fd. This write is patient: it restarts
 * after partial writes and after writes interrupted by a signal (EINTR).
 *
 * Returns the number of bytes written (count on success). Otherwise it
 * returns the value of the failing write(): -1 with errno set, or 0 if
 * write() made no progress. Bytes written before such a failure are not
 * reported.
 */
ssize_t patient_write(int fd, const void *buf, size_t count)
{
	const char *bufc = (const char *) buf;
	/* ssize_t, not int: write() may report more than INT_MAX bytes. */
	ssize_t result;

	for(;;) {
		result = write(fd, bufc, count);
		if(result == -1 && errno == EINTR) {
			continue;
		}
		if(result <= 0) {
			return result;
		}
		count -= (size_t) result;
		bufc += result;

		if(count == 0) {
			break;
		}
	}

	return bufc - (const char *)buf;
}
224
3158b808
PMF
225void decrement_active_buffers(void *arg)
226{
227 pthread_mutex_lock(&active_buffers_mutex);
228 active_buffers--;
229 pthread_mutex_unlock(&active_buffers_mutex);
230}
231
3a7b90de
PMF
232void *consumer_thread(void *arg)
233{
234 struct buffer_info *buf = (struct buffer_info *) arg;
235 int result;
236
3158b808
PMF
237 pthread_cleanup_push(decrement_active_buffers, NULL);
238
3a7b90de 239 for(;;) {
8cefc145 240 /* get the subbuffer */
0b0cd937
PMF
241 result = get_subbuffer(buf);
242 if(result == -1) {
243 ERR("error getting subbuffer");
244 continue;
3a7b90de 245 }
0b0cd937
PMF
246 else if(result == GET_SUBBUF_DONE) {
247 /* this is done */
248 break;
249 }
250 else if(result == GET_SUBBUF_DIED) {
251 finish_consuming_dead_subbuffer(buf);
252 break;
3a7b90de
PMF
253 }
254
255 /* write data to file */
256 result = patient_write(buf->file_fd, buf->mem + (buf->consumed_old & (buf->n_subbufs * buf->subbuf_size-1)), buf->subbuf_size);
257 if(result == -1) {
258 PERROR("write");
259 /* FIXME: maybe drop this trace */
260 }
261
8cefc145 262 /* put the subbuffer */
0b0cd937
PMF
263 result = put_subbuffer(buf);
264 if(result == -1) {
a3cdd4a7
PMF
265 ERR("unknown error putting subbuffer (channel=%s)", buf->name);
266 break;
267 }
268 else if(result == PUT_SUBBUF_PUSHED) {
269 ERR("Buffer overflow (channel=%s), reader pushed. This channel will not be usable passed this point.", buf->name);
0b0cd937 270 break;
3a7b90de 271 }
a3cdd4a7
PMF
272 else if(result == PUT_SUBBUF_DIED) {
273 WARN("application died while putting subbuffer");
274 /* FIXME: probably need to skip the first subbuffer in finish_consuming_dead_subbuffer */
275 finish_consuming_dead_subbuffer(buf);
9f654956 276 break;
a3cdd4a7
PMF
277 }
278 else if(result == PUT_SUBBUF_OK) {
279 }
3a7b90de
PMF
280 }
281
282 DBG("thread for buffer %s is stopping", buf->name);
283
8cefc145
PMF
284 /* FIXME: destroy, unalloc... */
285
3158b808
PMF
286 pthread_cleanup_pop(1);
287
3a7b90de
PMF
288 return NULL;
289}
290
72ebd39a
PMF
/* Create directory dir with mode 0777 (reduced by the umask).
 * An already existing entry named dir (EEXIST) is not an error.
 * Returns 0 on success or if dir exists, -1 on any other mkdir() failure. */
int create_dir_if_needed(char *dir)
{
	if(mkdir(dir, 0777) == 0 || errno == EEXIST) {
		return 0;
	}

	PERROR("mkdir");
	return -1;
}
304
cd226f25
PMF
/* Tell whether dir names an existing directory (stat() follows symlinks).
 * Returns 1 if it does, 0 otherwise; a stat() failure is reported with
 * PERROR and counts as "not a directory". */
int is_directory(const char *dir)
{
	struct stat info;

	if(stat(dir, &info) == -1) {
		PERROR("stat");
		return 0;
	}

	return S_ISDIR(info.st_mode) ? 1 : 0;
}
322
3a7b90de
PMF
323int add_buffer(pid_t pid, char *bufname)
324{
325 struct buffer_info *buf;
326 char *send_msg;
327 char *received_msg;
328 int result;
329 char *tmp;
330 int fd;
331 pthread_t thr;
a3cdd4a7 332 struct shmid_ds shmds;
3a7b90de
PMF
333
334 buf = (struct buffer_info *) malloc(sizeof(struct buffer_info));
335 if(buf == NULL) {
336 ERR("add_buffer: insufficient memory");
337 return -1;
338 }
339
340 buf->name = bufname;
341 buf->pid = pid;
342
4e2a8808
PMF
343 /* connect to app */
344 result = ustcomm_connect_app(buf->pid, &buf->conn);
345 if(result) {
a3cdd4a7 346 WARN("unable to connect to process, it probably died before we were able to connect");
4e2a8808
PMF
347 return -1;
348 }
349
ed1317e7
PMF
350 /* get pidunique */
351 asprintf(&send_msg, "get_pidunique");
352 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
353 free(send_msg);
354 if(result == -1) {
355 ERR("problem in ustcomm_send_request(get_pidunique)");
356 return -1;
357 }
358
359 result = sscanf(received_msg, "%lld", &buf->pidunique);
360 if(result != 1) {
361 ERR("unable to parse response to get_pidunique");
362 return -1;
363 }
364 free(received_msg);
365 DBG("got pidunique %lld", buf->pidunique);
366
3a7b90de
PMF
367 /* get shmid */
368 asprintf(&send_msg, "get_shmid %s", buf->name);
a3cdd4a7 369 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
3a7b90de 370 free(send_msg);
a3cdd4a7
PMF
371 if(result == -1) {
372 ERR("problem in ustcomm_send_request(get_shmid)");
373 return -1;
374 }
3a7b90de 375
8cefc145
PMF
376 result = sscanf(received_msg, "%d %d", &buf->shmid, &buf->bufstruct_shmid);
377 if(result != 2) {
3a7b90de
PMF
378 ERR("unable to parse response to get_shmid");
379 return -1;
380 }
381 free(received_msg);
8cefc145 382 DBG("got shmids %d %d", buf->shmid, buf->bufstruct_shmid);
3a7b90de
PMF
383
384 /* get n_subbufs */
385 asprintf(&send_msg, "get_n_subbufs %s", buf->name);
a3cdd4a7 386 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
3a7b90de 387 free(send_msg);
a3cdd4a7
PMF
388 if(result == -1) {
389 ERR("problem in ustcomm_send_request(g_n_subbufs)");
390 return -1;
391 }
3a7b90de
PMF
392
393 result = sscanf(received_msg, "%d", &buf->n_subbufs);
394 if(result != 1) {
395 ERR("unable to parse response to get_n_subbufs");
396 return -1;
397 }
398 free(received_msg);
399 DBG("got n_subbufs %d", buf->n_subbufs);
400
401 /* get subbuf size */
402 asprintf(&send_msg, "get_subbuf_size %s", buf->name);
4e2a8808 403 ustcomm_send_request(&buf->conn, send_msg, &received_msg);
3a7b90de
PMF
404 free(send_msg);
405
406 result = sscanf(received_msg, "%d", &buf->subbuf_size);
407 if(result != 1) {
408 ERR("unable to parse response to get_subbuf_size");
409 return -1;
410 }
411 free(received_msg);
412 DBG("got subbuf_size %d", buf->subbuf_size);
413
414 /* attach memory */
415 buf->mem = shmat(buf->shmid, NULL, 0);
416 if(buf->mem == (void *) 0) {
4d70f833 417 PERROR("shmat");
3a7b90de
PMF
418 return -1;
419 }
8cefc145
PMF
420 DBG("successfully attached buffer memory");
421
422 buf->bufstruct_mem = shmat(buf->bufstruct_shmid, NULL, 0);
423 if(buf->bufstruct_mem == (void *) 0) {
4d70f833 424 PERROR("shmat");
8cefc145
PMF
425 return -1;
426 }
427 DBG("successfully attached buffer bufstruct memory");
3a7b90de 428
a3cdd4a7
PMF
429 /* obtain info on the memory segment */
430 result = shmctl(buf->shmid, IPC_STAT, &shmds);
431 if(result == -1) {
4d70f833 432 PERROR("shmctl");
a3cdd4a7
PMF
433 return -1;
434 }
435 buf->memlen = shmds.shm_segsz;
436
3a7b90de 437 /* open file for output */
cd226f25
PMF
438 if(!trace_path) {
439 /* Only create the directory if using the default path, because
440 * of the risk of typo when using trace path override. We don't
441 * want to risk creating plenty of useless directories in that case.
442 */
443 result = create_dir_if_needed(USTD_DEFAULT_TRACE_PATH);
444 if(result == -1) {
445 ERR("could not create directory %s", USTD_DEFAULT_TRACE_PATH);
446 return -1;
447 }
448
449 trace_path = USTD_DEFAULT_TRACE_PATH;
72ebd39a
PMF
450 }
451
ed1317e7 452 asprintf(&tmp, "%s/%u_%lld", trace_path, buf->pid, buf->pidunique);
72ebd39a
PMF
453 result = create_dir_if_needed(tmp);
454 if(result == -1) {
455 ERR("could not create directory %s", tmp);
456 free(tmp);
457 return -1;
458 }
459 free(tmp);
460
ed1317e7
PMF
461 asprintf(&tmp, "%s/%u_%lld/%s_0", trace_path, buf->pid, buf->pidunique, buf->name);
462 result = fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC | O_EXCL, 00600);
3a7b90de
PMF
463 if(result == -1) {
464 PERROR("open");
6cb88bc0 465 ERR("failed opening trace file %s", tmp);
3a7b90de
PMF
466 return -1;
467 }
468 buf->file_fd = fd;
469 free(tmp);
470
3158b808
PMF
471 pthread_mutex_lock(&active_buffers_mutex);
472 active_buffers++;
473 pthread_mutex_unlock(&active_buffers_mutex);
474
3a7b90de
PMF
475 pthread_create(&thr, NULL, consumer_thread, buf);
476
477 return 0;
478}
479
cd226f25
PMF
/* Print the command-line help text to stderr. */
void usage(void)
{
	static const char help_text[] =
		"Usage:\nustd OPTIONS\n\nOptions:\n"
		"\t-h\t\tDisplay this usage.\n"
		"\t-o DIR\t\tSpecify the directory where to output the traces.\n"
		"\t-s PATH\t\tSpecify the path to use for the daemon socket.\n";

	/* the text has no conversion specifiers, so fputs is equivalent */
	fputs(help_text, stderr);
}
487
488int parse_args(int argc, char **argv)
489{
490 int c;
491
492 while (1) {
493 int option_index = 0;
494 static struct option long_options[] = {
495 {"help", 0, 0, 'h'},
496 {"version", 0, 0, 'V'},
497 {0, 0, 0, 0}
498 };
499
500 c = getopt_long(argc, argv, "hs:o:", long_options, &option_index);
501 if (c == -1)
502 break;
503
504 switch (c) {
505 case 0:
506 printf("option %s", long_options[option_index].name);
507 if (optarg)
508 printf(" with arg %s", optarg);
509 printf("\n");
510 break;
511 case 's':
512 sock_path = optarg;
513 break;
514 case 'o':
515 trace_path = optarg;
516 if(!is_directory(trace_path)) {
517 ERR("Not a valid directory. (%s)", trace_path);
518 return -1;
519 }
520 break;
521 case 'h':
522 usage();
523 exit(0);
524 case 'V':
525 printf("Version 0.0\n");
526 break;
527
528 default:
529 /* unknown option or other error; error is
530 printed by getopt, just return */
531 return -1;
532 }
533 }
534
535 return 0;
536}
537
3158b808
PMF
538void sigterm_handler(int sig)
539{
540 terminate_req = 1;
541}
542
3796af9b
PMF
543int main(int argc, char **argv)
544{
545 struct ustcomm_ustd ustd;
546 int result;
a3cdd4a7 547 sigset_t sigset;
3158b808
PMF
548 struct sigaction sa;
549
550 result = sigemptyset(&sigset);
551 if(result == -1) {
4d70f833 552 PERROR("sigemptyset");
3158b808
PMF
553 return 1;
554 }
555 sa.sa_handler = sigterm_handler;
556 sa.sa_mask = sigset;
557 sa.sa_flags = SA_RESTART;
558 result = sigaction(SIGTERM, &sa, NULL);
559 if(result == -1) {
560 PERROR("sigaction");
561 return 1;
562 }
3796af9b 563
cd226f25
PMF
564 result = parse_args(argc, argv);
565 if(result == -1) {
566 exit(1);
567 }
568
569 result = ustcomm_init_ustd(&ustd, sock_path);
3796af9b
PMF
570 if(result == -1) {
571 ERR("failed to initialize socket");
572 return 1;
573 }
574
3158b808 575 /* setup handler for SIGPIPE */
a3cdd4a7
PMF
576 result = sigemptyset(&sigset);
577 if(result == -1) {
4d70f833 578 PERROR("sigemptyset");
a3cdd4a7
PMF
579 return 1;
580 }
581 result = sigaddset(&sigset, SIGPIPE);
582 if(result == -1) {
4d70f833 583 PERROR("sigaddset");
a3cdd4a7
PMF
584 return 1;
585 }
586 result = sigprocmask(SIG_BLOCK, &sigset, NULL);
587 if(result == -1) {
4d70f833 588 PERROR("sigprocmask");
a3cdd4a7
PMF
589 return 1;
590 }
591
688760ef 592 /* app loop */
3796af9b
PMF
593 for(;;) {
594 char *recvbuf;
595
3a7b90de 596 /* check for requests on our public socket */
688760ef
PMF
597 result = ustcomm_ustd_recv_message(&ustd, &recvbuf, NULL, 100);
598 if(result == -1) {
599 ERR("error in ustcomm_ustd_recv_message");
600 continue;
601 }
602 if(result > 0) {
603 if(!strncmp(recvbuf, "collect", 7)) {
604 pid_t pid;
605 char *bufname;
606 int result;
3796af9b 607
688760ef
PMF
608 result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
609 if(result != 2) {
610 fprintf(stderr, "parsing error: %s\n", recvbuf);
611 }
3796af9b 612
688760ef
PMF
613 result = add_buffer(pid, bufname);
614 if(result < 0) {
615 ERR("error in add_buffer");
616 continue;
617 }
3796af9b
PMF
618 }
619
688760ef 620 free(recvbuf);
3796af9b 621 }
3158b808
PMF
622
623 if(terminate_req) {
624 pthread_mutex_lock(&active_buffers_mutex);
625 if(active_buffers == 0) {
626 pthread_mutex_unlock(&active_buffers_mutex);
627 break;
628 }
629 pthread_mutex_unlock(&active_buffers_mutex);
630 }
3796af9b
PMF
631 }
632
633 return 0;
634}
This page took 0.049364 seconds and 4 git commands to generate.