X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=liblttngkconsumerd%2Flttngkconsumerd.c;h=1893e0ae66a2f110150b7e6aa219d32771c0b0c6;hp=ba26026c4458cb955ce70a5c2727a8b3b7413e48;hb=26d988bc6ff8b009808219710e7970d7627b8ff6;hpb=6533b585a3a53a0b52c2da14baec5e874d1bf3bb diff --git a/liblttngkconsumerd/lttngkconsumerd.c b/liblttngkconsumerd/lttngkconsumerd.c index ba26026c4..1893e0ae6 100644 --- a/liblttngkconsumerd/lttngkconsumerd.c +++ b/liblttngkconsumerd/lttngkconsumerd.c @@ -18,6 +18,7 @@ */ #define _GNU_SOURCE +#include #include #include #include @@ -27,13 +28,11 @@ #include #include #include -#include +#include +#include #include - -#include "kernelctl.h" -#include "lttngerr.h" -#include "lttng-sessiond-comm.h" +#include static struct lttng_kconsumerd_global_data { /* @@ -125,22 +124,21 @@ static void kconsumerd_del_fd(struct lttng_kconsumerd_fd *lcf) } /* - * Add a fd to the global list protected by a mutex. + * Create a struct lttcomm_kconsumerd_msg from the + * information received on the receiving socket */ -static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, +struct lttng_kconsumerd_fd *kconsumerd_allocate_fd( + struct lttcomm_kconsumerd_msg *buf, int consumerd_fd) { struct lttng_kconsumerd_fd *tmp_fd; - int ret = 0; - pthread_mutex_lock(&kconsumerd_data.lock); - /* Check if already exist */ - ret = kconsumerd_find_session_fd(buf->fd); - if (ret == 1) { + tmp_fd = malloc(sizeof(struct lttng_kconsumerd_fd)); + if (tmp_fd == NULL) { + perror("malloc struct lttng_kconsumerd_fd"); goto end; } - tmp_fd = malloc(sizeof(struct lttng_kconsumerd_fd)); tmp_fd->sessiond_fd = buf->fd; tmp_fd->consumerd_fd = consumerd_fd; tmp_fd->state = buf->state; @@ -152,42 +150,31 @@ static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, tmp_fd->output = buf->output; strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX); tmp_fd->path_name[PATH_MAX - 1] = '\0'; + DBG("Allocated %s (sessiond_fd %d, consumerd_fd %d, out_fd %d)", + tmp_fd->path_name, tmp_fd->sessiond_fd, + tmp_fd->consumerd_fd, tmp_fd->out_fd); - /* Opening the tracefile in write mode */ - if (tmp_fd->path_name != NULL) { - ret = open(tmp_fd->path_name, - O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO); - if (ret < 0) { - ERR("Opening %s", tmp_fd->path_name); - perror("open"); - goto end; - } - tmp_fd->out_fd = ret; - DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name, - tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd); - } +end: + return tmp_fd; +} - if (tmp_fd->output == LTTNG_EVENT_MMAP) { - /* get the len of the mmap region */ - ret = kernctl_get_mmap_len(tmp_fd->consumerd_fd, &tmp_fd->mmap_len); - if (ret != 0) { - ret = errno; - perror("kernctl_get_mmap_len"); - goto end; - } +/* + * Add a fd to the global list protected by a mutex. + */ +static int kconsumerd_add_fd(struct lttng_kconsumerd_fd *tmp_fd) +{ + int ret; - tmp_fd->mmap_base = mmap(NULL, tmp_fd->mmap_len, - PROT_READ, MAP_PRIVATE, tmp_fd->consumerd_fd, 0); - if (tmp_fd->mmap_base == MAP_FAILED) { - perror("Error mmaping"); - ret = -1; - goto end; - } + pthread_mutex_lock(&kconsumerd_data.lock); + /* Check if already exist */ + ret = kconsumerd_find_session_fd(tmp_fd->sessiond_fd); + if (ret == 1) { + goto end; } - cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head); kconsumerd_data.fds_count++; kconsumerd_data.need_update = 1; + end: pthread_mutex_unlock(&kconsumerd_data.lock); return ret; @@ -258,11 +245,16 @@ static int kconsumerd_consumerd_recv_fd( enum lttng_kconsumerd_command cmd_type) { struct iovec iov[1]; - int ret = 0, i, tmp2; + int ret = 0, i, j, tmp2; struct cmsghdr *cmsg; int nb_fd; char recv_fd[CMSG_SPACE(sizeof(int))]; struct lttcomm_kconsumerd_msg lkm; + struct lttng_kconsumerd_fd *new_fd; + union { + unsigned char vc[4]; + int vi; + } tmp; /* the number of fds we are about to receive */ nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg); @@ -310,17 +302,37 @@ static int kconsumerd_consumerd_recv_fd( if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) { switch (cmd_type) { case ADD_STREAM: - DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, - ((int *) CMSG_DATA(cmsg))[0]); - - ret = kconsumerd_add_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]); - if (ret < 0) { + for (j = 0; j < sizeof(int); j++) + tmp.vc[j] = CMSG_DATA(cmsg)[j]; + DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, tmp.vi); + new_fd = kconsumerd_allocate_fd(&lkm, tmp.vi); + if (new_fd == NULL) { lttng_kconsumerd_send_error(ctx, KCONSUMERD_OUTFD_ERROR); goto end; } + + if (ctx->on_recv_fd != NULL) { + ret = ctx->on_recv_fd(new_fd); + if (ret == 0) { + kconsumerd_add_fd(new_fd); + } else if (ret < 0) { + goto end; + } + } else { + kconsumerd_add_fd(new_fd); + } break; case UPDATE_STREAM: - kconsumerd_change_fd_state(lkm.fd, lkm.state); + if (ctx->on_update_fd != NULL) { + ret = ctx->on_update_fd(lkm.fd, lkm.state); + if (ret == 0) { + kconsumerd_change_fd_state(lkm.fd, lkm.state); + } else if (ret < 0) { + goto end; + } + } else { + kconsumerd_change_fd_state(lkm.fd, lkm.state); + } break; default: break; @@ -361,6 +373,42 @@ void lttng_kconsumerd_set_command_sock_path( ctx->kconsumerd_command_sock_path = sock; } +static void lttng_kconsumerd_sync_trace_file( + struct lttng_kconsumerd_fd *kconsumerd_fd, off_t orig_offset) +{ + int outfd = kconsumerd_fd->out_fd; + /* + * This does a blocking write-and-wait on any page that belongs to the + * subbuffer prior to the one we just wrote. + * Don't care about error values, as these are just hints and ways to + * limit the amount of page cache used. + */ + if (orig_offset >= kconsumerd_fd->max_sb_size) { + sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size, + kconsumerd_fd->max_sb_size, + SYNC_FILE_RANGE_WAIT_BEFORE + | SYNC_FILE_RANGE_WRITE + | SYNC_FILE_RANGE_WAIT_AFTER); + /* + * Give hints to the kernel about how we access the file: + * POSIX_FADV_DONTNEED : we won't re-access data in a near future after + * we write it. + * + * We need to call fadvise again after the file grows because the + * kernel does not seem to apply fadvise to non-existing parts of the + * file. + * + * Call fadvise _after_ having waited for the page writeback to + * complete because the dirty page writeback semantic is not well + * defined. So it can be expected to lead to lower throughput in + * streaming. + */ + posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size, + kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED); + } +} + + /* * Mmap the ring buffer, read it and write the data to the tracefile. * @@ -371,7 +419,6 @@ int lttng_kconsumerd_on_read_subbuffer_mmap( struct lttng_kconsumerd_fd *kconsumerd_fd, unsigned long len) { unsigned long mmap_offset; - char *padding = NULL; long ret = 0; off_t orig_offset = kconsumerd_fd->out_fd_offset; int fd = kconsumerd_fd->consumerd_fd; @@ -400,42 +447,11 @@ int lttng_kconsumerd_on_read_subbuffer_mmap( kconsumerd_fd->out_fd_offset += ret; } - /* - * This does a blocking write-and-wait on any page that belongs to the - * subbuffer prior to the one we just wrote. - * Don't care about error values, as these are just hints and ways to - * limit the amount of page cache used. - */ - if (orig_offset >= kconsumerd_fd->max_sb_size) { - sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size, - kconsumerd_fd->max_sb_size, - SYNC_FILE_RANGE_WAIT_BEFORE - | SYNC_FILE_RANGE_WRITE - | SYNC_FILE_RANGE_WAIT_AFTER); + lttng_kconsumerd_sync_trace_file(kconsumerd_fd, orig_offset); - /* - * Give hints to the kernel about how we access the file: - * POSIX_FADV_DONTNEED : we won't re-access data in a near future after - * we write it. - * - * We need to call fadvise again after the file grows because the - * kernel does not seem to apply fadvise to non-existing parts of the - * file. - * - * Call fadvise _after_ having waited for the page writeback to - * complete because the dirty page writeback semantic is not well - * defined. So it can be expected to lead to lower throughput in - * streaming. - */ - posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size, - kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED); - } goto end; end: - if (padding != NULL) { - free(padding); - } return ret; } @@ -474,44 +490,14 @@ int lttng_kconsumerd_on_read_subbuffer_splice( perror("Error in file splice"); goto splice_error; } - if (ret >= len) { - len = 0; - } + len -= ret; /* This won't block, but will start writeout asynchronously */ sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret, SYNC_FILE_RANGE_WRITE); kconsumerd_fd->out_fd_offset += ret; } + lttng_kconsumerd_sync_trace_file(kconsumerd_fd, orig_offset); - /* - * This does a blocking write-and-wait on any page that belongs to the - * subbuffer prior to the one we just wrote. - * Don't care about error values, as these are just hints and ways to - * limit the amount of page cache used. - */ - if (orig_offset >= kconsumerd_fd->max_sb_size) { - sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size, - kconsumerd_fd->max_sb_size, - SYNC_FILE_RANGE_WAIT_BEFORE - | SYNC_FILE_RANGE_WRITE - | SYNC_FILE_RANGE_WAIT_AFTER); - /* - * Give hints to the kernel about how we access the file: - * POSIX_FADV_DONTNEED : we won't re-access data in a near future after - * we write it. - * - * We need to call fadvise again after the file grows because the - * kernel does not seem to apply fadvise to non-existing parts of the - * file. - * - * Call fadvise _after_ having waited for the page writeback to - * complete because the dirty page writeback semantic is not well - * defined. So it can be expected to lead to lower throughput in - * streaming. - */ - posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size, - kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED); - } goto end; splice_error: @@ -535,6 +521,48 @@ end: return ret; } +/* + * Take a snapshot for a specific fd + * + * Returns 0 on success, < 0 on error + */ +int lttng_kconsumerd_take_snapshot(struct lttng_kconsumerd_local_data *ctx, + struct lttng_kconsumerd_fd *kconsumerd_fd) +{ + int ret = 0; + int infd = kconsumerd_fd->consumerd_fd; + + ret = kernctl_snapshot(infd); + if (ret != 0) { + ret = errno; + perror("Getting sub-buffer snapshot."); + } + + return ret; +} + +/* + * Get the produced position + * + * Returns 0 on success, < 0 on error + */ +int lttng_kconsumerd_get_produced_snapshot( + struct lttng_kconsumerd_local_data *ctx, + struct lttng_kconsumerd_fd *kconsumerd_fd, + unsigned long *pos) +{ + int ret; + int infd = kconsumerd_fd->consumerd_fd; + + ret = kernctl_snapshot_get_produced(infd, pos); + if (ret != 0) { + ret = errno; + perror("kernctl_snapshot_get_produced"); + } + + return ret; +} + /* * Poll on the should_quit pipe and the command socket return -1 on error and * should exit, 0 if data is available on the command socket @@ -738,42 +766,64 @@ end: * Returns a pointer to the new context or NULL on error. */ struct lttng_kconsumerd_local_data *lttng_kconsumerd_create( - int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd)) + int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd), + int (*recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd), + int (*update_fd)(int sessiond_fd, uint32_t state)) { - int ret; + int ret, i; struct lttng_kconsumerd_local_data *ctx; ctx = malloc(sizeof(struct lttng_kconsumerd_local_data)); if (ctx == NULL) { perror("allocating context"); - goto end; + goto error; } + ctx->kconsumerd_error_socket = -1; + /* assign the callbacks */ ctx->on_buffer_ready = buffer_ready; + ctx->on_recv_fd = recv_fd; + ctx->on_update_fd = update_fd; ret = pipe(ctx->kconsumerd_poll_pipe); if (ret < 0) { perror("Error creating poll pipe"); - ctx = NULL; - goto end; + goto error_poll_pipe; } ret = pipe(ctx->kconsumerd_should_quit); if (ret < 0) { perror("Error creating recv pipe"); - ctx = NULL; - goto end; + goto error_quit_pipe; } ret = pipe(ctx->kconsumerd_thread_pipe); if (ret < 0) { perror("Error creating thread pipe"); - ctx = NULL; - goto end; + goto error_thread_pipe; } -end: return ctx; + + +error_thread_pipe: + for (i = 0; i < 2; i++) { + int err; + + err = close(ctx->kconsumerd_should_quit[i]); + assert(!err); + } +error_quit_pipe: + for (i = 0; i < 2; i++) { + int err; + + err = close(ctx->kconsumerd_poll_pipe[i]); + assert(!err); + } +error_poll_pipe: + free(ctx); +error: + return NULL; } /* @@ -824,6 +874,7 @@ void *lttng_kconsumerd_thread_receive_fds(void *data) DBG("Sending ready command to ltt-sessiond"); ret = lttng_kconsumerd_send_error(ctx, KCONSUMERD_COMMAND_SOCK_READY); + /* return < 0 on error, but == 0 is not fatal */ if (ret < 0) { ERR("Error sending ready command to ltt-sessiond"); goto end; @@ -952,6 +1003,7 @@ void lttng_kconsumerd_should_exit(struct lttng_kconsumerd_local_data *ctx) /* * Send return code to the session daemon. + * If the socket is not defined, we return 0, it is not a fatal error */ int lttng_kconsumerd_send_error( struct lttng_kconsumerd_local_data *ctx, int cmd)