X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=liblttngkconsumerd%2Flttngkconsumerd.c;h=5c9f613b11eddf48e4450e0cfd304b00c54d27e6;hp=69ef9a0e8b65173e9d6544bea875e6a9b6e9753c;hb=5348b470cf00157d738300e28bcab78ea203ec0f;hpb=cabd9892f83445ff6e23ee4f277d66d11ce616c7 diff --git a/liblttngkconsumerd/lttngkconsumerd.c b/liblttngkconsumerd/lttngkconsumerd.c index 69ef9a0e8..5c9f613b1 100644 --- a/liblttngkconsumerd/lttngkconsumerd.c +++ b/liblttngkconsumerd/lttngkconsumerd.c @@ -28,6 +28,7 @@ #include #include #include +#include #include @@ -125,22 +126,21 @@ static void kconsumerd_del_fd(struct lttng_kconsumerd_fd *lcf) } /* - * Add a fd to the global list protected by a mutex. + * Create a struct lttcomm_kconsumerd_msg from the + * information received on the receiving socket */ -static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, +struct lttng_kconsumerd_fd *kconsumerd_allocate_fd( + struct lttcomm_kconsumerd_msg *buf, int consumerd_fd) { struct lttng_kconsumerd_fd *tmp_fd; - int ret = 0; - pthread_mutex_lock(&kconsumerd_data.lock); - /* Check if already exist */ - ret = kconsumerd_find_session_fd(buf->fd); - if (ret == 1) { + tmp_fd = malloc(sizeof(struct lttng_kconsumerd_fd)); + if (tmp_fd == NULL) { + perror("malloc struct lttng_kconsumerd_fd"); goto end; } - tmp_fd = malloc(sizeof(struct lttng_kconsumerd_fd)); tmp_fd->sessiond_fd = buf->fd; tmp_fd->consumerd_fd = consumerd_fd; tmp_fd->state = buf->state; @@ -152,42 +152,31 @@ static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, tmp_fd->output = buf->output; strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX); tmp_fd->path_name[PATH_MAX - 1] = '\0'; + DBG("Allocated %s (sessiond_fd %d, consumerd_fd %d, out_fd %d)", + tmp_fd->path_name, tmp_fd->sessiond_fd, + tmp_fd->consumerd_fd, tmp_fd->out_fd); - /* Opening the tracefile in write mode */ - if (tmp_fd->path_name != NULL) { - ret = open(tmp_fd->path_name, - O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO); - if (ret < 0) { - ERR("Opening %s", tmp_fd->path_name); - perror("open"); - goto end; - } - tmp_fd->out_fd = ret; - DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name, - tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd); - } +end: + return tmp_fd; +} - if (tmp_fd->output == LTTNG_EVENT_MMAP) { - /* get the len of the mmap region */ - ret = kernctl_get_mmap_len(tmp_fd->consumerd_fd, &tmp_fd->mmap_len); - if (ret != 0) { - ret = errno; - perror("kernctl_get_mmap_len"); - goto end; - } +/* + * Add a fd to the global list protected by a mutex. + */ +static int kconsumerd_add_fd(struct lttng_kconsumerd_fd *tmp_fd) +{ + int ret; - tmp_fd->mmap_base = mmap(NULL, tmp_fd->mmap_len, - PROT_READ, MAP_PRIVATE, tmp_fd->consumerd_fd, 0); - if (tmp_fd->mmap_base == MAP_FAILED) { - perror("Error mmaping"); - ret = -1; - goto end; - } + pthread_mutex_lock(&kconsumerd_data.lock); + /* Check if already exist */ + ret = kconsumerd_find_session_fd(tmp_fd->sessiond_fd); + if (ret == 1) { + goto end; } - cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head); kconsumerd_data.fds_count++; kconsumerd_data.need_update = 1; + end: pthread_mutex_unlock(&kconsumerd_data.lock); return ret; @@ -263,6 +252,7 @@ static int kconsumerd_consumerd_recv_fd( int nb_fd; char recv_fd[CMSG_SPACE(sizeof(int))]; struct lttcomm_kconsumerd_msg lkm; + struct lttng_kconsumerd_fd *new_fd; /* the number of fds we are about to receive */ nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg); @@ -313,14 +303,34 @@ static int kconsumerd_consumerd_recv_fd( DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, ((int *) CMSG_DATA(cmsg))[0]); - ret = kconsumerd_add_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]); - if (ret < 0) { + new_fd = kconsumerd_allocate_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]); + if (new_fd == NULL) { lttng_kconsumerd_send_error(ctx, KCONSUMERD_OUTFD_ERROR); goto end; } + + if (ctx->on_recv_fd != NULL) { + ret = ctx->on_recv_fd(new_fd); + if (ret == 0) { + kconsumerd_add_fd(new_fd); + } else if (ret < 0) { + goto end; + } + } else { + kconsumerd_add_fd(new_fd); + } break; case UPDATE_STREAM: - kconsumerd_change_fd_state(lkm.fd, lkm.state); + if (ctx->on_update_fd != NULL) { + ret = ctx->on_update_fd(lkm.fd, lkm.state); + if (ret == 0) { + kconsumerd_change_fd_state(lkm.fd, lkm.state); + } else if (ret < 0) { + goto end; + } + } else { + kconsumerd_change_fd_state(lkm.fd, lkm.state); + } break; default: break; @@ -754,42 +764,63 @@ end: * Returns a pointer to the new context or NULL on error. */ struct lttng_kconsumerd_local_data *lttng_kconsumerd_create( - int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd)) + int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd), + int (*recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd), + int (*update_fd)(int sessiond_fd, uint32_t state)) { - int ret; + int ret, i; struct lttng_kconsumerd_local_data *ctx; ctx = malloc(sizeof(struct lttng_kconsumerd_local_data)); if (ctx == NULL) { perror("allocating context"); - goto end; + goto error; } + /* assign the callbacks */ ctx->on_buffer_ready = buffer_ready; + ctx->on_recv_fd = recv_fd; + ctx->on_update_fd = update_fd; ret = pipe(ctx->kconsumerd_poll_pipe); if (ret < 0) { perror("Error creating poll pipe"); - ctx = NULL; - goto end; + goto error_poll_pipe; } ret = pipe(ctx->kconsumerd_should_quit); if (ret < 0) { perror("Error creating recv pipe"); - ctx = NULL; - goto end; + goto error_quit_pipe; } ret = pipe(ctx->kconsumerd_thread_pipe); if (ret < 0) { perror("Error creating thread pipe"); - ctx = NULL; - goto end; + goto error_thread_pipe; } -end: return ctx; + + +error_thread_pipe: + for (i = 0; i < 2; i++) { + int err; + + err = close(ctx->kconsumerd_should_quit[i]); + assert(!err); + } +error_quit_pipe: + for (i = 0; i < 2; i++) { + int err; + + err = close(ctx->kconsumerd_poll_pipe[i]); + assert(!err); + } +error_poll_pipe: + free(ctx); +error: + return NULL; } /*