X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=ltt-kconsumerd%2Fltt-kconsumerd.c;h=a81be55e7c9dbdaed5daf63c2c68d4c6280ac1a8;hp=ceaee716c8e1a8cfc2f331cc0e1c6369f17702f3;hb=558e70e9829c97ecd228516f6e312768be0536d7;hpb=8b270bdb4dccf5571180a9906084126e90eec949 diff --git a/ltt-kconsumerd/ltt-kconsumerd.c b/ltt-kconsumerd/ltt-kconsumerd.c index ceaee716c..a81be55e7 100644 --- a/ltt-kconsumerd/ltt-kconsumerd.c +++ b/ltt-kconsumerd/ltt-kconsumerd.c @@ -36,13 +36,16 @@ #include #include #include +#include -#include "lttngerr.h" -#include "kernelctl.h" -#include "lttkconsumerd.h" +#include +#include +#include +#include +#include /* the two threads (receive fd and poll) */ -pthread_t threads[2]; +static pthread_t threads[2]; /* to count the number of time the user pressed ctrl+c */ static int sigintcount = 0; @@ -52,16 +55,14 @@ int opt_quiet; int opt_verbose; static int opt_daemon; static const char *progname; -char command_sock_path[PATH_MAX]; /* Global command socket path */ -char error_sock_path[PATH_MAX]; /* Global error path */ +static char command_sock_path[PATH_MAX]; /* Global command socket path */ +static char error_sock_path[PATH_MAX]; /* Global error path */ -/* the liblttkconsumerd context */ -struct kconsumerd_local_data *ctx; +/* the liblttngkconsumerd context */ +static struct lttng_kconsumerd_local_data *ctx; /* - * sighandler - * - * Signal handler for the daemon + * Signal handler for the daemon */ static void sighandler(int sig) { @@ -70,13 +71,11 @@ static void sighandler(int sig) return; } - kconsumerd_should_exit(ctx); + lttng_kconsumerd_should_exit(ctx); } /* - * set_signal_handler - * - * Setup signal handler for : + * Setup signal handler for : * SIGINT, SIGTERM, SIGPIPE */ static int set_signal_handler(void) @@ -194,11 +193,9 @@ static void parse_args(int argc, char **argv) } /* - * read_subbuffer - * - * Consume data on a file descriptor and write it on a trace file + * Consume data on a file descriptor and write it on a trace file. */ -static int read_subbuffer(struct kconsumerd_fd *kconsumerd_fd) +static int read_subbuffer(struct lttng_kconsumerd_fd *kconsumerd_fd) { unsigned long len; int err; @@ -210,12 +207,18 @@ static int read_subbuffer(struct kconsumerd_fd *kconsumerd_fd) err = kernctl_get_next_subbuf(infd); if (err != 0) { ret = errno; - perror("Reserving sub buffer failed (everything is normal, " + /* + * This is a debug message even for single-threaded consumer, + * because poll() have more relaxed criterions than get subbuf, + * so get_subbuf may fail for short race windows where poll() + * would issue wakeups. + */ + DBG("Reserving sub buffer failed (everything is normal, " "it is due to concurrency)"); goto end; } - switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) { + switch (kconsumerd_fd->output) { case LTTNG_EVENT_SPLICE: /* read the whole subbuffer */ err = kernctl_get_padded_subbuf_size(infd, &len); @@ -226,7 +229,7 @@ static int read_subbuffer(struct kconsumerd_fd *kconsumerd_fd) } /* splice the subbuffer to the tracefile */ - ret = kconsumerd_on_read_subbuffer_splice(ctx, kconsumerd_fd, len); + ret = lttng_kconsumerd_on_read_subbuffer_splice(ctx, kconsumerd_fd, len); if (ret < 0) { /* * display the error but continue processing to try @@ -244,7 +247,7 @@ static int read_subbuffer(struct kconsumerd_fd *kconsumerd_fd) goto end; } /* write the subbuffer to the tracefile */ - ret = kconsumerd_on_read_subbuffer_mmap(ctx, kconsumerd_fd, len); + ret = lttng_kconsumerd_on_read_subbuffer_mmap(ctx, kconsumerd_fd, len); if (ret < 0) { /* * display the error but continue processing to try @@ -274,6 +277,57 @@ end: return ret; } +static int on_recv_fd(struct lttng_kconsumerd_fd *kconsumerd_fd) +{ + int ret; + + /* Opening the tracefile in write mode */ + if (kconsumerd_fd->path_name != NULL) { + ret = open(kconsumerd_fd->path_name, + O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO); + if (ret < 0) { + ERR("Opening %s", kconsumerd_fd->path_name); + perror("open"); + goto error; + } + kconsumerd_fd->out_fd = ret; + } + + if (kconsumerd_fd->output == LTTNG_EVENT_MMAP) { + /* get the len of the mmap region */ + unsigned long mmap_len; + + ret = kernctl_get_mmap_len(kconsumerd_fd->consumerd_fd, &mmap_len); + if (ret != 0) { + ret = errno; + perror("kernctl_get_mmap_len"); + goto error_close_fd; + } + kconsumerd_fd->mmap_len = (size_t) mmap_len; + + kconsumerd_fd->mmap_base = mmap(NULL, kconsumerd_fd->mmap_len, + PROT_READ, MAP_PRIVATE, kconsumerd_fd->consumerd_fd, 0); + if (kconsumerd_fd->mmap_base == MAP_FAILED) { + perror("Error mmaping"); + ret = -1; + goto error_close_fd; + } + } + + /* we return 0 to let the library handle the FD internally */ + return 0; + +error_close_fd: + { + int err; + + err = close(kconsumerd_fd->out_fd); + assert(!err); + } +error: + return ret; +} + /* * main */ @@ -300,13 +354,13 @@ int main(int argc, char **argv) snprintf(command_sock_path, PATH_MAX, KCONSUMERD_CMD_SOCK_PATH); } - /* create the pipe to wake to receiving thread when needed */ - ctx = kconsumerd_create(read_subbuffer); + /* create the consumer instance with and assign the callbacks */ + ctx = lttng_kconsumerd_create(read_subbuffer, on_recv_fd, NULL); if (ctx == NULL) { goto error; } - kconsumerd_set_command_socket_path(ctx, command_sock_path); + lttng_kconsumerd_set_command_sock_path(ctx, command_sock_path); if (strlen(error_sock_path) == 0) { snprintf(error_sock_path, PATH_MAX, KCONSUMERD_ERR_SOCK_PATH); @@ -323,10 +377,10 @@ int main(int argc, char **argv) if (ret < 0) { WARN("Cannot connect to error socket, is ltt-sessiond started ?"); } - kconsumerd_set_error_socket(ctx, ret); + lttng_kconsumerd_set_error_sock(ctx, ret); /* Create the thread to manage the receive of fd */ - ret = pthread_create(&threads[0], NULL, kconsumerd_thread_receive_fds, + ret = pthread_create(&threads[0], NULL, lttng_kconsumerd_thread_receive_fds, (void *) ctx); if (ret != 0) { perror("pthread_create"); @@ -334,7 +388,7 @@ int main(int argc, char **argv) } /* Create thread to manage the polling/writing of traces */ - ret = pthread_create(&threads[1], NULL, kconsumerd_thread_poll_fds, + ret = pthread_create(&threads[1], NULL, lttng_kconsumerd_thread_poll_fds, (void *) ctx); if (ret != 0) { perror("pthread_create"); @@ -349,16 +403,16 @@ int main(int argc, char **argv) } } ret = EXIT_SUCCESS; - kconsumerd_send_error(ctx, KCONSUMERD_EXIT_SUCCESS); + lttng_kconsumerd_send_error(ctx, KCONSUMERD_EXIT_SUCCESS); goto end; error: ret = EXIT_FAILURE; - kconsumerd_send_error(ctx, KCONSUMERD_EXIT_FAILURE); + lttng_kconsumerd_send_error(ctx, KCONSUMERD_EXIT_FAILURE); end: - kconsumerd_destroy(ctx); - kconsumerd_cleanup(); + lttng_kconsumerd_destroy(ctx); + lttng_kconsumerd_cleanup(); return ret; }