X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=kconsumerd%2Fkconsumerd.c;h=f0ccb8c9e797a049141a1ba0fc131f70fe6d3197;hb=6d8076ad2a383e3a1d9a5c78fa67e882751bbb1a;hp=66f3877d020cddae531fd123fc7df4ab31757e18;hpb=6aea26bc854e8090e609502e53a3e2923cb3b2c5;p=lttng-tools.git diff --git a/kconsumerd/kconsumerd.c b/kconsumerd/kconsumerd.c index 66f3877d0..f0ccb8c9e 100644 --- a/kconsumerd/kconsumerd.c +++ b/kconsumerd/kconsumerd.c @@ -61,6 +61,9 @@ static pthread_t threads[2]; /* communication with splice */ static int thread_pipe[2]; +/* pipe to wake the poll thread when necessary */ +static int poll_pipe[2]; + /* socket to communicate errors with sessiond */ static int error_socket = -1; @@ -404,7 +407,7 @@ static int consumerd_recv_fd(int sfd, int size, { struct msghdr msg; struct iovec iov[1]; - int ret, i; + int ret, i, tmp2; struct cmsghdr *cmsg; int nb_fd; char tmp[CMSG_SPACE(size)]; @@ -465,7 +468,8 @@ static int consumerd_recv_fd(int sfd, int size, } /* flag to tell the polling thread to update its fd array */ update_fd_array = 1; - send_error(KCONSUMERD_SUCCESS_RECV_FD); + /* signal the poll thread */ + tmp2 = write(poll_pipe[1], "4", 1); } else { ERR("Didn't received any fd"); send_error(KCONSUMERD_ERROR_RECV_FD); @@ -515,7 +519,7 @@ static void *thread_receive_fds(void *data) /* Blocking call, waiting for transmission */ sock = lttcomm_accept_unix_sock(client_socket); if (sock <= 0) { - WARN("On accept, retrying"); + WARN("On accept"); goto error; } while (1) { @@ -551,27 +555,6 @@ static int update_poll_array(struct pollfd **pollfd, struct ltt_kconsumerd_fd *iter; int i = 0; - if (*pollfd != NULL) { - free(*pollfd); - *pollfd = NULL; - } - - if (*local_kconsumerd_fd != NULL) { - free(*local_kconsumerd_fd); - *local_kconsumerd_fd = NULL; - } - - *pollfd = malloc(fds_count * sizeof(struct pollfd)); - if (*pollfd == NULL) { - perror("pollfd malloc"); - goto error_mem; - } - - *local_kconsumerd_fd = malloc(fds_count * sizeof(struct ltt_kconsumerd_fd)); - if (*local_kconsumerd_fd == NULL) { - perror("local_kconsumerd_fd malloc"); - goto error_mem; - } DBG("Updating poll fd array"); pthread_mutex_lock(&kconsumerd_lock_fds); @@ -588,12 +571,17 @@ static int update_poll_array(struct pollfd **pollfd, del_fd(iter); } } + /* + * insert the poll_pipe at the end of the array and don't increment i + * so nb_fd is the number of real FD + */ + (*pollfd)[i].fd = poll_pipe[0]; + (*pollfd)[i].events = POLLIN; + update_fd_array = 0; pthread_mutex_unlock(&kconsumerd_lock_fds); return i; -error_mem: - return -ENOMEM; } /* @@ -610,6 +598,8 @@ static void *thread_poll_fds(void *data) struct ltt_kconsumerd_fd **local_kconsumerd_fd = NULL; /* local view of fds_count */ int nb_fd = 0; + char tmp; + int tmp2; ret = pipe(thread_pipe); if (ret < 0) { @@ -628,6 +618,27 @@ static void *thread_poll_fds(void *data) * local array as well */ if (update_fd_array) { + if (pollfd != NULL) { + free(pollfd); + pollfd = NULL; + } + if (local_kconsumerd_fd != NULL) { + free(local_kconsumerd_fd); + local_kconsumerd_fd = NULL; + } + /* allocate for all fds + 1 for the poll_pipe */ + pollfd = malloc((fds_count + 1) * sizeof(struct pollfd)); + if (pollfd == NULL) { + perror("pollfd malloc"); + goto end; + } + /* allocate for all fds + 1 for the poll_pipe */ + local_kconsumerd_fd = malloc((fds_count + 1) * sizeof(struct ltt_kconsumerd_fd)); + if (local_kconsumerd_fd == NULL) { + perror("local_kconsumerd_fd malloc"); + goto end; + } + ret = update_poll_array(&pollfd, local_kconsumerd_fd); if (ret < 0) { ERR("Error in allocating pollfd or local_outfds"); @@ -638,8 +649,8 @@ static void *thread_poll_fds(void *data) } /* poll on the array of fds */ - DBG("polling on %d fd", nb_fd); - num_rdy = poll(pollfd, nb_fd, POLL_TIMEOUT); + DBG("polling on %d fd", nb_fd + 1); + num_rdy = poll(pollfd, nb_fd + 1, -1); DBG("poll num_rdy : %d", num_rdy); if (num_rdy == -1) { perror("Poll error"); @@ -647,6 +658,16 @@ static void *thread_poll_fds(void *data) goto end; } + /* + * if only the poll_pipe triggered poll to return just return to the + * beginning of the loop to update the array + */ + if (num_rdy == 1 && pollfd[nb_fd].revents == POLLIN) { + DBG("poll_pipe wake up"); + tmp2 = read(poll_pipe[0], &tmp, 1); + continue; + } + /* Take care of high priority channels first. */ for (i = 0; i < nb_fd; i++) { switch(pollfd[i].revents) { @@ -830,6 +851,13 @@ int main(int argc, char **argv) goto error; } + /* create the pipe to wake to polling thread when needed */ + ret = pipe(poll_pipe); + if (ret < 0) { + perror("Error creating poll pipe"); + goto end; + } + /* Connect to the socket created by ltt-sessiond to report errors */ DBG("Connecting to error socket %s", error_sock_path); error_socket = lttcomm_connect_unix_sock(error_sock_path);