X-Git-Url: http://git.lttng.org/?a=blobdiff_plain;f=ltt%2Fbranches%2Fpoly%2Flttd%2Flttd.c;h=27240882189de464882d2f2ab1d64fa674fc8f29;hb=cdad978721add90e7f10411e68795099bf87946e;hp=fc4e4472e09806d4b698e2c613c6dec8022b1fa2;hpb=90ccaa9add7209dac24ed9b407a04fd9304f5762;p=ltt-control.git diff --git a/ltt/branches/poly/lttd/lttd.c b/ltt/branches/poly/lttd/lttd.c index fc4e447..2724088 100644 --- a/ltt/branches/poly/lttd/lttd.c +++ b/ltt/branches/poly/lttd/lttd.c @@ -10,16 +10,41 @@ * Mathieu Desnoyers */ +#ifdef HAVE_CONFIG_H +#include +#endif + +#define _REENTRANT +#define _GNU_SOURCE +#include #include +#include #include #include #include -#include #include #include #include #include #include +#include +#include +#include + +/* Relayfs IOCTL */ +#include +#include + +/* Get the next sub buffer that can be read. */ +#define RELAYFS_GET_SUBBUF _IOR(0xF4, 0x00,__u32) +/* Release the oldest reserved (by "get") sub buffer. */ +#define RELAYFS_PUT_SUBBUF _IOW(0xF4, 0x01,__u32) +/* returns the number of sub buffers in the per cpu channel. */ +#define RELAYFS_GET_N_SUBBUFS _IOR(0xF4, 0x02,__u32) +/* returns the size of the sub buffers. */ +#define RELAYFS_GET_SUBBUF_SIZE _IOR(0xF4, 0x03,__u32) + + enum { GET_SUBBUF, @@ -31,6 +56,10 @@ enum { struct fd_pair { int channel; int trace; + unsigned int n_subbufs; + unsigned int subbuf_size; + void *mmap; + pthread_mutex_t mutex; }; struct channel_trace_fd { @@ -42,6 +71,8 @@ static char *trace_name = NULL; static char *channel_name = NULL; static int daemon_mode = 0; static int append_mode = 0; +static unsigned long num_threads = 1; +volatile static int quit_program = 0; /* For signal handler */ /* Args : * @@ -49,6 +80,7 @@ static int append_mode = 0; * -c directory Root directory of the relayfs trace channels. * -d Run in background (daemon). * -a Trace append mode. + * -s Send SIGUSR1 to parent when ready for IO. */ void show_arguments(void) { @@ -59,6 +91,7 @@ void show_arguments(void) printf("-c directory Root directory of the relayfs trace channels.\n"); printf("-d Run in background (daemon).\n"); printf("-a Append to an possibly existing trace.\n"); + printf("-N Number of threads to start.\n"); printf("\n"); } @@ -104,6 +137,12 @@ int parse_arguments(int argc, char **argv) case 'a': append_mode = 1; break; + case 'N': + if(argn+1 < argc) { + num_threads = strtoul(argv[argn+1], NULL, 0); + argn++; + } + break; default: printf("Invalid argument '%s'.\n", argv[argn]); printf("\n"); @@ -143,6 +182,16 @@ void show_info(void) } +/* signal handling */ + +static void handler(int signo) +{ + printf("Signal %d received : exiting cleanly\n", signo); + quit_program = 1; +} + + + int open_channel_trace_pairs(char *subchannel_name, char *subtrace_name, struct channel_trace_fd *fd_pairs) { @@ -156,22 +205,21 @@ int open_channel_trace_pairs(char *subchannel_name, char *subtrace_name, char path_trace[PATH_MAX]; int path_trace_len; char *path_trace_ptr; + int open_ret = 0; if(channel_dir == NULL) { perror(subchannel_name); - return ENOENT; + open_ret = ENOENT; + goto end; } - //FIXME : check if the directory already exist, and ask the user if he wants - //to append to the traces. printf("Creating trace subdirectory %s\n", subtrace_name); ret = mkdir(subtrace_name, S_IRWXU|S_IRWXG|S_IRWXO); if(ret == -1) { - if(errno == EEXIST && append_mode) { - printf("Appending to directory %s as resquested\n", subtrace_name); - } else { + if(errno != EEXIST) { perror(subtrace_name); - return -1; + open_ret = -1; + goto end; } } @@ -218,12 +266,14 @@ int open_channel_trace_pairs(char *subchannel_name, char *subtrace_name, open(path_channel, O_RDONLY | O_NONBLOCK); if(fd_pairs->pair[fd_pairs->num_pairs-1].channel == -1) { perror(path_channel); + fd_pairs->num_pairs--; + continue; } /* Open the trace in write mode, only append if append_mode */ ret = stat(path_trace, &stat_buf); if(ret == 0) { if(append_mode) { - printf("Appending to file %s as resquested\n", path_trace); + printf("Appending to file %s as requested\n", path_trace); fd_pairs->pair[fd_pairs->num_pairs-1].trace = open(path_trace, O_WRONLY|O_APPEND, @@ -234,7 +284,8 @@ int open_channel_trace_pairs(char *subchannel_name, char *subtrace_name, } } else { printf("File %s exists, cannot open. Try append mode.\n", path_trace); - return -1; + open_ret = -1; + goto end; } } else { if(errno == ENOENT) { @@ -249,18 +300,164 @@ int open_channel_trace_pairs(char *subchannel_name, char *subtrace_name, } } +end: closedir(channel_dir); - return 0; + return open_ret; +} + + +int read_subbuffer(struct fd_pair *pair) +{ + unsigned int consumed_old; + int err, ret=0; + + + err = ioctl(pair->channel, RELAYFS_GET_SUBBUF, + &consumed_old); + printf("cookie : %u\n", consumed_old); + if(err != 0) { + ret = errno; + perror("Reserving sub buffer failed (everything is normal)"); + goto get_error; + } + + err = TEMP_FAILURE_RETRY(write(pair->trace, + pair->mmap + + (consumed_old & ((pair->n_subbufs * pair->subbuf_size)-1)), + pair->subbuf_size)); + + if(err < 0) { + ret = errno; + perror("Error in writing to file"); + goto write_error; + } + + +write_error: + err = ioctl(pair->channel, RELAYFS_PUT_SUBBUF, &consumed_old); + if(err != 0) { + ret = errno; + if(errno == -EFAULT) { + perror("Error in unreserving sub buffer\n"); + } else if(errno == -EIO) { + perror("Reader has been pushed by the writer, last subbuffer corrupted."); + /* FIXME : we may delete the last written buffer if we wish. */ + } + goto get_error; + } + +get_error: + return ret; +} + + + +int map_channels(struct channel_trace_fd *fd_pairs) +{ + int i,j; + int ret=0; + + if(fd_pairs->num_pairs <= 0) { + printf("No channel to read\n"); + goto end; + } + + /* Get the subbuf sizes and number */ + + for(i=0;inum_pairs;i++) { + struct fd_pair *pair = &fd_pairs->pair[i]; + + ret = ioctl(pair->channel, RELAYFS_GET_N_SUBBUFS, + &pair->n_subbufs); + if(ret != 0) { + perror("Error in getting the number of subbuffers"); + goto end; + } + ret = ioctl(pair->channel, RELAYFS_GET_SUBBUF_SIZE, + &pair->subbuf_size); + if(ret != 0) { + perror("Error in getting the size of the subbuffers"); + goto end; + } + ret = pthread_mutex_init(&pair->mutex, NULL); /* Fast mutex */ + if(ret != 0) { + perror("Error in mutex init"); + goto end; + } + } + + /* Mmap each FD */ + for(i=0;inum_pairs;i++) { + struct fd_pair *pair = &fd_pairs->pair[i]; + + pair->mmap = mmap(0, pair->subbuf_size * pair->n_subbufs, PROT_READ, + MAP_SHARED, pair->channel, 0); + if(pair->mmap == MAP_FAILED) { + perror("Mmap error"); + goto munmap; + } + } + + goto end; /* success */ + + /* Error handling */ + /* munmap only the successfully mmapped indexes */ +munmap: + /* Munmap each FD */ + for(j=0;jpair[j]; + int err_ret; + + err_ret = munmap(pair->mmap, pair->subbuf_size * pair->n_subbufs); + if(err_ret != 0) { + perror("Error in munmap"); + } + ret |= err_ret; + } + +end: + return ret; + + +} + + +int unmap_channels(struct channel_trace_fd *fd_pairs) +{ + int j; + int ret=0; + + /* Munmap each FD */ + for(j=0;jnum_pairs;j++) { + struct fd_pair *pair = &fd_pairs->pair[j]; + int err_ret; + + err_ret = munmap(pair->mmap, pair->subbuf_size * pair->n_subbufs); + if(err_ret != 0) { + perror("Error in munmap"); + } + ret |= err_ret; + err_ret = pthread_mutex_destroy(&pair->mutex); + if(err_ret != 0) { + perror("Error in mutex destroy"); + } + ret |= err_ret; + } + + return ret; } + /* read_channels * - * Read the realyfs channels and write them in the paired tracefiles. + * Thread worker. + * + * Read the relayfs channels and write them in the paired tracefiles. * * @fd_pairs : paired channels and trace files. * - * returns 0 on success, -1 on error. + * returns (void*)0 on success, (void*)-1 on error. * * Note that the high priority polled channels are consumed first. We then poll * again to see if these channels are still in priority. Only when no @@ -270,25 +467,42 @@ int open_channel_trace_pairs(char *subchannel_name, char *subtrace_name, * full. */ -int read_channels(struct channel_trace_fd *fd_pairs) +void * read_channels(void *arg) { struct pollfd *pollfd; - int i; - int num_rdy; + int i,j; + int num_rdy, num_hup; + int high_prio; + int ret = 0; + struct channel_trace_fd *fd_pairs = (struct channel_trace_fd *)arg; + /* Start polling the FD */ + pollfd = malloc(fd_pairs->num_pairs * sizeof(struct pollfd)); + /* Note : index in pollfd is the same index as fd_pair->pair */ for(i=0;inum_pairs;i++) { pollfd[i].fd = fd_pairs->pair[i].channel; pollfd[i].events = POLLIN|POLLPRI; } while(1) { + high_prio = 0; + num_hup = 0; +#ifdef DEBUG + printf("Press a key for next poll...\n"); + char buf[1]; + read(STDIN_FILENO, &buf, 1); + printf("Next poll (polling %d fd) :\n", fd_pairs->num_pairs); +#endif //DEBUG + + /* Have we received a signal ? */ + if(quit_program) break; num_rdy = poll(pollfd, fd_pairs->num_pairs, -1); if(num_rdy == -1) { perror("Poll error"); - return -1; + goto free_fd; } printf("Data received\n"); @@ -297,32 +511,62 @@ int read_channels(struct channel_trace_fd *fd_pairs) switch(pollfd[i].revents) { case POLLERR: printf("Error returned in polling fd %d.\n", pollfd[i].fd); + num_hup++; break; case POLLHUP: printf("Polling fd %d tells it has hung up.\n", pollfd[i].fd); + num_hup++; break; case POLLNVAL: printf("Polling fd %d tells fd is not open.\n", pollfd[i].fd); + num_hup++; break; case POLLPRI: - /* Take care of high priority channels first. */ + if(pthread_mutex_trylock(&fd_pairs->pair[i].mutex) == 0) { + printf("Urgent read on fd %d\n", pollfd[i].fd); + /* Take care of high priority channels first. */ + high_prio = 1; + /* it's ok to have an unavailable subbuffer */ + ret = read_subbuffer(&fd_pairs->pair[i]); + if(ret == -EAGAIN) ret = 0; + + ret = pthread_mutex_unlock(&fd_pairs->pair[i].mutex); + if(ret) + printf("Error in mutex unlock : %s\n", strerror(ret)); + } break; - default: + } } - - for(i=0;inum_pairs;i++) { - switch(pollfd[i].revents) { - case POLLIN: - /* Take care of low priority channels. */ - break; - default: + /* If every FD has hung up, we end the read loop here */ + if(num_hup == fd_pairs->num_pairs) break; + + if(!high_prio) { + for(i=0;inum_pairs;i++) { + switch(pollfd[i].revents) { + case POLLIN: + if(pthread_mutex_trylock(&fd_pairs->pair[i].mutex) == 0) { + /* Take care of low priority channels. */ + printf("Normal read on fd %d\n", pollfd[i].fd); + /* it's ok to have an unavailable subbuffer */ + ret = read_subbuffer(&fd_pairs->pair[i]); + if(ret == -EAGAIN) ret = 0; + + ret = pthread_mutex_unlock(&fd_pairs->pair[i].mutex); + if(ret) + printf("Error in mutex unlock : %s\n", strerror(ret)); + } + break; + } + } } } +free_fd: free(pollfd); - return 0; +end: + return (void*)ret; } @@ -342,9 +586,12 @@ void close_channel_trace_pairs(struct channel_trace_fd *fd_pairs) int main(int argc, char ** argv) { - int ret; - pid_t pid; + int ret = 0; struct channel_trace_fd fd_pairs = { NULL, 0 }; + struct sigaction act; + pthread_t *tids; + unsigned int i; + void *tret; ret = parse_arguments(argc, argv); @@ -355,23 +602,55 @@ int main(int argc, char ** argv) show_info(); if(daemon_mode) { - pid = fork(); - - if(pid > 0) { - /* parent */ - return 0; - } else if(pid < 0) { - /* error */ - printf("An error occured while forking.\n"); - return -1; - } - /* else, we are the child, continue... */ - } + ret = daemon(0, 0); + + if(ret == -1) { + perror("An error occured while daemonizing."); + exit(-1); + } + } + + /* Connect the signal handlers */ + act.sa_handler = handler; + act.sa_flags = 0; + sigemptyset(&(act.sa_mask)); + sigaddset(&(act.sa_mask), SIGTERM); + sigaddset(&(act.sa_mask), SIGQUIT); + sigaddset(&(act.sa_mask), SIGINT); + sigaction(SIGTERM, &act, NULL); + sigaction(SIGQUIT, &act, NULL); + sigaction(SIGINT, &act, NULL); + if(ret = open_channel_trace_pairs(channel_name, trace_name, &fd_pairs)) goto close_channel; - ret = read_channels(&fd_pairs); + if(ret = map_channels(&fd_pairs)) + goto close_channel; + + tids = malloc(sizeof(pthread_t) * num_threads); + for(i=0; i