Rename liblttsessiondcomm to liblttng-sessiond-comm, install it.
[lttng-tools.git] / liblttkconsumerd / liblttkconsumerd.c
index ffd2faf2eddbec9e9795cbac66441fd121e47d0f..9ad380c8dab5e8c1be679a0881890e929f355c57 100644 (file)
@@ -4,8 +4,8 @@
  *
  * This program is free software; you can redistribute it and/or
  * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
+ * as published by the Free Software Foundation; only version 2
+ * of the License.
  *
  * This program is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
 #include <sys/types.h>
 #include <unistd.h>
 #include <urcu/list.h>
+#include <assert.h>
 
 #include "libkernelctl.h"
 #include "liblttkconsumerd.h"
 #include "lttngerr.h"
 
-/* Init the list of FDs */
-static struct kconsumerd_fd_list kconsumerd_fd_list = {
-       .head = CDS_LIST_HEAD_INIT(kconsumerd_fd_list.head),
+static
+struct kconsumerd_global_data {
+       /*
+        * kconsumerd_data.lock protects kconsumerd_data.fd_list,
+        * kconsumerd_data.fds_count, and kconsumerd_data.need_update.  It
+        * ensures the count matches the number of items in the fd_list.
+        * It also ensures that list updates *always* trigger an fd_array
+        * update: the list update and the setting of the
+        * kconsumerd_data.need_update flag must be atomic, as must the
+        * flag read, the fd array rebuild and the flag clear.
+        */
+       pthread_mutex_t lock;
+       /*
+        * Number of elements in the list below.  Protected by
+        * kconsumerd_data.lock.
+        */
+       unsigned int fds_count;
+       /*
+        * List of FDs.  Protected by kconsumerd_data.lock.
+        */
+       struct kconsumerd_fd_list fd_list;
+       /*
+        * Flag specifying if the local array of FDs needs update in the
+        * poll function.  Protected by kconsumerd_data.lock.
+        */
+       unsigned int need_update;
+} kconsumerd_data = {
+       .fd_list.head = CDS_LIST_HEAD_INIT(kconsumerd_data.fd_list.head),
 };
 
-/* Number of element for the list below. */
-static unsigned int kconsumerd_fds_count;
-
-/* If the local array of FDs needs update in the poll function */
-static unsigned int kconsumerd_update_fd_array = 1;
-
-/* lock the fd array and structures */
-static pthread_mutex_t kconsumerd_lock_fds;
-
 /* communication with splice */
 static int kconsumerd_thread_pipe[2];
 
 /* pipe to wake the poll thread when necessary */
 static int kconsumerd_poll_pipe[2];
 
+/* to let the signal handler wake up the fd receiver thread */
+static int kconsumerd_should_quit[2];
+
 /* timeout parameter, to control the polling thread grace period */
 static int kconsumerd_poll_timeout = -1;
 
@@ -62,8 +82,13 @@ static int kconsumerd_error_socket;
 /* socket to exchange commands with sessiond */
 static char *kconsumerd_command_sock_path;
 
-/* flag to inform the polling thread to kconsumerd_quit when all fd hung up */
-static int kconsumerd_quit = 0;
+/*
+ * Flag to inform the polling thread to quit when all fds have hung up.
+ * Updated by kconsumerd_thread_receive_fds when it notices that all
+ * fds have hung up. Also updated by the signal handler
+ * (kconsumerd_should_exit()). Read by the polling threads.
+ */
+static volatile int kconsumerd_quit = 0;
 
 /*
  * kconsumerd_set_error_socket
@@ -89,6 +114,7 @@ void kconsumerd_set_command_socket_path(char *sock)
  * kconsumerd_find_session_fd
  *
  * Find a session fd in the global list.
+ * The kconsumerd_data.lock must be held by the caller during this call.
  *
  * Return 1 if found else 0
  */
@@ -96,15 +122,12 @@ static int kconsumerd_find_session_fd(int fd)
 {
        struct kconsumerd_fd *iter;
 
-       pthread_mutex_lock(&kconsumerd_lock_fds);
-       cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
+       cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
                if (iter->sessiond_fd == fd) {
                        DBG("Duplicate session fd %d", fd);
-                       pthread_mutex_unlock(&kconsumerd_lock_fds);
                        return 1;
                }
        }
-       pthread_mutex_unlock(&kconsumerd_lock_fds);
 
        return 0;
 }
