X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=kconsumerd%2Fkconsumerd.c;h=fcd4101a62d0bdbdff1b7ffcd34a86365324afbf;hp=a00f0793211218b58252ba2cc3c967414da3c941;hb=252fd492b48721806ed8d72b508ea3733bc70845;hpb=0632499a602c5588da22d191c14827be3b86a01a diff --git a/kconsumerd/kconsumerd.c b/kconsumerd/kconsumerd.c index a00f07932..fcd4101a6 100644 --- a/kconsumerd/kconsumerd.c +++ b/kconsumerd/kconsumerd.c @@ -61,6 +61,9 @@ static pthread_t threads[2]; /* communication with splice */ static int thread_pipe[2]; +/* pipe to wake the poll thread when necessary */ +static int poll_pipe[2]; + /* socket to communicate errors with sessiond */ static int error_socket = -1; @@ -72,6 +75,29 @@ static const char *progname; static char command_sock_path[PATH_MAX]; /* Global command socket path */ static char error_sock_path[PATH_MAX]; /* Global error path */ +/* + * del_fd + * + * Remove a fd from the global list protected by a mutex + */ +static void del_fd(struct ltt_kconsumerd_fd *lcf) +{ + DBG("Removing %d", lcf->consumerd_fd); + pthread_mutex_lock(&kconsumerd_lock_fds); + cds_list_del(&lcf->list); + if (fds_count > 0) { + fds_count--; + DBG("Removed ltt_kconsumerd_fd"); + if (lcf != NULL) { + close(lcf->out_fd); + close(lcf->consumerd_fd); + free(lcf); + lcf = NULL; + } + } + pthread_mutex_unlock(&kconsumerd_lock_fds); +} + /* * cleanup * @@ -79,10 +105,24 @@ static char error_sock_path[PATH_MAX]; /* Global error path */ */ static void cleanup() { + struct ltt_kconsumerd_fd *iter; + + /* remove the socket file */ unlink(command_sock_path); + + /* unblock the threads */ + WARN("Terminating the threads before exiting"); + pthread_cancel(threads[0]); + pthread_cancel(threads[1]); + + /* close all outfd */ + cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) { + del_fd(iter); + } } -/* send_error +/* + * send_error * * send return code to ltt-sessiond */ @@ -96,21 +136,6 @@ static int send_error(enum lttcomm_return_code cmd) } } -/* - * cleanup_kconsumerd_fd - * - * Close the FDs and frees a ltt_kconsumerd_fd struct - */ -static void cleanup_kconsumerd_fd(struct ltt_kconsumerd_fd *lcf) -{ - if (lcf != NULL) { - close(lcf->out_fd); - close(lcf->consumerd_fd); - free(lcf); - lcf = NULL; - } -} - /* * add_fd * @@ -152,36 +177,6 @@ end: return ret; } -/* - * del_fd - * - * Remove a fd from the global list protected by a mutex - */ -static void del_fd(struct ltt_kconsumerd_fd *lcf) -{ - pthread_mutex_lock(&kconsumerd_lock_fds); - cds_list_del(&lcf->list); - if (fds_count > 0) { - fds_count--; - DBG("Removed ltt_kconsumerd_fd"); - cleanup_kconsumerd_fd(lcf); - } - pthread_mutex_unlock(&kconsumerd_lock_fds); -} - -/* - * close_outfds - * - * Close all fds in the previous fd_list - * Must be used with kconsumerd_lock_fds lock held - */ -static void close_outfds() -{ - struct ltt_kconsumerd_fd *iter; - cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) { - del_fd(iter); - } -} /* * sighandler @@ -190,11 +185,6 @@ static void close_outfds() */ static void sighandler(int sig) { - /* unblock the threads */ - pthread_cancel(threads[0]); - pthread_cancel(threads[1]); - - close_outfds(); cleanup(); return; @@ -344,7 +334,7 @@ static int read_subbuffer(struct ltt_kconsumerd_fd *kconsumerd_fd) long ret = 0; int infd = kconsumerd_fd->consumerd_fd; - DBG("In read_subbuffer"); + DBG("In read_subbuffer (infd : %d)", infd); /* Get the next subbuffer */ err = kernctl_get_next_subbuf(infd); if (err != 0) { @@ -417,7 +407,7 @@ static int consumerd_recv_fd(int sfd, int size, { struct msghdr msg; struct iovec iov[1]; - int ret, i; + int ret, i, tmp2; struct cmsghdr *cmsg; int nb_fd; char tmp[CMSG_SPACE(size)]; @@ -478,7 +468,8 @@ static int consumerd_recv_fd(int sfd, int size, } /* flag to tell the polling thread to update its fd array */ update_fd_array = 1; - send_error(KCONSUMERD_SUCCESS_RECV_FD); + /* signal the poll thread */ + tmp2 = write(poll_pipe[1], "4", 1); } else { ERR("Didn't received any fd"); send_error(KCONSUMERD_ERROR_RECV_FD); @@ -525,23 +516,24 @@ static void *thread_receive_fds(void *data) goto error; } + /* Blocking call, waiting for transmission */ + sock = lttcomm_accept_unix_sock(client_socket); + if (sock <= 0) { + WARN("On accept, retrying"); + goto error; + } while (1) { - /* Blocking call, waiting for transmission */ - sock = lttcomm_accept_unix_sock(client_socket); - if (sock <= 0) { - continue; - } - /* We first get the number of fd we are about to receive */ ret = lttcomm_recv_unix_sock(sock, &tmp, sizeof(struct lttcomm_kconsumerd_header)); - if (ret < 0) { - ERR("Receiving the lttcomm_kconsumerd_header"); - continue; + if (ret <= 0) { + ERR("Receiving the lttcomm_kconsumerd_header, exiting"); + goto error; } ret = consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type); - if (ret < 0) { - continue; + if (ret <= 0) { + ERR("Receiving the FD, exiting"); + goto error; } } @@ -573,13 +565,15 @@ static int update_poll_array(struct pollfd **pollfd, *local_kconsumerd_fd = NULL; } - *pollfd = malloc(fds_count * sizeof(struct pollfd)); + /* allocate for all fds + 1 for the poll_pipe */ + *pollfd = malloc((fds_count + 1) * sizeof(struct pollfd)); if (*pollfd == NULL) { perror("pollfd malloc"); goto error_mem; } - *local_kconsumerd_fd = malloc(fds_count * sizeof(struct ltt_kconsumerd_fd)); + /* allocate for all fds + 1 for the poll_pipe */ + *local_kconsumerd_fd = malloc((fds_count + 1) * sizeof(struct ltt_kconsumerd_fd)); if (*local_kconsumerd_fd == NULL) { perror("local_kconsumerd_fd malloc"); goto error_mem; @@ -592,14 +586,21 @@ static int update_poll_array(struct pollfd **pollfd, DBG("Inside for each"); if (iter->state == ACTIVE_FD) { DBG("Active FD %d", iter->consumerd_fd); - pollfd[i]->fd = iter->consumerd_fd; - pollfd[i]->events = POLLIN | POLLPRI; + (*pollfd)[i].fd = iter->consumerd_fd; + (*pollfd)[i].events = POLLIN | POLLPRI; local_kconsumerd_fd[i] = iter; i++; } else if (iter->state == DELETE_FD) { del_fd(iter); } } + /* + * insert the poll_pipe at the end of the array and don't increment i + * so nb_fd is the number of real FD + */ + (*pollfd)[i].fd = poll_pipe[0]; + (*pollfd)[i].events = POLLIN; + update_fd_array = 0; pthread_mutex_unlock(&kconsumerd_lock_fds); return i; @@ -619,9 +620,11 @@ static void *thread_poll_fds(void *data) int num_rdy, num_hup, high_prio, ret, i; struct pollfd *pollfd = NULL; /* local view of the fds */ - struct ltt_kconsumerd_fd *local_kconsumerd_fd = NULL; + struct ltt_kconsumerd_fd **local_kconsumerd_fd = NULL; /* local view of fds_count */ int nb_fd = 0; + char tmp; + int tmp2; ret = pipe(thread_pipe); if (ret < 0) { @@ -629,6 +632,8 @@ static void *thread_poll_fds(void *data) goto end; } + local_kconsumerd_fd = malloc(sizeof(struct ltt_kconsumerd_fd)); + while (1) { high_prio = 0; num_hup = 0; @@ -638,7 +643,7 @@ static void *thread_poll_fds(void *data) * local array as well */ if (update_fd_array) { - ret = update_poll_array(&pollfd, &local_kconsumerd_fd); + ret = update_poll_array(&pollfd, local_kconsumerd_fd); if (ret < 0) { ERR("Error in allocating pollfd or local_outfds"); send_error(KCONSUMERD_POLL_ERROR); @@ -648,8 +653,8 @@ static void *thread_poll_fds(void *data) } /* poll on the array of fds */ - DBG("polling on %d fd", nb_fd); - num_rdy = poll(pollfd, nb_fd, POLL_TIMEOUT); + DBG("polling on %d fd", nb_fd + 1); + num_rdy = poll(pollfd, nb_fd + 1, -1); DBG("poll num_rdy : %d", num_rdy); if (num_rdy == -1) { perror("Poll error"); @@ -657,6 +662,16 @@ static void *thread_poll_fds(void *data) goto end; } + /* + * if only the poll_pipe triggered poll to return just return to the + * beginning of the loop to update the array + */ + if (num_rdy == 1 && pollfd[nb_fd].revents == POLLIN) { + DBG("poll_pipe wake up"); + tmp2 = read(poll_pipe[0], &tmp, 1); + continue; + } + /* Take care of high priority channels first. */ for (i = 0; i < nb_fd; i++) { switch(pollfd[i].revents) { @@ -677,7 +692,7 @@ static void *thread_poll_fds(void *data) case POLLPRI: DBG("Urgent read on fd %d", pollfd[i].fd); high_prio = 1; - ret = read_subbuffer(&local_kconsumerd_fd[i]); + ret = read_subbuffer(local_kconsumerd_fd[i]); /* it's ok to have an unavailable sub-buffer (FIXME : is it ?) */ if (ret == EAGAIN) { ret = 0; @@ -690,7 +705,7 @@ static void *thread_poll_fds(void *data) if (nb_fd > 0 && num_hup == nb_fd) { DBG("every buffer FD has hung up\n"); send_error(KCONSUMERD_POLL_HUP); - continue; + goto end; } /* Take care of low priority channels. */ @@ -699,7 +714,7 @@ static void *thread_poll_fds(void *data) switch(pollfd[i].revents) { case POLLIN: DBG("Normal read on fd %d", pollfd[i].fd); - ret = read_subbuffer(&local_kconsumerd_fd[i]); + ret = read_subbuffer(local_kconsumerd_fd[i]); /* it's ok to have an unavailable subbuffer (FIXME : is it ?) */ if (ret == EAGAIN) { ret = 0; @@ -718,6 +733,7 @@ end: free(local_kconsumerd_fd); local_kconsumerd_fd = NULL; } + cleanup(); return NULL; } @@ -839,6 +855,13 @@ int main(int argc, char **argv) goto error; } + /* create the pipe to wake to polling thread when needed */ + ret = pipe(poll_pipe); + if (ret < 0) { + perror("Error creating poll pipe"); + goto end; + } + /* Connect to the socket created by ltt-sessiond to report errors */ DBG("Connecting to error socket %s", error_sock_path); error_socket = lttcomm_connect_unix_sock(error_sock_path);