Don't quit when all FD hang up
authorJulien Desfossez <julien.desfossez@polymtl.ca>
Fri, 27 May 2011 22:07:48 +0000 (00:07 +0200)
committerDavid Goulet <david.goulet@polymtl.ca>
Wed, 1 Jun 2011 14:49:55 +0000 (10:49 -0400)
We now wait for the STOP command to exit the polling thread.
When we receive this command and all FD has hung up (all data is
consumed) then we can exit cleanly.
We also close every fd as soon as they report an error.

Signed-off-by: Julien Desfossez <julien.desfossez@polymtl.ca>
include/lttng-kconsumerd.h
kconsumerd/kconsumerd.c
liblttsessiondcomm/liblttsessiondcomm.h

index fa10ee25da6ff8887c99bf194281276f046d00d0..4f6fc8dc504397ddd0482dd68718caa69d3a1dd5 100644 (file)
@@ -30,8 +30,8 @@
 /* Commands for kconsumerd */
 enum kconsumerd_command {
        ADD_STREAM,
 /* Commands for kconsumerd */
 enum kconsumerd_command {
        ADD_STREAM,
-       UPDATE_STREAM, /* pause, delete, start depending on fd state */
-       STOP, /* delete all */
+       UPDATE_STREAM, /* pause, delete, active depending on fd state */
+       STOP, /* inform the kconsumerd to quit when all fd has hang up */
 };
 
 /* State of each fd in consumerd */
 };
 
 /* State of each fd in consumerd */
index 69aa47a6e5f0b175113b407eb91429c451b9af5b..5a1fe89a77cbd6772722cf45dbbab5f67732ef1e 100644 (file)
@@ -70,6 +70,9 @@ static int error_socket = -1;
 /* to count the number of time the user pressed ctrl+c */
 static int sigintcount = 0;
 
 /* to count the number of time the user pressed ctrl+c */
 static int sigintcount = 0;
 
+/* flag to inform the polling thread to quit when all fd hung up */
+static int quit = 0;
+
 /* Argument variables */
 int opt_quiet;
 int opt_verbose;
 /* Argument variables */
 int opt_quiet;
 int opt_verbose;
@@ -486,6 +489,7 @@ static int consumerd_recv_fd(int sfd, int size,
        }
 
 end:
        }
 
 end:
+       DBG("consumerd_recv_fd thread exiting");
        if (buf != NULL) {
                free(buf);
                buf = NULL;
        if (buf != NULL) {
                free(buf);
                buf = NULL;
@@ -509,43 +513,50 @@ static void *thread_receive_fds(void *data)
        client_socket = lttcomm_create_unix_sock(command_sock_path);
        if (client_socket < 0) {
                ERR("Cannot create command socket");
        client_socket = lttcomm_create_unix_sock(command_sock_path);
        if (client_socket < 0) {
                ERR("Cannot create command socket");
-               goto error;
+               goto end;
        }
 
        ret = lttcomm_listen_unix_sock(client_socket);
        if (ret < 0) {
        }
 
        ret = lttcomm_listen_unix_sock(client_socket);
        if (ret < 0) {
-               goto error;
+               goto end;
        }
 
        DBG("Sending ready command to ltt-sessiond");
        ret = send_error(KCONSUMERD_COMMAND_SOCK_READY);
        if (ret < 0) {
                ERR("Error sending ready command to ltt-sessiond");
        }
 
        DBG("Sending ready command to ltt-sessiond");
        ret = send_error(KCONSUMERD_COMMAND_SOCK_READY);
        if (ret < 0) {
                ERR("Error sending ready command to ltt-sessiond");
-               goto error;
+               goto end;
        }
 
        /* Blocking call, waiting for transmission */
        sock = lttcomm_accept_unix_sock(client_socket);
        if (sock <= 0) {
                WARN("On accept");
        }
 
        /* Blocking call, waiting for transmission */
        sock = lttcomm_accept_unix_sock(client_socket);
        if (sock <= 0) {
                WARN("On accept");
-               goto error;
+               goto end;
        }
        while (1) {
                /* We first get the number of fd we are about to receive */
                ret = lttcomm_recv_unix_sock(sock, &tmp,
                                sizeof(struct lttcomm_kconsumerd_header));
                if (ret <= 0) {
        }
        while (1) {
                /* We first get the number of fd we are about to receive */
                ret = lttcomm_recv_unix_sock(sock, &tmp,
                                sizeof(struct lttcomm_kconsumerd_header));
                if (ret <= 0) {
-                       ERR("Receiving the lttcomm_kconsumerd_header, exiting");
-                       goto error;
+                       ERR("Communication interrupted on command socket");
+                       goto end;
                }
                }
+               if (tmp.cmd_type == STOP) {
+                       DBG("Received STOP command");
+                       quit = 1;
+                       goto end;
+               }
+               /* we received a command to add or update fds */
                ret = consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type);
                if (ret <= 0) {
                        ERR("Receiving the FD, exiting");
                ret = consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type);
                if (ret <= 0) {
                        ERR("Receiving the FD, exiting");
-                       goto error;
+                       goto end;
                }
        }
 
                }
        }
 
