X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=kconsumerd%2Fkconsumerd.c;h=66f3877d020cddae531fd123fc7df4ab31757e18;hp=4c9c089aa9e5268da6575ca777b05f4d1ad5ec15;hb=6aea26bc854e8090e609502e53a3e2923cb3b2c5;hpb=f34daff7d0139d162e949f1b4fbb4cd003746b15 diff --git a/kconsumerd/kconsumerd.c b/kconsumerd/kconsumerd.c index 4c9c089aa..66f3877d0 100644 --- a/kconsumerd/kconsumerd.c +++ b/kconsumerd/kconsumerd.c @@ -79,6 +79,7 @@ static char error_sock_path[PATH_MAX]; /* Global error path */ */ static void del_fd(struct ltt_kconsumerd_fd *lcf) { + DBG("Removing %d", lcf->consumerd_fd); pthread_mutex_lock(&kconsumerd_lock_fds); cds_list_del(&lcf->list); if (fds_count > 0) { @@ -103,7 +104,6 @@ static void cleanup() { struct ltt_kconsumerd_fd *iter; - /* remove the socket file */ unlink(command_sock_path); @@ -118,7 +118,8 @@ static void cleanup() } } -/* send_error +/* + * send_error * * send return code to ltt-sessiond */ @@ -330,7 +331,7 @@ static int read_subbuffer(struct ltt_kconsumerd_fd *kconsumerd_fd) long ret = 0; int infd = kconsumerd_fd->consumerd_fd; - DBG("In read_subbuffer"); + DBG("In read_subbuffer (infd : %d)", infd); /* Get the next subbuffer */ err = kernctl_get_next_subbuf(infd); if (err != 0) { @@ -518,23 +519,21 @@ static void *thread_receive_fds(void *data) goto error; } while (1) { - /* We first get the number of fd we are about to receive */ ret = lttcomm_recv_unix_sock(sock, &tmp, sizeof(struct lttcomm_kconsumerd_header)); - if (ret < 0) { + if (ret <= 0) { ERR("Receiving the lttcomm_kconsumerd_header, exiting"); goto error; } ret = consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type); - if (ret < 0) { + if (ret <= 0) { ERR("Receiving the FD, exiting"); goto error; } } error: - cleanup(); return NULL; } @@ -608,7 +607,7 @@ static void *thread_poll_fds(void *data) int num_rdy, num_hup, high_prio, ret, i; struct pollfd *pollfd = NULL; /* local view of the fds */ - struct ltt_kconsumerd_fd *local_kconsumerd_fd = NULL; + struct ltt_kconsumerd_fd **local_kconsumerd_fd = NULL; /* local view of fds_count */ int nb_fd = 0; @@ -618,6 +617,8 @@ static void *thread_poll_fds(void *data) goto end; } + local_kconsumerd_fd = malloc(sizeof(struct ltt_kconsumerd_fd)); + while (1) { high_prio = 0; num_hup = 0; @@ -627,7 +628,7 @@ static void *thread_poll_fds(void *data) * local array as well */ if (update_fd_array) { - ret = update_poll_array(&pollfd, &local_kconsumerd_fd); + ret = update_poll_array(&pollfd, local_kconsumerd_fd); if (ret < 0) { ERR("Error in allocating pollfd or local_outfds"); send_error(KCONSUMERD_POLL_ERROR); @@ -666,7 +667,7 @@ static void *thread_poll_fds(void *data) case POLLPRI: DBG("Urgent read on fd %d", pollfd[i].fd); high_prio = 1; - ret = read_subbuffer(&local_kconsumerd_fd[i]); + ret = read_subbuffer(local_kconsumerd_fd[i]); /* it's ok to have an unavailable sub-buffer (FIXME : is it ?) */ if (ret == EAGAIN) { ret = 0; @@ -679,7 +680,7 @@ static void *thread_poll_fds(void *data) if (nb_fd > 0 && num_hup == nb_fd) { DBG("every buffer FD has hung up\n"); send_error(KCONSUMERD_POLL_HUP); - continue; + goto end; } /* Take care of low priority channels. */ @@ -688,7 +689,7 @@ static void *thread_poll_fds(void *data) switch(pollfd[i].revents) { case POLLIN: DBG("Normal read on fd %d", pollfd[i].fd); - ret = read_subbuffer(&local_kconsumerd_fd[i]); + ret = read_subbuffer(local_kconsumerd_fd[i]); /* it's ok to have an unavailable subbuffer (FIXME : is it ?) */ if (ret == EAGAIN) { ret = 0;