@@ -116,10 +139,10 @@ static int kconsumerd_find_session_fd(int fd)
  */
 static void kconsumerd_del_fd(struct kconsumerd_fd *lcf)
 {
-       pthread_mutex_lock(&kconsumerd_lock_fds);
+       pthread_mutex_lock(&kconsumerd_data.lock);
        cds_list_del(&lcf->list);
-       if (kconsumerd_fds_count > 0) {
-               kconsumerd_fds_count--;
+       if (kconsumerd_data.fds_count > 0) {
+               kconsumerd_data.fds_count--;
                if (lcf != NULL) {
                        close(lcf->out_fd);
                        close(lcf->consumerd_fd);
@@ -127,7 +150,8 @@ static void kconsumerd_del_fd(struct kconsumerd_fd *lcf)
                        lcf = NULL;
                }
        }
-       pthread_mutex_unlock(&kconsumerd_lock_fds);
+       kconsumerd_data.need_update = 1;
+       pthread_mutex_unlock(&kconsumerd_data.lock);
 }
 
 /*
@@ -140,6 +164,7 @@ static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_f
        int ret;
        struct kconsumerd_fd *tmp_fd;
 
+       pthread_mutex_lock(&kconsumerd_data.lock);
        /* Check if already exist */
        ret = kconsumerd_find_session_fd(buf->fd);
        if (ret == 1) {
@@ -152,6 +177,7 @@ static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_f
        tmp_fd->state = buf->state;
        tmp_fd->max_sb_size = buf->max_sb_size;
        strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX);
+       tmp_fd->path_name[PATH_MAX - 1] = '\0';
 
        /* Opening the tracefile in write mode */
        ret = open(tmp_fd->path_name,
@@ -167,12 +193,11 @@ static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_f
        DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
                        tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);
 
-       pthread_mutex_lock(&kconsumerd_lock_fds);
-       cds_list_add(&tmp_fd->list, &kconsumerd_fd_list.head);
-       kconsumerd_fds_count++;
-       pthread_mutex_unlock(&kconsumerd_lock_fds);
-
+       cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head);
+       kconsumerd_data.fds_count++;
+       kconsumerd_data.need_update = 1;
 end:
+       pthread_mutex_unlock(&kconsumerd_data.lock);
        return ret;
 }
 
@@ -186,14 +211,15 @@ static void kconsumerd_change_fd_state(int sessiond_fd,
 {
        struct kconsumerd_fd *iter;
 
-       pthread_mutex_lock(&kconsumerd_lock_fds);
-       cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
+       pthread_mutex_lock(&kconsumerd_data.lock);
+       cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
                if (iter->sessiond_fd == sessiond_fd) {
                        iter->state = state;
                        break;
                }
        }
-       pthread_mutex_unlock(&kconsumerd_lock_fds);
+       kconsumerd_data.need_update = 1;
+       pthread_mutex_unlock(&kconsumerd_data.lock);
 }
 
 /*
@@ -203,6 +229,7 @@ static void kconsumerd_change_fd_state(int sessiond_fd,
  * to avoid doing a lookup in the linked list and concurrency issues
  * when writing is needed.
  * Returns the number of fds in the structures
+ * Called with kconsumerd_data.lock held.
  */
 static int kconsumerd_update_poll_array(struct pollfd **pollfd,
                struct kconsumerd_fd **local_kconsumerd_fd)
