X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=ltt-kconsumerd%2Fltt-kconsumerd.c;h=ceaee716c8e1a8cfc2f331cc0e1c6369f17702f3;hp=ade19399da97931fb06c32b99ddc70bd02481305;hb=8b270bdb4dccf5571180a9906084126e90eec949;hpb=82a3637f639486c07ff937ab03e1e9532379d26a diff --git a/ltt-kconsumerd/ltt-kconsumerd.c b/ltt-kconsumerd/ltt-kconsumerd.c index ade19399d..ceaee716c 100644 --- a/ltt-kconsumerd/ltt-kconsumerd.c +++ b/ltt-kconsumerd/ltt-kconsumerd.c @@ -38,8 +38,8 @@ #include #include "lttngerr.h" -#include "libkernelctl.h" -#include "liblttkconsumerd.h" +#include "kernelctl.h" +#include "lttkconsumerd.h" /* the two threads (receive fd and poll) */ pthread_t threads[2]; @@ -55,6 +55,9 @@ static const char *progname; char command_sock_path[PATH_MAX]; /* Global command socket path */ char error_sock_path[PATH_MAX]; /* Global error path */ +/* the liblttkconsumerd context */ +struct kconsumerd_local_data *ctx; + /* * sighandler * @@ -67,7 +70,7 @@ static void sighandler(int sig) return; } - kconsumerd_should_exit(); + kconsumerd_should_exit(ctx); } /* @@ -190,6 +193,86 @@ static void parse_args(int argc, char **argv) } } +/* + * read_subbuffer + * + * Consume data on a file descriptor and write it on a trace file + */ +static int read_subbuffer(struct kconsumerd_fd *kconsumerd_fd) +{ + unsigned long len; + int err; + long ret = 0; + int infd = kconsumerd_fd->consumerd_fd; + + DBG("In kconsumerd_read_subbuffer (infd : %d)", infd); + /* Get the next subbuffer */ + err = kernctl_get_next_subbuf(infd); + if (err != 0) { + ret = errno; + perror("Reserving sub buffer failed (everything is normal, " + "it is due to concurrency)"); + goto end; + } + + switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) { + case LTTNG_EVENT_SPLICE: + /* read the whole subbuffer */ + err = kernctl_get_padded_subbuf_size(infd, &len); + if (err != 0) { + ret = errno; + perror("Getting sub-buffer len failed."); + goto end; + } + + /* splice the subbuffer to the tracefile */ + ret = kconsumerd_on_read_subbuffer_splice(ctx, kconsumerd_fd, len); + if (ret < 0) { + /* + * display the error but continue processing to try + * to release the subbuffer + */ + ERR("Error splicing to tracefile"); + } + break; + case LTTNG_EVENT_MMAP: + /* read the used subbuffer size */ + err = kernctl_get_padded_subbuf_size(infd, &len); + if (err != 0) { + ret = errno; + perror("Getting sub-buffer len failed."); + goto end; + } + /* write the subbuffer to the tracefile */ + ret = kconsumerd_on_read_subbuffer_mmap(ctx, kconsumerd_fd, len); + if (ret < 0) { + /* + * display the error but continue processing to try + * to release the subbuffer + */ + ERR("Error writing to tracefile"); + } + break; + default: + ERR("Unknown output method"); + ret = -1; + } + + err = kernctl_put_next_subbuf(infd); + if (err != 0) { + ret = errno; + if (errno == EFAULT) { + perror("Error in unreserving sub buffer\n"); + } else if (errno == EIO) { + /* Should never happen with newer LTTng versions */ + perror("Reader has been pushed by the writer, last sub-buffer corrupted."); + } + goto end; + } + +end: + return ret; +} /* * main @@ -217,7 +300,13 @@ int main(int argc, char **argv) snprintf(command_sock_path, PATH_MAX, KCONSUMERD_CMD_SOCK_PATH); } - kconsumerd_set_command_socket_path(command_sock_path); + /* create the pipe to wake to receiving thread when needed */ + ctx = kconsumerd_create(read_subbuffer); + if (ctx == NULL) { + goto error; + } + + kconsumerd_set_command_socket_path(ctx, command_sock_path); if (strlen(error_sock_path) == 0) { snprintf(error_sock_path, PATH_MAX, KCONSUMERD_ERR_SOCK_PATH); @@ -227,13 +316,6 @@ int main(int argc, char **argv) goto error; } - /* create the pipe to wake to polling thread when needed */ - ret = kconsumerd_create_poll_pipe(); - if (ret < 0) { - perror("Error creating poll pipe"); - goto end; - } - /* Connect to the socket created by ltt-sessiond to report errors */ DBG("Connecting to error socket %s", error_sock_path); ret = lttcomm_connect_unix_sock(error_sock_path); @@ -241,11 +323,11 @@ int main(int argc, char **argv) if (ret < 0) { WARN("Cannot connect to error socket, is ltt-sessiond started ?"); } - kconsumerd_set_error_socket(ret); + kconsumerd_set_error_socket(ctx, ret); /* Create the thread to manage the receive of fd */ ret = pthread_create(&threads[0], NULL, kconsumerd_thread_receive_fds, - (void *) NULL); + (void *) ctx); if (ret != 0) { perror("pthread_create"); goto error; @@ -253,7 +335,7 @@ int main(int argc, char **argv) /* Create thread to manage the polling/writing of traces */ ret = pthread_create(&threads[1], NULL, kconsumerd_thread_poll_fds, - (void *) NULL); + (void *) ctx); if (ret != 0) { perror("pthread_create"); goto error; @@ -267,14 +349,15 @@ int main(int argc, char **argv) } } ret = EXIT_SUCCESS; - kconsumerd_send_error(KCONSUMERD_EXIT_SUCCESS); + kconsumerd_send_error(ctx, KCONSUMERD_EXIT_SUCCESS); goto end; error: ret = EXIT_FAILURE; - kconsumerd_send_error(KCONSUMERD_EXIT_FAILURE); + kconsumerd_send_error(ctx, KCONSUMERD_EXIT_FAILURE); end: + kconsumerd_destroy(ctx); kconsumerd_cleanup(); return ret;