Fix sending fd through sendmsg/recvmsg ancillary data
[lttng-tools.git] / liblttkconsumerd / liblttkconsumerd.c
index 3fc42fc6a41cbe65213c437c8e55dbf7cb7d71c1..1d69a4c463faffc1fa764141e287f725a625c535 100644 (file)
@@ -4,8 +4,8 @@
  *
  * This program is free software; you can redistribute it and/or
  * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
+ * as published by the Free Software Foundation; only version 2
+ * of the License.
  *
  * This program is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -28,6 +28,7 @@
 #include <sys/types.h>
 #include <unistd.h>
 #include <urcu/list.h>
+#include <assert.h>
 
 #include "libkernelctl.h"
 #include "liblttkconsumerd.h"
@@ -61,7 +62,6 @@ struct kconsumerd_global_data {
        unsigned int need_update;
 } kconsumerd_data = {
        .fd_list.head = CDS_LIST_HEAD_INIT(kconsumerd_data.fd_list.head),
-       .need_update = 1,
 };
 
 /* communication with splice */
@@ -70,12 +70,8 @@ static int kconsumerd_thread_pipe[2];
 /* pipe to wake the poll thread when necessary */
 static int kconsumerd_poll_pipe[2];
 
-/*
- * TODO: create a should_quit pipe to let the signal handler wake up the
- * fd receiver thread. It should be initialized before any signal can be
- * received by the library.
- */
-
+/* to let the signal handler wake up the fd receiver thread */
+static int kconsumerd_should_quit[2];
 
 /* timeout parameter, to control the polling thread grace period */
 static int kconsumerd_poll_timeout = -1;
@@ -129,7 +125,6 @@ static int kconsumerd_find_session_fd(int fd)
        cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
                if (iter->sessiond_fd == fd) {
                        DBG("Duplicate session fd %d", fd);
-                       pthread_mutex_unlock(&kconsumerd_data.lock);
                        return 1;
                }
        }
@@ -554,6 +549,32 @@ end:
        return ret;
 }
 
+/*
+ * kconsumerd_poll_socket
+ *
+ * Poll on the should_quit pipe and the command socket
+ * return -1 on error and should exit, 0 if data is
+ * available on the command socket
+ */
+int kconsumerd_poll_socket(struct pollfd *kconsumerd_sockpoll)
+{
+       int num_rdy;
+
+       num_rdy = poll(kconsumerd_sockpoll, 2, -1);
+       if (num_rdy == -1) {
+               perror("Poll error");
+               goto exit;
+       }
+       if (kconsumerd_sockpoll[0].revents == POLLIN) {
+               DBG("kconsumerd_should_quit wake up");
+               goto exit;
+       }
+       return 0;
+
+exit:
+       return -1;
+}
+
 /*
  * kconsumerd_consumerd_recv_fd
  *
@@ -561,10 +582,10 @@ end:
  * structures describing each fd (path name).
  * Returns the size of received data
  */
-static int kconsumerd_consumerd_recv_fd(int sfd, int size,
+static int kconsumerd_consumerd_recv_fd(int sfd,
+               struct pollfd *kconsumerd_sockpoll, int size,
                enum kconsumerd_command cmd_type)
 {
-       struct msghdr msg;
        struct iovec iov[1];
        int ret = 0, i, tmp2;
        struct cmsghdr *cmsg;
@@ -575,8 +596,15 @@ static int kconsumerd_consumerd_recv_fd(int sfd, int size,
        /* the number of fds we are about to receive */
        nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg);
 
+       /*
+        * Note: only supporting receiving one FD at a time for now.
+        * This code needs fixing if we wish to receive more (a single
+        * receive for the whole fd batch rather than one per fd).
+        */
+       assert(nb_fd == 1);
+
        for (i = 0; i < nb_fd; i++) {
-               memset(&msg, 0, sizeof(msg));
+               struct msghdr msg = { 0 };
 
                /* Prepare to receive the structures */
                iov[0].iov_base = &lkm;
@@ -588,6 +616,10 @@ static int kconsumerd_consumerd_recv_fd(int sfd, int size,
                msg.msg_controllen = sizeof(recv_fd);
 
                DBG("Waiting to receive fd");
+               if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
+                       goto end;
+               }
+
                if ((ret = recvmsg(sfd, &msg, 0)) < 0) {
                        perror("recvmsg");
                        continue;
@@ -606,12 +638,13 @@ static int kconsumerd_consumerd_recv_fd(int sfd, int size,
                        kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
                        goto end;
                }
+
                /* if we received fds */
                if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
                        switch (cmd_type) {
                        case ADD_STREAM:
-                               DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, (CMSG_DATA(cmsg)[0]));
-                               ret = kconsumerd_add_fd(&lkm, (CMSG_DATA(cmsg)[0]));
+                               DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, ((int *) CMSG_DATA(cmsg))[0]);
+                               ret = kconsumerd_add_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]);
                                if (ret < 0) {
                                        kconsumerd_send_error(KCONSUMERD_OUTFD_ERROR);
                                        goto end;
@@ -625,6 +658,9 @@ static int kconsumerd_consumerd_recv_fd(int sfd, int size,
                        }
                        /* signal the poll thread */
                        tmp2 = write(kconsumerd_poll_pipe[1], "4", 1);
+                       if (tmp2 < 0) {
+                               perror("write kconsumerd poll");
+                       }
                } else {
                        ERR("Didn't received any fd");
                        kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
@@ -736,6 +772,9 @@ void *kconsumerd_thread_poll_fds(void *data)
                if (pollfd[nb_fd].revents == POLLIN) {
                        DBG("kconsumerd_poll_pipe wake up");
                        tmp2 = read(kconsumerd_poll_pipe[0], &tmp, 1);
+                       if (tmp2 < 0) {
+                               perror("read kconsumerd poll");
+                       }
                        continue;
                }
 
@@ -806,13 +845,34 @@ end:
 }
 
 /*
- * kconsumerd_create_poll_pipe
+ * kconsumerd_init(void)
  *
- * create the pipe to wake to polling thread when needed
+ * initialise the necessary environnement :
+ * - inform the polling thread to update the polling array
+ * - create the poll_pipe
+ * - create the should_quit pipe (for signal handler)
  */
-int kconsumerd_create_poll_pipe()
+int kconsumerd_init(void)
 {
-       return pipe(kconsumerd_poll_pipe);
+       int ret;
+
+       /* need to update the polling array at init time */
+       kconsumerd_data.need_update = 1;
+
+       ret = pipe(kconsumerd_poll_pipe);
+       if (ret < 0) {
+               perror("Error creating poll pipe");
+               goto end;
+       }
+
+       ret = pipe(kconsumerd_should_quit);
+       if (ret < 0) {
+               perror("Error creating recv pipe");
+               goto end;
+       }
+
+end:
+       return ret;
 }
 
 /*
@@ -825,6 +885,12 @@ void *kconsumerd_thread_receive_fds(void *data)
 {
        int sock, client_socket, ret;
        struct lttcomm_kconsumerd_header tmp;
+       /*
+        * structure to poll for incoming data on communication socket
+        * avoids making blocking sockets
+        */
+       struct pollfd kconsumerd_sockpoll[2];
+
 
        DBG("Creating command socket %s", kconsumerd_command_sock_path);
        unlink(kconsumerd_command_sock_path);
@@ -846,18 +912,46 @@ void *kconsumerd_thread_receive_fds(void *data)
                goto end;
        }
 