@@ -212,7 +239,7 @@ static int kconsumerd_update_poll_array(struct pollfd **pollfd,
 
        DBG("Updating poll fd array");
 
-       cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
+       cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
                DBG("Inside for each");
                if (iter->state == ACTIVE_FD) {
                        DBG("Active FD %d", iter->consumerd_fd);
@@ -229,8 +256,6 @@ static int kconsumerd_update_poll_array(struct pollfd **pollfd,
         */
        (*pollfd)[i].fd = kconsumerd_poll_pipe[0];
        (*pollfd)[i].events = POLLIN;
-
-       kconsumerd_update_fd_array = 0;
        return i;
 }
 
@@ -467,7 +492,7 @@ static int kconsumerd_read_subbuffer(struct kconsumerd_fd *kconsumerd_fd)
        }
 
        switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) {
-       case LTTNG_KERNEL_SPLICE:
+       case LTTNG_EVENT_SPLICE:
                /* read the whole subbuffer */
                err = kernctl_get_padded_subbuf_size(infd, &len);
                if (err != 0) {
@@ -486,7 +511,7 @@ static int kconsumerd_read_subbuffer(struct kconsumerd_fd *kconsumerd_fd)
                        ERR("Error splicing to tracefile");
                }
                break;
-       case LTTNG_KERNEL_MMAP:
+       case LTTNG_EVENT_MMAP:
                /* read the used subbuffer size */
                err = kernctl_get_subbuf_size(infd, &len);
                if (err != 0) {
@@ -525,6 +550,32 @@ end:
        return ret;
 }
 
+/*
+ * kconsumerd_poll_socket
+ *
+ * Poll on the should_quit pipe and the command socket.
+ * Returns -1 on poll error or if the should_quit pipe fired (the
+ * caller should exit), 0 if data is available on the command socket.
+ */
+int kconsumerd_poll_socket(struct pollfd *kconsumerd_sockpoll)
+{
+       int num_rdy;
+
+       num_rdy = poll(kconsumerd_sockpoll, 2, -1);
+       if (num_rdy == -1) {
+               perror("Poll error");
+               goto exit;
+       }
+       if (kconsumerd_sockpoll[0].revents == POLLIN) {
+               DBG("kconsumerd_should_quit wake up");
+               goto exit;
+       }
+       return 0;
+
+exit:
+       return -1;
+}
+
 /*
  * kconsumerd_consumerd_recv_fd
  *
@@ -532,10 +583,10 @@ end:
  * structures describing each fd (path name).
  * Returns the size of received data
  */
-static int kconsumerd_consumerd_recv_fd(int sfd, int size,
+static int kconsumerd_consumerd_recv_fd(int sfd,
+               struct pollfd *kconsumerd_sockpoll, int size,
                enum kconsumerd_command cmd_type)
 {
-       struct msghdr msg;
        struct iovec iov[1];
        int ret = 0, i, tmp2;
        struct cmsghdr *cmsg;
@@ -546,8 +597,11 @@ static int kconsumerd_consumerd_recv_fd(int sfd, int size,
        /* the number of fds we are about to receive */
        nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg);
 
+       /*
+        * nb_fd is the number of fds we receive. One fd per recvmsg.
+        */
        for (i = 0; i < nb_fd; i++) {
-               memset(&msg, 0, sizeof(msg));
+               struct msghdr msg = { 0 };
 
                /* Prepare to receive the structures */
                iov[0].iov_base = &lkm;
@@ -559,6 +613,10 @@ static int kconsumerd_consumerd_recv_fd(int sfd, int size,
                msg.msg_controllen = sizeof(recv_fd);
 
                DBG("Waiting to receive fd");
+               if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
+                       goto end;
+               }
+
                if ((ret = recvmsg(sfd, &msg, 0)) < 0) {
                        perror("recvmsg");
                        continue;
@@ -577,12 +635,13 @@ static int kconsumerd_consumerd_recv_fd(int sfd, int size,
                        kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
                        goto end;
                }
+
                /* if we received fds */
                if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
                        switch (cmd_type) {
                        case ADD_STREAM:
-                               DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, (CMSG_DATA(cmsg)[0]));
-                               ret = kconsumerd_add_fd(&lkm, (CMSG_DATA(cmsg)[0]));
+                               DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, ((int *) CMSG_DATA(cmsg))[0]);
+                               ret = kconsumerd_add_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]);
                                if (ret < 0) {
                                        kconsumerd_send_error(KCONSUMERD_OUTFD_ERROR);
                                        goto end;
@@ -594,10 +653,11 @@ static int kconsumerd_consumerd_recv_fd(int sfd, int size,
                        default:
                                break;
                        }
-                       /* flag to tell the polling thread to update its fd array */
-                       kconsumerd_update_fd_array = 1;
                        /* signal the poll thread */
                        tmp2 = write(kconsumerd_poll_pipe[1], "4", 1);
+                       if (tmp2 < 0) {
+                               perror("write kconsumerd poll");
+                       }
                } else {
                        ERR("Didn't received any fd");
                        kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
@@ -622,7 +682,7 @@ void *kconsumerd_thread_poll_fds(void *data)
        struct pollfd *pollfd = NULL;
        /* local view of the fds */
        struct kconsumerd_fd **local_kconsumerd_fd = NULL;
-       /* local view of kconsumerd_fds_count */
+       /* local view of kconsumerd_data.fds_count */
        int nb_fd = 0;
        char tmp;
        int tmp2;
@@ -643,7 +703,8 @@ void *kconsumerd_thread_poll_fds(void *data)
                 * the ltt_fd_list has been updated, we need to update our
                 * local array as well
                 */
-               if (kconsumerd_update_fd_array == 1) {
+               pthread_mutex_lock(&kconsumerd_data.lock);
+               if (kconsumerd_data.need_update) {
                        if (pollfd != NULL) {
                                free(pollfd);
                                pollfd = NULL;
@@ -653,33 +714,33 @@ void *kconsumerd_thread_poll_fds(void *data)
                                local_kconsumerd_fd = NULL;
                        }
 
-                       /* Lock mutex for fds count */
-                       pthread_mutex_lock(&kconsumerd_lock_fds);
                        /* allocate for all fds + 1 for the kconsumerd_poll_pipe */
-                       pollfd = malloc((kconsumerd_fds_count + 1) * sizeof(struct pollfd));
+                       pollfd = malloc((kconsumerd_data.fds_count + 1) * sizeof(struct pollfd));
                        if (pollfd == NULL) {
                                perror("pollfd malloc");
+                               pthread_mutex_unlock(&kconsumerd_data.lock);
                                goto end;
                        }
 
                        /* allocate for all fds + 1 for the kconsumerd_poll_pipe */
-                       local_kconsumerd_fd = malloc((kconsumerd_fds_count + 1) *
+                       local_kconsumerd_fd = malloc((kconsumerd_data.fds_count + 1) *
                                        sizeof(struct kconsumerd_fd));
                        if (local_kconsumerd_fd == NULL) {
                                perror("local_kconsumerd_fd malloc");
+                               pthread_mutex_unlock(&kconsumerd_data.lock);
                                goto end;
                        }
                        ret = kconsumerd_update_poll_array(&pollfd, local_kconsumerd_fd);
                        if (ret < 0) {
                                ERR("Error in allocating pollfd or local_outfds");
                                kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
+                               pthread_mutex_unlock(&kconsumerd_data.lock);
                                goto end;
                        }
-                       /* Unlock mutex for fds count */
-                       pthread_mutex_unlock(&kconsumerd_lock_fds);
-
                        nb_fd = ret;
+                       kconsumerd_data.need_update = 0;
                }
+               pthread_mutex_unlock(&kconsumerd_data.lock);
 
                /* poll on the array of fds */
                DBG("polling on %d fd", nb_fd + 1);
@@ -700,12 +761,17 @@ void *kconsumerd_thread_poll_fds(void *data)
                }
 
                /*
-                * if only the kconsumerd_poll_pipe triggered poll to return just
-                * return to the beginning of the loop to update the array
+                * If the kconsumerd_poll_pipe triggered poll, go
+                * directly to the beginning of the loop to update the
+                * array. We want to prioritize array updates over
+                * low-priority reads.
                 */
-               if (num_rdy == 1 && pollfd[nb_fd].revents == POLLIN) {
+               if (pollfd[nb_fd].revents == POLLIN) {
                        DBG("kconsumerd_poll_pipe wake up");
                        tmp2 = read(kconsumerd_poll_pipe[0], &tmp, 1);
+                       if (tmp2 < 0) {
+                               perror("read kconsumerd poll");
+                       }
                        continue;
                }
 
@@ -715,19 +781,16 @@ void *kconsumerd_thread_poll_fds(void *data)
                        case POLLERR:
                                ERR("Error returned in polling fd %d.", pollfd[i].fd);
                                kconsumerd_del_fd(local_kconsumerd_fd[i]);
-                               kconsumerd_update_fd_array = 1;
                                num_hup++;
                                break;
                        case POLLHUP:
                                DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
                                kconsumerd_del_fd(local_kconsumerd_fd[i]);
-                               kconsumerd_update_fd_array = 1;
                                num_hup++;
                                break;
                        case POLLNVAL:
                                ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
                                kconsumerd_del_fd(local_kconsumerd_fd[i]);
-                               kconsumerd_update_fd_array = 1;
                                num_hup++;
                                break;
                        case POLLPRI:
@@ -766,7 +829,6 @@ void *kconsumerd_thread_poll_fds(void *data)
                }
        }
 end:
-       pthread_mutex_unlock(&kconsumerd_lock_fds);
        DBG("polling thread exiting");
        if (pollfd != NULL) {
                free(pollfd);
@@ -776,18 +838,38 @@ end:
                free(local_kconsumerd_fd);
                local_kconsumerd_fd = NULL;
        }
-       kconsumerd_cleanup();
        return NULL;
 }
 
 /*
- * kconsumerd_create_poll_pipe
+ * kconsumerd_init(void)
  *
- * create the pipe to wake to polling thread when needed
+ * Initialise the necessary environment:
+ * - inform the polling thread to update the polling array
+ * - create the poll_pipe
+ * - create the should_quit pipe (for signal handler)
  */
-int kconsumerd_create_poll_pipe()
+int kconsumerd_init(void)
 {
-       return pipe(kconsumerd_poll_pipe);
+       int ret;
+
+       /* need to update the polling array at init time */
+       kconsumerd_data.need_update = 1;
+
+       ret = pipe(kconsumerd_poll_pipe);
+       if (ret < 0) {
+               perror("Error creating poll pipe");
+               goto end;
+       }
+
+       ret = pipe(kconsumerd_should_quit);
+       if (ret < 0) {
+               perror("Error creating recv pipe");
+               goto end;
+       }
+
+end:
+       return ret;
 }
 
 /*
@@ -800,6 +882,12 @@ void *kconsumerd_thread_receive_fds(void *data)
 {
        int sock, client_socket, ret;
        struct lttcomm_kconsumerd_header tmp;
+       /*
+        * Structure used to poll for incoming data on the communication
+        * socket, so the sockets themselves do not need to be blocking.
+        */
+       struct pollfd kconsumerd_sockpoll[2];
+
 
        DBG("Creating command socket %s", kconsumerd_command_sock_path);
        unlink(kconsumerd_command_sock_path);
@@ -821,13 +909,45 @@ void *kconsumerd_thread_receive_fds(void *data)
                goto end;
        }
 
+       ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
+       if (ret < 0) {
+               perror("fcntl O_NONBLOCK");
+               goto end;
+       }
+
+       /* prepare the FDs to poll: the client socket and the should_quit pipe */
+       kconsumerd_sockpoll[0].fd = kconsumerd_should_quit[0];
+       kconsumerd_sockpoll[0].events = POLLIN | POLLPRI;
+       kconsumerd_sockpoll[1].fd = client_socket;
+       kconsumerd_sockpoll[1].events = POLLIN | POLLPRI;
+
+       if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
+               goto end;
+       }
+       DBG("Connection on client_socket");
+
        /* Blocking call, waiting for transmission */
        sock = lttcomm_accept_unix_sock(client_socket);
        if (sock <= 0) {
                WARN("On accept");
                goto end;
        }
