From 6aea26bc854e8090e609502e53a3e2923cb3b2c5 Mon Sep 17 00:00:00 2001 From: Julien Desfossez Date: Tue, 24 May 2011 17:31:14 +0200 Subject: [PATCH] Fix handling of multiple FDs This patch fixes the (normal) case where we handle more than one reading fd. Previous versions were only tested with one FD, as of now we can consume multiple fd (metadata and data for example). Signed-off-by: Julien Desfossez --- kconsumerd/kconsumerd.c | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/kconsumerd/kconsumerd.c b/kconsumerd/kconsumerd.c index 4c9c089aa..66f3877d0 100644 --- a/kconsumerd/kconsumerd.c +++ b/kconsumerd/kconsumerd.c @@ -79,6 +79,7 @@ static char error_sock_path[PATH_MAX]; /* Global error path */ */ static void del_fd(struct ltt_kconsumerd_fd *lcf) { + DBG("Removing %d", lcf->consumerd_fd); pthread_mutex_lock(&kconsumerd_lock_fds); cds_list_del(&lcf->list); if (fds_count > 0) { @@ -103,7 +104,6 @@ static void cleanup() { struct ltt_kconsumerd_fd *iter; - /* remove the socket file */ unlink(command_sock_path); @@ -118,7 +118,8 @@ static void cleanup() } } -/* send_error +/* + * send_error * * send return code to ltt-sessiond */ @@ -330,7 +331,7 @@ static int read_subbuffer(struct ltt_kconsumerd_fd *kconsumerd_fd) long ret = 0; int infd = kconsumerd_fd->consumerd_fd; - DBG("In read_subbuffer"); + DBG("In read_subbuffer (infd : %d)", infd); /* Get the next subbuffer */ err = kernctl_get_next_subbuf(infd); if (err != 0) { @@ -518,23 +519,21 @@ static void *thread_receive_fds(void *data) goto error; } while (1) { - /* We first get the number of fd we are about to receive */ ret = lttcomm_recv_unix_sock(sock, &tmp, sizeof(struct lttcomm_kconsumerd_header)); - if (ret < 0) { + if (ret <= 0) { ERR("Receiving the lttcomm_kconsumerd_header, exiting"); goto error; } ret = consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type); - if (ret < 0) { + if (ret <= 0) { ERR("Receiving the FD, exiting"); goto error; } } error: - cleanup(); return NULL; } @@ -608,7 +607,7 @@ static void *thread_poll_fds(void *data) int num_rdy, num_hup, high_prio, ret, i; struct pollfd *pollfd = NULL; /* local view of the fds */ - struct ltt_kconsumerd_fd *local_kconsumerd_fd = NULL; + struct ltt_kconsumerd_fd **local_kconsumerd_fd = NULL; /* local view of fds_count */ int nb_fd = 0; @@ -618,6 +617,8 @@ static void *thread_poll_fds(void *data) goto end; } + local_kconsumerd_fd = malloc(sizeof(struct ltt_kconsumerd_fd)); + while (1) { high_prio = 0; num_hup = 0; @@ -627,7 +628,7 @@ static void *thread_poll_fds(void *data) * local array as well */ if (update_fd_array) { - ret = update_poll_array(&pollfd, &local_kconsumerd_fd); + ret = update_poll_array(&pollfd, local_kconsumerd_fd); if (ret < 0) { ERR("Error in allocating pollfd or local_outfds"); send_error(KCONSUMERD_POLL_ERROR); @@ -666,7 +667,7 @@ static void *thread_poll_fds(void *data) case POLLPRI: DBG("Urgent read on fd %d", pollfd[i].fd); high_prio = 1; - ret = read_subbuffer(&local_kconsumerd_fd[i]); + ret = read_subbuffer(local_kconsumerd_fd[i]); /* it's ok to have an unavailable sub-buffer (FIXME : is it ?) */ if (ret == EAGAIN) { ret = 0; @@ -679,7 +680,7 @@ static void *thread_poll_fds(void *data) if (nb_fd > 0 && num_hup == nb_fd) { DBG("every buffer FD has hung up\n"); send_error(KCONSUMERD_POLL_HUP); - continue; + goto end; } /* Take care of low priority channels. */ @@ -688,7 +689,7 @@ static void *thread_poll_fds(void *data) switch(pollfd[i].revents) { case POLLIN: DBG("Normal read on fd %d", pollfd[i].fd); - ret = read_subbuffer(&local_kconsumerd_fd[i]); + ret = read_subbuffer(local_kconsumerd_fd[i]); /* it's ok to have an unavailable subbuffer (FIXME : is it ?) */ if (ret == EAGAIN) { ret = 0; -- 2.34.1