-       /* TODO: poll on socket and "should_quit" fd pipe */
-       /* TODO: change blocking call into non-blocking call */
+       ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
+       if (ret < 0) {
+               perror("fcntl O_NONBLOCK");
+               goto end;
+       }
+
+       /* prepare the FDs to poll : to client socket and the should_quit pipe */
+       kconsumerd_sockpoll[0].fd = kconsumerd_should_quit[0];
+       kconsumerd_sockpoll[0].events = POLLIN | POLLPRI;
+       kconsumerd_sockpoll[1].fd = client_socket;
+       kconsumerd_sockpoll[1].events = POLLIN | POLLPRI;
+
+       if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
+               goto end;
+       }
+       DBG("Connection on client_socket");
+
        /* Blocking call, waiting for transmission */
        sock = lttcomm_accept_unix_sock(client_socket);
        if (sock <= 0) {
                WARN("On accept");
                goto end;
        }
+       ret = fcntl(sock, F_SETFL, O_NONBLOCK);
+       if (ret < 0) {
+               perror("fcntl O_NONBLOCK");
+               goto end;
+       }
+
+       /* update the polling structure to poll on the established socket */
+       kconsumerd_sockpoll[1].fd = sock;
+       kconsumerd_sockpoll[1].events = POLLIN | POLLPRI;
+
        while (1) {
+               if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
+                       goto end;
+               }
+               DBG("Incoming fds on sock");
+
                /* We first get the number of fd we are about to receive */
-               /* TODO: poll on sock and "should_quit" fd pipe */
-               /* TODO: change recv into a non-blocking call */
                ret = lttcomm_recv_unix_sock(sock, &tmp,
                                sizeof(struct lttcomm_kconsumerd_header));
                if (ret <= 0) {
@@ -872,12 +966,15 @@ void *kconsumerd_thread_receive_fds(void *data)
                        DBG("kconsumerd_thread_receive_fds received quit from signal");
                        goto end;
                }
+
                /* we received a command to add or update fds */
-               ret = kconsumerd_consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type);
+               ret = kconsumerd_consumerd_recv_fd(sock, kconsumerd_sockpoll,
+                               tmp.payload_size, tmp.cmd_type);
                if (ret <= 0) {
                        ERR("Receiving the FD, exiting");
                        goto end;
                }
+               DBG("received fds on sock");
        }
 
 end:
@@ -927,15 +1024,18 @@ void kconsumerd_cleanup(void)
 }
 
 /*
+ * kconsumerd_should_exit
+ *
  * Called from signal handler.
  */
 void kconsumerd_should_exit(void)
 {
+       int ret;
        kconsumerd_quit = 1;
-       /*
-        * TODO: write into a should_quit pipe to wake up the fd
-        * receiver thread.
-        */
+       ret = write(kconsumerd_should_quit[1], "4", 1);
+       if (ret < 0) {
+               perror("write kconsumerd quit");
+       }
 }
 
 /*
This page took 0.026735 seconds and 4 git commands to generate.