+       ret = fcntl(sock, F_SETFL, O_NONBLOCK);
+       if (ret < 0) {
+               perror("fcntl O_NONBLOCK");
+               goto end;
+       }
+
+       /* update the polling structure to poll on the established socket */
+       kconsumerd_sockpoll[1].fd = sock;
+       kconsumerd_sockpoll[1].events = POLLIN | POLLPRI;
+
        while (1) {
+               if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
+                       goto end;
+               }
+               DBG("Incoming fds on sock");
+
                /* We first get the number of fd we are about to receive */
                ret = lttcomm_recv_unix_sock(sock, &tmp,
                                sizeof(struct lttcomm_kconsumerd_header));
@@ -839,12 +959,19 @@ void *kconsumerd_thread_receive_fds(void *data)
                        DBG("Received STOP command");
                        goto end;
                }
+               if (kconsumerd_quit) {
+                       DBG("kconsumerd_thread_receive_fds received quit from signal");
+                       goto end;
+               }
+
                /* we received a command to add or update fds */
-               ret = kconsumerd_consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type);
+               ret = kconsumerd_consumerd_recv_fd(sock, kconsumerd_sockpoll,
+                               tmp.payload_size, tmp.cmd_type);
                if (ret <= 0) {
                        ERR("Receiving the FD, exiting");
                        goto end;
                }
