From 5348b470cf00157d738300e28bcab78ea203ec0f Mon Sep 17 00:00:00 2001 From: Julien Desfossez Date: Tue, 16 Aug 2011 19:25:04 -0400 Subject: [PATCH] Callbacks on receive and update FD The user of the lib can now take control over a new FD or the update operation of an existing FD. Opening the output tracefile is now the responsiblity of the user and not the library itself. [ Edit by Mathieu Desnoyers: comment and teardown cleanups ] Signed-off-by: Julien Desfossez Signed-off-by: Mathieu Desnoyers --- include/lttng/lttng-kconsumerd.h | 35 ++++++- liblttngkconsumerd/lttngkconsumerd.c | 131 +++++++++++++++++---------- ltt-kconsumerd/ltt-kconsumerd.c | 53 ++++++++++- 3 files changed, 162 insertions(+), 57 deletions(-) diff --git a/include/lttng/lttng-kconsumerd.h b/include/lttng/lttng-kconsumerd.h index 7e195ab7e..e09bdc3bd 100644 --- a/include/lttng/lttng-kconsumerd.h +++ b/include/lttng/lttng-kconsumerd.h @@ -79,6 +79,31 @@ struct lttng_kconsumerd_fd { struct lttng_kconsumerd_local_data { /* function to call when data is available on a buffer */ int (*on_buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd); + /* + * function to call when we receive a new fd, it receives a + * newly allocated kconsumerd_fd, depending on the return code + * of this function, the new FD will be handled by the + * application or the library. + * + * Returns: + * > 0 (success, FD is kept by application) + * == 0 (success, FD is left to library) + * < 0 (error) + */ + int (*on_recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd); + /* + * function to call when a FD is getting updated by the session + * daemon, this function receives the FD as seen by the session + * daemon (sessiond_fd) and the new state, depending on the + * return code of this function the update of state for the FD + * is handled by the application or the library. + * + * Returns: + * > 0 (success, FD is kept by application) + * == 0 (success, FD is left to library) + * < 0 (error) + */ + int (*on_update_fd)(int sessiond_fd, uint32_t state); /* socket to communicate errors with sessiond */ int kconsumerd_error_socket; /* socket to exchange commands with sessiond */ @@ -98,15 +123,15 @@ struct lttng_kconsumerd_local_data { * - create the should_quit pipe (for signal handler) * - create the thread pipe (for splice) * - * Takes a function pointer as argument, this function is called when data is - * available on a buffer. This function is responsible to do the - * kernctl_get_next_subbuf, read the data with mmap or splice depending on the - * buffer configuration and then kernctl_put_next_subbuf at the end. + * Takes the function pointers to the on_buffer_ready, on_recv_fd, and + * on_update_fd callbacks. * * Returns a pointer to the new context or NULL on error. */ extern struct lttng_kconsumerd_local_data *lttng_kconsumerd_create( - int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd)); + int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd), + int (*recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd), + int (*update_fd)(int sessiond_fd, uint32_t state)); /* * Close all fds associated with the instance and free the context. diff --git a/liblttngkconsumerd/lttngkconsumerd.c b/liblttngkconsumerd/lttngkconsumerd.c index 69ef9a0e8..5c9f613b1 100644 --- a/liblttngkconsumerd/lttngkconsumerd.c +++ b/liblttngkconsumerd/lttngkconsumerd.c @@ -28,6 +28,7 @@ #include #include #include +#include #include @@ -125,22 +126,21 @@ static void kconsumerd_del_fd(struct lttng_kconsumerd_fd *lcf) } /* - * Add a fd to the global list protected by a mutex. + * Create a struct lttcomm_kconsumerd_msg from the + * information received on the receiving socket */ -static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, +struct lttng_kconsumerd_fd *kconsumerd_allocate_fd( + struct lttcomm_kconsumerd_msg *buf, int consumerd_fd) { struct lttng_kconsumerd_fd *tmp_fd; - int ret = 0; - pthread_mutex_lock(&kconsumerd_data.lock); - /* Check if already exist */ - ret = kconsumerd_find_session_fd(buf->fd); - if (ret == 1) { + tmp_fd = malloc(sizeof(struct lttng_kconsumerd_fd)); + if (tmp_fd == NULL) { + perror("malloc struct lttng_kconsumerd_fd"); goto end; } - tmp_fd = malloc(sizeof(struct lttng_kconsumerd_fd)); tmp_fd->sessiond_fd = buf->fd; tmp_fd->consumerd_fd = consumerd_fd; tmp_fd->state = buf->state; @@ -152,42 +152,31 @@ static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, tmp_fd->output = buf->output; strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX); tmp_fd->path_name[PATH_MAX - 1] = '\0'; + DBG("Allocated %s (sessiond_fd %d, consumerd_fd %d, out_fd %d)", + tmp_fd->path_name, tmp_fd->sessiond_fd, + tmp_fd->consumerd_fd, tmp_fd->out_fd); - /* Opening the tracefile in write mode */ - if (tmp_fd->path_name != NULL) { - ret = open(tmp_fd->path_name, - O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO); - if (ret < 0) { - ERR("Opening %s", tmp_fd->path_name); - perror("open"); - goto end; - } - tmp_fd->out_fd = ret; - DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name, - tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd); - } +end: + return tmp_fd; +} - if (tmp_fd->output == LTTNG_EVENT_MMAP) { - /* get the len of the mmap region */ - ret = kernctl_get_mmap_len(tmp_fd->consumerd_fd, &tmp_fd->mmap_len); - if (ret != 0) { - ret = errno; - perror("kernctl_get_mmap_len"); - goto end; - } +/* + * Add a fd to the global list protected by a mutex. + */ +static int kconsumerd_add_fd(struct lttng_kconsumerd_fd *tmp_fd) +{ + int ret; - tmp_fd->mmap_base = mmap(NULL, tmp_fd->mmap_len, - PROT_READ, MAP_PRIVATE, tmp_fd->consumerd_fd, 0); - if (tmp_fd->mmap_base == MAP_FAILED) { - perror("Error mmaping"); - ret = -1; - goto end; - } + pthread_mutex_lock(&kconsumerd_data.lock); + /* Check if already exist */ + ret = kconsumerd_find_session_fd(tmp_fd->sessiond_fd); + if (ret == 1) { + goto end; } - cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head); kconsumerd_data.fds_count++; kconsumerd_data.need_update = 1; + end: pthread_mutex_unlock(&kconsumerd_data.lock); return ret; @@ -263,6 +252,7 @@ static int kconsumerd_consumerd_recv_fd( int nb_fd; char recv_fd[CMSG_SPACE(sizeof(int))]; struct lttcomm_kconsumerd_msg lkm; + struct lttng_kconsumerd_fd *new_fd; /* the number of fds we are about to receive */ nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg); @@ -313,14 +303,34 @@ static int kconsumerd_consumerd_recv_fd( DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, ((int *) CMSG_DATA(cmsg))[0]); - ret = kconsumerd_add_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]); - if (ret < 0) { + new_fd = kconsumerd_allocate_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]); + if (new_fd == NULL) { lttng_kconsumerd_send_error(ctx, KCONSUMERD_OUTFD_ERROR); goto end; } + + if (ctx->on_recv_fd != NULL) { + ret = ctx->on_recv_fd(new_fd); + if (ret == 0) { + kconsumerd_add_fd(new_fd); + } else if (ret < 0) { + goto end; + } + } else { + kconsumerd_add_fd(new_fd); + } break; case UPDATE_STREAM: - kconsumerd_change_fd_state(lkm.fd, lkm.state); + if (ctx->on_update_fd != NULL) { + ret = ctx->on_update_fd(lkm.fd, lkm.state); + if (ret == 0) { + kconsumerd_change_fd_state(lkm.fd, lkm.state); + } else if (ret < 0) { + goto end; + } + } else { + kconsumerd_change_fd_state(lkm.fd, lkm.state); + } break; default: break; @@ -754,42 +764,63 @@ end: * Returns a pointer to the new context or NULL on error. */ struct lttng_kconsumerd_local_data *lttng_kconsumerd_create( - int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd)) + int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd), + int (*recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd), + int (*update_fd)(int sessiond_fd, uint32_t state)) { - int ret; + int ret, i; struct lttng_kconsumerd_local_data *ctx; ctx = malloc(sizeof(struct lttng_kconsumerd_local_data)); if (ctx == NULL) { perror("allocating context"); - goto end; + goto error; } + /* assign the callbacks */ ctx->on_buffer_ready = buffer_ready; + ctx->on_recv_fd = recv_fd; + ctx->on_update_fd = update_fd; ret = pipe(ctx->kconsumerd_poll_pipe); if (ret < 0) { perror("Error creating poll pipe"); - ctx = NULL; - goto end; + goto error_poll_pipe; } ret = pipe(ctx->kconsumerd_should_quit); if (ret < 0) { perror("Error creating recv pipe"); - ctx = NULL; - goto end; + goto error_quit_pipe; } ret = pipe(ctx->kconsumerd_thread_pipe); if (ret < 0) { perror("Error creating thread pipe"); - ctx = NULL; - goto end; + goto error_thread_pipe; } -end: return ctx; + + +error_thread_pipe: + for (i = 0; i < 2; i++) { + int err; + + err = close(ctx->kconsumerd_should_quit[i]); + assert(!err); + } +error_quit_pipe: + for (i = 0; i < 2; i++) { + int err; + + err = close(ctx->kconsumerd_poll_pipe[i]); + assert(!err); + } +error_poll_pipe: + free(ctx); +error: + return NULL; } /* diff --git a/ltt-kconsumerd/ltt-kconsumerd.c b/ltt-kconsumerd/ltt-kconsumerd.c index ca939656b..7eab42e52 100644 --- a/ltt-kconsumerd/ltt-kconsumerd.c +++ b/ltt-kconsumerd/ltt-kconsumerd.c @@ -36,6 +36,7 @@ #include #include #include +#include #include @@ -277,6 +278,54 @@ end: return ret; } +static int on_recv_fd(struct lttng_kconsumerd_fd *kconsumerd_fd) +{ + int ret; + + /* Opening the tracefile in write mode */ + if (kconsumerd_fd->path_name != NULL) { + ret = open(kconsumerd_fd->path_name, + O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO); + if (ret < 0) { + ERR("Opening %s", kconsumerd_fd->path_name); + perror("open"); + goto error; + } + kconsumerd_fd->out_fd = ret; + } + + if (kconsumerd_fd->output == LTTNG_EVENT_MMAP) { + /* get the len of the mmap region */ + ret = kernctl_get_mmap_len(kconsumerd_fd->consumerd_fd, &kconsumerd_fd->mmap_len); + if (ret != 0) { + ret = errno; + perror("kernctl_get_mmap_len"); + goto error_close_fd; + } + + kconsumerd_fd->mmap_base = mmap(NULL, kconsumerd_fd->mmap_len, + PROT_READ, MAP_PRIVATE, kconsumerd_fd->consumerd_fd, 0); + if (kconsumerd_fd->mmap_base == MAP_FAILED) { + perror("Error mmaping"); + ret = -1; + goto error_close_fd; + } + } + + /* we return 0 to let the library handle the FD internally */ + return 0; + +error_close_fd: + { + int err; + + err = close(kconsumerd_fd->out_fd); + assert(!err); + } +error: + return ret; +} + /* * main */ @@ -303,8 +352,8 @@ int main(int argc, char **argv) snprintf(command_sock_path, PATH_MAX, KCONSUMERD_CMD_SOCK_PATH); } - /* create the pipe to wake to receiving thread when needed */ - ctx = lttng_kconsumerd_create(read_subbuffer); + /* create the consumer instance with and assign the callbacks */ + ctx = lttng_kconsumerd_create(read_subbuffer, on_recv_fd, NULL); if (ctx == NULL) { goto error; } -- 2.34.1