Move accept sequence
[lttng-tools.git] / kconsumerd / kconsumerd.c
index 7e166b8f144886dc254cf997c4753721e3350efb..96051caafdfeba7dd09b4029c16404d28769cf08 100644 (file)
@@ -72,6 +72,28 @@ static const char *progname;
 static char command_sock_path[PATH_MAX]; /* Global command socket path */
 static char error_sock_path[PATH_MAX]; /* Global error path */
 
+/*
+ * del_fd
+ *
+ * Remove a fd from the global list protected by a mutex
+ */
+static void del_fd(struct ltt_kconsumerd_fd *lcf)
+{
+       pthread_mutex_lock(&kconsumerd_lock_fds);
+       cds_list_del(&lcf->list);
+       if (fds_count > 0) {
+               fds_count--;
+               DBG("Removed ltt_kconsumerd_fd");
+               if (lcf != NULL) {
+                       close(lcf->out_fd);
+                       close(lcf->consumerd_fd);
+                       free(lcf);
+                       lcf = NULL;
+               }
+       }
+       pthread_mutex_unlock(&kconsumerd_lock_fds);
+}
+
 /*
  *  cleanup
  *
@@ -79,7 +101,21 @@ static char error_sock_path[PATH_MAX]; /* Global error path */
  */
 static void cleanup()
 {
+       struct ltt_kconsumerd_fd *iter;
+
+
+       /* remove the socket file */
        unlink(command_sock_path);
+
+       /* unblock the threads */
+       WARN("Terminating the threads before exiting");
+       pthread_cancel(threads[0]);
+       pthread_cancel(threads[1]);
+
+       /* close all outfd */
+       cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
+               del_fd(iter);
+       }
 }
 
 /* send_error
@@ -96,21 +132,6 @@ static int send_error(enum lttcomm_return_code cmd)
        }
 }
 
-/*
- * cleanup_kconsumerd_fd
- *
- * Close the FDs and frees a ltt_kconsumerd_fd struct
- */
-static void cleanup_kconsumerd_fd(struct ltt_kconsumerd_fd *lcf)
-{
-       if (lcf != NULL) {
-               close(lcf->out_fd);
-               close(lcf->consumerd_fd);
-               free(lcf);
-               lcf = NULL;
-       }
-}
-
 /*
  * add_fd
  *
@@ -152,36 +173,6 @@ end:
        return ret;
 }
 
-/*
- * del_fd
- *
- * Remove a fd from the global list protected by a mutex
- */
-static void del_fd(struct ltt_kconsumerd_fd *lcf)
-{
-       pthread_mutex_lock(&kconsumerd_lock_fds);
-       cds_list_del(&lcf->list);
-       if (fds_count > 0) {
-               fds_count--;
-               DBG("Removed ltt_kconsumerd_fd");
-               cleanup_kconsumerd_fd(lcf);
-       }
-       pthread_mutex_unlock(&kconsumerd_lock_fds);
-}
-
-/*
- * close_outfds
- *
- * Close all fds in the previous fd_list
- * Must be used with kconsumerd_lock_fds lock held
- */
-static void close_outfds()
-{
-       struct ltt_kconsumerd_fd *iter;
-       cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
-               del_fd(iter);
-       }
-}
 
 /*
  *  sighandler
@@ -190,11 +181,6 @@ static void close_outfds()
  */
 static void sighandler(int sig)
 {
-       /* unblock the threads */
-       pthread_cancel(threads[0]);
-       pthread_cancel(threads[1]);
-
-       close_outfds();
        cleanup();
 
        return;
@@ -260,16 +246,18 @@ static int on_read_subbuffer(struct ltt_kconsumerd_fd *kconsumerd_fd,
                                SPLICE_F_MOVE | SPLICE_F_MORE);
                DBG("splice chan to pipe ret %ld", ret);
                if (ret < 0) {
+                       ret = errno;
                        perror("Error in relay splice");
-                       goto write_end;
+                       goto splice_error;
                }
 
                ret = splice(thread_pipe[0], NULL, outfd, NULL, ret,
                                SPLICE_F_MOVE | SPLICE_F_MORE);
                DBG("splice pipe to file %ld", ret);
                if (ret < 0) {
+                       ret = errno;
                        perror("Error in file splice");
-                       goto write_end;
+                       goto splice_error;
                }
                if (ret >= len) {
                        len = 0;
@@ -279,7 +267,7 @@ static int on_read_subbuffer(struct ltt_kconsumerd_fd *kconsumerd_fd,
                                SYNC_FILE_RANGE_WRITE);
                kconsumerd_fd->out_fd_offset += ret;
        }
-write_end:
+
        /*
         * This does a blocking write-and-wait on any page that belongs to the
         * subbuffer prior to the one we just wrote.
@@ -307,6 +295,26 @@ write_end:
                posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
                                kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
        }
+       goto end;
+
+splice_error:
+       /* send the appropriate error description to sessiond */
+       switch(ret) {
+               case EBADF:
+                       send_error(KCONSUMERD_SPLICE_EBADF);
+                       break;
+               case EINVAL:
+                       send_error(KCONSUMERD_SPLICE_EINVAL);
+                       break;
+               case ENOMEM:
+                       send_error(KCONSUMERD_SPLICE_ENOMEM);
+                       break;
+               case ESPIPE:
+                       send_error(KCONSUMERD_SPLICE_ESPIPE);
+                       break;
+       }
+
+end:
        return ret;
 }
 
@@ -503,27 +511,30 @@ static void *thread_receive_fds(void *data)
                goto error;
        }
 
+       /* Blocking call, waiting for transmission */
+       sock = lttcomm_accept_unix_sock(client_socket);
+       if (sock <= 0) {
+               WARN("On accept, retrying");
+               goto error;
+       }
        while (1) {
-               /* Blocking call, waiting for transmission */
-               sock = lttcomm_accept_unix_sock(client_socket);
-               if (sock <= 0) {
-                       continue;
-               }
 
                /* We first get the number of fd we are about to receive */
                ret = lttcomm_recv_unix_sock(sock, &tmp,
                                sizeof(struct lttcomm_kconsumerd_header));
                if (ret < 0) {
-                       ERR("Receiving the lttcomm_kconsumerd_header");
-                       continue;
+                       ERR("Receiving the lttcomm_kconsumerd_header, exiting");
+                       goto error;
                }
                ret = consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type);
                if (ret < 0) {
-                       continue;
+                       ERR("Receiving the FD, exiting");
+                       goto error;
                }
        }
 
 error:
+       cleanup();
        return NULL;
 }
 
@@ -696,6 +707,7 @@ end:
                free(local_kconsumerd_fd);
                local_kconsumerd_fd = NULL;
        }
+       cleanup();
        return NULL;
 }
 
This page took 0.035951 seconds and 4 git commands to generate.