+               DBG("received fds on sock");
        }
 
 end:
@@ -876,19 +1003,38 @@ end:
  *
  *  Cleanup the daemon's socket on exit
  */
-void kconsumerd_cleanup()
+void kconsumerd_cleanup(void)
 {
-       struct kconsumerd_fd *iter;
+       struct kconsumerd_fd *iter, *tmp;
 
        /* remove the socket file */
        unlink(kconsumerd_command_sock_path);
 
-       /* close all outfd */
-       cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
+       /*
+        * close all outfd. Called when there are no more threads
+        * running (after joining on the threads), no need to protect
+        * list iteration with mutex.
+        */
+       cds_list_for_each_entry_safe(iter, tmp, &kconsumerd_data.fd_list.head, list) {
                kconsumerd_del_fd(iter);
        }
 }
 
+/*
+ * kconsumerd_should_exit
+ *
+ * Called from signal handler.
+ */
+void kconsumerd_should_exit(void)
+{
+       int ret;
+       kconsumerd_quit = 1;
+       ret = write(kconsumerd_should_quit[1], "4", 1);
+       if (ret < 0) {
+               perror("write kconsumerd quit");
+       }
+}
+
 /*
  * kconsumerd_send_error
  *
This page took 0.030643 seconds and 4 git commands to generate.