-error:
+end:
+       DBG("thread_receive_fds exiting");
        return NULL;
 }
 
        return NULL;
 }
 
@@ -575,8 +586,6 @@ static int update_poll_array(struct pollfd **pollfd,
                        (*pollfd)[i].events = POLLIN | POLLPRI;
                        local_kconsumerd_fd[i] = iter;
                        i++;
                        (*pollfd)[i].events = POLLIN | POLLPRI;
                        local_kconsumerd_fd[i] = iter;
                        i++;
-               } else if (iter->state == DELETE_FD) {
-                       del_fd(iter);
                }
        }
        /*
                }
        }
        /*
@@ -681,16 +690,20 @@ static void *thread_poll_fds(void *data)
                        switch(pollfd[i].revents) {
                        case POLLERR:
                                ERR("Error returned in polling fd %d.", pollfd[i].fd);
                        switch(pollfd[i].revents) {
                        case POLLERR:
                                ERR("Error returned in polling fd %d.", pollfd[i].fd);
+                               del_fd(local_kconsumerd_fd[i]);
+                               update_fd_array = 1;
                                num_hup++;
                                num_hup++;
-                               send_error(KCONSUMERD_POLL_ERROR);
                                break;
                        case POLLHUP:
                                ERR("Polling fd %d tells it has hung up.", pollfd[i].fd);
                                break;
                        case POLLHUP:
                                ERR("Polling fd %d tells it has hung up.", pollfd[i].fd);
+                               del_fd(local_kconsumerd_fd[i]);
+                               update_fd_array = 1;
                                num_hup++;
                                break;
                        case POLLNVAL:
                                ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
                                num_hup++;
                                break;
                        case POLLNVAL:
                                ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
-                               send_error(KCONSUMERD_POLL_NVAL);
+                               del_fd(local_kconsumerd_fd[i]);
+                               update_fd_array = 1;
                                num_hup++;
                                break;
                        case POLLPRI:
                                num_hup++;
                                break;
                        case POLLPRI:
@@ -708,8 +721,10 @@ static void *thread_poll_fds(void *data)
                /* If every buffer FD has hung up, we end the read loop here */
                if (nb_fd > 0 && num_hup == nb_fd) {
                        DBG("every buffer FD has hung up\n");
                /* If every buffer FD has hung up, we end the read loop here */
                if (nb_fd > 0 && num_hup == nb_fd) {
                        DBG("every buffer FD has hung up\n");
-                       send_error(KCONSUMERD_POLL_HUP);
-                       goto end;
+                       if (quit == 1) {
+                               goto end;
+                       }
+                       continue;
                }
 
                /* Take care of low priority channels. */
                }
 
                /* Take care of low priority channels. */
@@ -727,6 +742,7 @@ static void *thread_poll_fds(void *data)
                }
        }
 end:
                }
        }
 end:
+       DBG("polling thread exiting");
        if (pollfd != NULL) {
                free(pollfd);
                pollfd = NULL;
        if (pollfd != NULL) {
                free(pollfd);
                pollfd = NULL;
index b563f6a7b6244d0f3b6102aefc9200657f39988e..cb9e26dd8d663cdfe8996da1404dfd43a8a0a4b9 100644 (file)
@@ -169,8 +169,7 @@ struct lttcomm_lttng_msg {
  */
 struct lttcomm_kconsumerd_header {
        u32 payload_size;
  */
 struct lttcomm_kconsumerd_header {
        u32 payload_size;
-       u32 cmd_type;   /* enum lttcomm_consumerd_command */
-       u32 ret_code;   /* enum lttcomm_return_code */
+       u32 cmd_type;   /* enum kconsumerd_command */
 };
 
 /* lttcomm_kconsumerd_msg represents a file descriptor to consume the
 };
 
 /* lttcomm_kconsumerd_msg represents a file descriptor to consume the
This page took 0.028023 seconds and 4 git commands to generate.