Fix: health subsystem issues with shared code
diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c
index c0cfddb96166840bfe3effdfe0e7dcf36566a7d6..d999928feb8a0f06bfe24857fc3182f56c4f903f 100644
--- a/src/bin/lttng-sessiond/main.c
+++ b/src/bin/lttng-sessiond/main.c
@@ -21,7 +21,6 @@
 #include <grp.h>
 #include <limits.h>
 #include <pthread.h>
-#include <semaphore.h>
 #include <signal.h>
 #include <stdio.h>
 #include <stdlib.h>
@@ -60,8 +59,8 @@
 #include "ust-consumer.h"
 #include "utils.h"
 #include "fd-limit.h"
-#include "filter.h"
 #include "health.h"
+#include "testpoint.h"
 
 #define CONSUMERD_FILE "lttng-consumerd"
 
@@ -73,6 +72,7 @@ const char default_global_apps_pipe[] = DEFAULT_GLOBAL_APPS_PIPE;
 
 const char *progname;
 const char *opt_tracing_group;
+static const char *opt_pidfile;
 static int opt_sig_parent;
 static int opt_verbose_consumer;
 static int opt_daemon;
@@ -81,7 +81,10 @@ static int is_root;                  /* Set to 1 if the daemon is running as root */
 static pid_t ppid;          /* Parent PID for --sig-parent option */
 static char *rundir;
 
-/* Consumer daemon specific control data */
+/*
+ * Consumer daemon specific control data. Every value not initialized here is
+ * set to 0 by the static definition.
+ */
 static struct consumer_data kconsumer_data = {
        .type = LTTNG_CONSUMER_KERNEL,
        .err_unix_sock_path = DEFAULT_KCONSUMERD_ERR_SOCK_PATH,
@@ -90,6 +93,8 @@ static struct consumer_data kconsumer_data = {
        .cmd_sock = -1,
        .pid_mutex = PTHREAD_MUTEX_INITIALIZER,
        .lock = PTHREAD_MUTEX_INITIALIZER,
+       .cond = PTHREAD_COND_INITIALIZER,
+       .cond_mutex = PTHREAD_MUTEX_INITIALIZER,
 };
 static struct consumer_data ustconsumer64_data = {
        .type = LTTNG_CONSUMER64_UST,
@@ -99,6 +104,8 @@ static struct consumer_data ustconsumer64_data = {
        .cmd_sock = -1,
        .pid_mutex = PTHREAD_MUTEX_INITIALIZER,
        .lock = PTHREAD_MUTEX_INITIALIZER,
+       .cond = PTHREAD_COND_INITIALIZER,
+       .cond_mutex = PTHREAD_MUTEX_INITIALIZER,
 };
 static struct consumer_data ustconsumer32_data = {
        .type = LTTNG_CONSUMER32_UST,
@@ -108,6 +115,8 @@ static struct consumer_data ustconsumer32_data = {
        .cmd_sock = -1,
        .pid_mutex = PTHREAD_MUTEX_INITIALIZER,
        .lock = PTHREAD_MUTEX_INITIALIZER,
+       .cond = PTHREAD_COND_INITIALIZER,
+       .cond_mutex = PTHREAD_MUTEX_INITIALIZER,
 };
 
 /* Shared between threads */
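
As the comment on kconsumer_data above notes, members omitted from a designated initializer of an object with static storage are zero-initialized by the C standard. A minimal illustration (struct and field names hypothetical):

struct demo {
	int a;
	int b;
	void *p;
};

/* Only .a is named: b and p are guaranteed to start at 0 and NULL. */
static struct demo d = { .a = 1 };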
@@ -219,6 +228,11 @@ struct health_state health_thread_app_manage;
 struct health_state health_thread_app_reg;
 struct health_state health_thread_kernel;
 
+/*
+ * Socket timeout for receiving and sending in seconds.
+ */
+static int app_socket_timeout;
+
 static
 void setup_consumerd_path(void)
 {
@@ -383,7 +397,7 @@ static void stop_threads(void)
 static void cleanup(void)
 {
        int ret;
-       char *cmd;
+       char *cmd = NULL;
        struct ltt_session *sess, *stmp;
 
        DBG("Cleaning up");
@@ -391,6 +405,17 @@ static void cleanup(void)
        /* First thing first, stop all threads */
        utils_close_pipe(thread_quit_pipe);
 
+       /*
+        * If opt_pidfile is undefined, the default file will be wiped when
+        * removing the rundir.
+        */
+       if (opt_pidfile) {
+               ret = remove(opt_pidfile);
+               if (ret < 0) {
+                       PERROR("remove pidfile %s", opt_pidfile);
+               }
+       }
+
        DBG("Removing %s directory", rundir);
        ret = asprintf(&cmd, "rm -rf %s", rundir);
        if (ret < 0) {
@@ -403,6 +428,7 @@ static void cleanup(void)
                ERR("Unable to clean %s", rundir);
        }
        free(cmd);
+       free(rundir);
 
        DBG("Cleaning up all sessions");
 
@@ -432,9 +458,6 @@ static void cleanup(void)
                modprobe_remove_lttng_all();
        }
 
-       utils_close_pipe(kernel_poll_pipe);
-       utils_close_pipe(apps_cmd_pipe);
-
        /* <fun> */
        DBG("%c[%d;%dm*** assert failed :-) *** ==> %c[%dm%c[%d;%dm"
                        "Matthew, BEET driven development works!%c[%dm",
@@ -450,7 +473,7 @@ static void cleanup(void)
 static int send_unix_sock(int sock, void *buf, size_t len)
 {
        /* Check valid length */
-       if (len <= 0) {
+       if (len == 0) {
                return -1;
        }
 
@@ -615,20 +638,22 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd)
                                        struct lttng_ht_iter iter;
                                        struct consumer_socket *socket;
 
-
+                                       rcu_read_lock();
                                        cds_lfht_for_each_entry(ksess->consumer->socks->ht,
                                                        &iter.iter, socket, node.node) {
                                                /* Code flow error */
                                                assert(socket->fd >= 0);
 
                                                pthread_mutex_lock(socket->lock);
-                                               ret = kernel_consumer_send_channel_stream(socket->fd,
+                                               ret = kernel_consumer_send_channel_stream(socket,
                                                                channel, ksess);
                                                pthread_mutex_unlock(socket->lock);
                                                if (ret < 0) {
+                                                       rcu_read_unlock();
                                                        goto error;
                                                }
                                        }
+                                       rcu_read_unlock();
                                }
                                goto error;
                        }
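
The hunk above brackets the hash table walk with rcu_read_lock()/rcu_read_unlock(): a userspace-RCU lock-free hash table may only be traversed inside a read-side critical section, and every exit path, including the early goto on error, must unlock first. A minimal sketch of the pattern, assuming the liburcu rculfhash API:

#include <urcu.h>
#include <urcu/rculfhash.h>

static void visit_all(struct cds_lfht *ht)
{
	struct cds_lfht_iter iter;
	struct cds_lfht_node *node;

	rcu_read_lock();
	cds_lfht_for_each(ht, &iter, node) {
		/* node is guaranteed valid until rcu_read_unlock(). */
	}
	rcu_read_unlock();
}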
@@ -678,30 +703,44 @@ static void *thread_manage_kernel(void *data)
        char tmp;
        struct lttng_poll_event events;
 
-       DBG("Thread manage kernel started");
+       DBG("[thread] Thread manage kernel started");
 
-       health_code_update(&health_thread_kernel);
+       health_register(HEALTH_TYPE_KERNEL);
 
-       ret = create_thread_poll_set(&events, 2);
-       if (ret < 0) {
-               goto error_poll_create;
+       /*
+        * The first step of the while loop below cleans this structure, which
+        * could free non-NULL pointers, so zero it before entering the loop.
+        */
+       memset(&events, 0, sizeof(events));
+
+       if (testpoint(thread_manage_kernel)) {
+               goto error_testpoint;
        }
 
-       ret = lttng_poll_add(&events, kernel_poll_pipe[0], LPOLLIN);
-       if (ret < 0) {
-               goto error;
+       health_code_update(&health_thread_kernel);
+
+       if (testpoint(thread_manage_kernel_before_loop)) {
+               goto error_testpoint;
        }
 
        while (1) {
                health_code_update(&health_thread_kernel);
 
                if (update_poll_flag == 1) {
-                       /*
-                        * Reset number of fd in the poll set. Always 2 since there is the thread
-                        * quit pipe and the kernel pipe.
-                        */
-                       events.nb_fd = 2;
+                       /* Clean events object. We are about to populate it again. */
+                       lttng_poll_clean(&events);
+
+                       ret = create_thread_poll_set(&events, 2);
+                       if (ret < 0) {
+                               goto error_poll_create;
+                       }
 
+                       ret = lttng_poll_add(&events, kernel_poll_pipe[0], LPOLLIN);
+                       if (ret < 0) {
+                               goto error;
+                       }
+
+                       /* This will add the available kernel channel if any. */
                        ret = update_kernel_poll(&events);
                        if (ret < 0) {
                                goto error;
@@ -709,12 +748,7 @@ static void *thread_manage_kernel(void *data)
                        update_poll_flag = 0;
                }
 
-               nb_fd = LTTNG_POLL_GETNB(&events);
-
-               DBG("Thread kernel polling on %d fds", nb_fd);
-
-               /* Zeroed the poll events */
-               lttng_poll_reset(&events);
+               DBG("Thread kernel polling on %d fds", LTTNG_POLL_GETNB(&events));
 
                /* Poll infinite value of time */
        restart:
@@ -736,6 +770,8 @@ static void *thread_manage_kernel(void *data)
                        continue;
                }
 
+               nb_fd = ret;
+
                for (i = 0; i < nb_fd; i++) {
                        /* Fetch once the poll data */
                        revents = LTTNG_POLL_GETEV(&events, i);
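
Several hunks in this patch replace nb_fd = LTTNG_POLL_GETNB(&events) with nb_fd = ret: the number of ready descriptors is the wait's return value, not the size of the poll set. A sketch with raw epoll, assuming the lttng_poll_* compat layer is epoll-backed as it is on Linux:

#include <sys/epoll.h>

/* epoll_wait() returns how many entries of events[] were filled in;
 * that count, not the capacity, bounds the dispatch loop. */
static int wait_and_dispatch(int epfd, struct epoll_event *events, int maxev)
{
	int i, nb_fd;

	nb_fd = epoll_wait(epfd, events, maxev, -1);
	if (nb_fd < 0) {
		return -1;
	}
	for (i = 0; i < nb_fd; i++) {
		/* events[i].data.fd is ready; handle it here. */
	}
	return 0;
}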
@@ -752,7 +788,13 @@ static void *thread_manage_kernel(void *data)
 
                        /* Check for data on kernel pipe */
                        if (pollfd == kernel_poll_pipe[0] && (revents & LPOLLIN)) {
-                               ret = read(kernel_poll_pipe[0], &tmp, 1);
+                               do {
+                                       ret = read(kernel_poll_pipe[0], &tmp, 1);
+                               } while (ret < 0 && errno == EINTR);
+                               /*
+                                * The return value does not matter here; any
+                                * action on this pipe means an update is
+                                * required anyway.
+                                */
                                update_poll_flag = 1;
                                continue;
                        } else {
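
The restart-on-EINTR idiom added above recurs throughout this patch; isolated into a self-contained helper (hypothetical name), it reads:

#include <errno.h>
#include <unistd.h>

/* Restart a read() interrupted by a signal before any byte was
 * transferred; every other outcome is returned unchanged. */
static ssize_t read_no_eintr(int fd, void *buf, size_t count)
{
	ssize_t ret;

	do {
		ret = read(fd, buf, count);
	} while (ret < 0 && errno == EINTR);

	return ret;
}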
@@ -779,15 +821,43 @@ exit:
 error:
        lttng_poll_clean(&events);
 error_poll_create:
+error_testpoint:
+       utils_close_pipe(kernel_poll_pipe);
+       kernel_poll_pipe[0] = kernel_poll_pipe[1] = -1;
        if (err) {
                health_error(&health_thread_kernel);
                ERR("Health error occurred in %s", __func__);
+               WARN("Kernel thread died unexpectedly. "
+                               "Kernel tracing can continue but CPU hotplug is disabled.");
        }
-       health_exit(&health_thread_kernel);
+       health_unregister();
        DBG("Kernel thread dying");
        return NULL;
 }
 
+/*
+ * Signal the pthread condition of the consumer data to notify waiters of a
+ * change in the thread state.
+ */
+static void signal_consumer_condition(struct consumer_data *data, int state)
+{
+       pthread_mutex_lock(&data->cond_mutex);
+
+       /*
+        * The state is set before signaling. It can be any value; it is the
+        * waiter's job to correctly interpret the state associated with the
+        * consumer pthread_cond.
+        *
+        * A value of 0 means that the corresponding thread of the consumer data
+        * was not started. 1 indicates that the thread has started and is ready
+        * for action. A negative value means that there was an error during the
+        * thread bootstrap.
+        */
+       data->consumer_thread_is_ready = state;
+       (void) pthread_cond_signal(&data->cond);
+
+       pthread_mutex_unlock(&data->cond_mutex);
+}
+
 /*
 * This thread manages the consumer error sent back to the session daemon.
  */
@@ -801,12 +871,26 @@ static void *thread_manage_consumer(void *data)
 
        DBG("[thread] Manage consumer started");
 
-       health_code_update(&consumer_data->health);
+       health_register(HEALTH_TYPE_CONSUMER);
 
-       ret = lttcomm_listen_unix_sock(consumer_data->err_sock);
-       if (ret < 0) {
-               goto error_listen;
-       }
+       /*
+        * Since the consumer thread can be spawned at any moment in time, we
+        * initialize the health to a poll status (1, an odd value, which is a
+        * valid health state over time). When the thread starts, we update the
+        * health here to a "code" path, an even value, so that this thread,
+        * when reaching a poll wait, does not sit on an even value and trigger
+        * an error.
+        *
+        * Here is the use case we avoid.
+        *
+        * +1: the first poll update during initialization (main())
+        * +2 * x: multiple code update once in this thread.
+        * +1: poll wait in this thread (being a good health state).
+        * == even number which after the wait period shows as a bad health.
+        *
+        * In a nutshell, the poll update below brings the health state back to
+        * an even value, meaning a code path (see the sketch after this hunk).
+        */
+       health_poll_update(&consumer_data->health);
 
        /*
         * Pass 2 as size here for the thread quit pipe and kconsumerd_err_sock.
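
A loose sketch of the even/odd counter scheme described above; this is an illustration of the idea, not the real health.c code: every update increments a per-thread counter, odd parity marks a legitimate poll wait, and the checker only flags a thread whose counter is both unchanged and even (stuck outside a poll).

#include <urcu/uatomic.h>

struct health_sketch {
	unsigned long counter;
};

static void sketch_update(struct health_sketch *s)
{
	uatomic_inc(&s->counter);
}

/* Healthy if the counter moved since the last check, or if the thread
 * is parked in a poll wait (odd value). */
static int sketch_check(struct health_sketch *s, unsigned long *last)
{
	unsigned long now = uatomic_read(&s->counter);
	int ok = (now != *last) || (now & 1);

	*last = now;
	return ok;
}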
@@ -817,18 +901,26 @@ static void *thread_manage_consumer(void *data)
                goto error_poll;
        }
 
+       /*
+        * The error socket here is already in a listening state which was done
+        * just before spawning this thread to avoid a race between the consumer
+        * daemon exec trying to connect and the listen() call.
+        */
        ret = lttng_poll_add(&events, consumer_data->err_sock, LPOLLIN | LPOLLRDHUP);
        if (ret < 0) {
                goto error;
        }
 
-       nb_fd = LTTNG_POLL_GETNB(&events);
-
        health_code_update(&consumer_data->health);
 
        /* Infinite blocking call, waiting for transmission */
 restart:
        health_poll_update(&consumer_data->health);
+
+       if (testpoint(thread_manage_consumer)) {
+               goto error;
+       }
+
        ret = lttng_poll_wait(&events, -1);
        health_poll_update(&consumer_data->health);
        if (ret < 0) {
@@ -841,6 +933,8 @@ restart:
                goto error;
        }
 
+       nb_fd = ret;
+
        for (i = 0; i < nb_fd; i++) {
                /* Fetch once the poll data */
                revents = LTTNG_POLL_GETEV(&events, i);
@@ -869,6 +963,12 @@ restart:
                goto error;
        }
 
+       /*
+        * Set the CLOEXEC flag. Return code is useless because either way, the
+        * show must go on.
+        */
+       (void) utils_set_fd_cloexec(sock);
+
        health_code_update(&consumer_data->health);
 
        DBG2("Receiving code from consumer err_sock");
@@ -882,17 +982,17 @@ restart:
 
        health_code_update(&consumer_data->health);
 
-       if (code == CONSUMERD_COMMAND_SOCK_READY) {
+       if (code == LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) {
                consumer_data->cmd_sock =
                        lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path);
                if (consumer_data->cmd_sock < 0) {
-                       sem_post(&consumer_data->sem);
+                       /* On error, signal condition and quit. */
+                       signal_consumer_condition(consumer_data, -1);
                        PERROR("consumer connect");
                        goto error;
                }
-               /* Signal condition to tell that the kconsumerd is ready */
-               sem_post(&consumer_data->sem);
-               DBG("consumer command socket ready");
+               signal_consumer_condition(consumer_data, 1);
+               DBG("Consumer command socket ready");
        } else {
                ERR("consumer error when waiting for SOCK_READY : %s",
                                lttcomm_get_readable_code(-code));
@@ -912,9 +1012,6 @@ restart:
 
        health_code_update(&consumer_data->health);
 
-       /* Update number of fd */
-       nb_fd = LTTNG_POLL_GETNB(&events);
-
        /* Infinite blocking call, waiting for transmission */
 restart_poll:
        health_poll_update(&consumer_data->health);
@@ -930,6 +1027,8 @@ restart_poll:
                goto error;
        }
 
+       nb_fd = ret;
+
        for (i = 0; i < nb_fd; i++) {
                /* Fetch once the poll data */
                revents = LTTNG_POLL_GETEV(&events, i);
@@ -1003,12 +1102,11 @@ error:
 
        lttng_poll_clean(&events);
 error_poll:
-error_listen:
        if (err) {
                health_error(&consumer_data->health);
                ERR("Health error occurred in %s", __func__);
        }
-       health_exit(&consumer_data->health);
+       health_unregister();
        DBG("consumer thread cleanup completed");
 
        return NULL;
@@ -1029,6 +1127,12 @@ static void *thread_manage_apps(void *data)
        rcu_register_thread();
        rcu_thread_online();
 
+       health_register(HEALTH_TYPE_APP_MANAGE);
+
+       if (testpoint(thread_manage_apps)) {
+               goto error_testpoint;
+       }
+
        health_code_update(&health_thread_app_manage);
 
        ret = create_thread_poll_set(&events, 2);
@@ -1041,15 +1145,14 @@ static void *thread_manage_apps(void *data)
                goto error;
        }
 
+       if (testpoint(thread_manage_apps_before_loop)) {
+               goto error;
+       }
+
        health_code_update(&health_thread_app_manage);
 
        while (1) {
-               /* Zeroed the events structure */
-               lttng_poll_reset(&events);
-
-               nb_fd = LTTNG_POLL_GETNB(&events);
-
-               DBG("Apps thread polling on %d fds", nb_fd);
+               DBG("Apps thread polling on %d fds", LTTNG_POLL_GETNB(&events));
 
                /* Infinite blocking call, waiting for transmission */
        restart:
@@ -1066,6 +1169,8 @@ static void *thread_manage_apps(void *data)
                        goto error;
                }
 
+               nb_fd = ret;
+
                for (i = 0; i < nb_fd; i++) {
                        /* Fetch once the poll data */
                        revents = LTTNG_POLL_GETEV(&events, i);
@@ -1087,7 +1192,9 @@ static void *thread_manage_apps(void *data)
                                        goto error;
                                } else if (revents & LPOLLIN) {
                                        /* Empty pipe */
-                                       ret = read(apps_cmd_pipe[0], &ust_cmd, sizeof(ust_cmd));
+                                       do {
+                                               ret = read(apps_cmd_pipe[0], &ust_cmd, sizeof(ust_cmd));
+                                       } while (ret < 0 && errno == EINTR);
                                        if (ret < 0 || ret < sizeof(ust_cmd)) {
                                                PERROR("read apps cmd pipe");
                                                goto error;
@@ -1129,16 +1236,22 @@ static void *thread_manage_apps(void *data)
                                                ust_app_unregister(ust_cmd.sock);
                                        } else {
                                                /*
-                                                * We just need here to monitor the close of the UST
-                                                * socket and poll set monitor those by default.
-                                                * Listen on POLLIN (even if we never expect any
-                                                * data) to ensure that hangup wakes us.
+                                                * We only monitor the error events of the socket. This
+                                                * thread does not handle any incoming data from UST
+                                                * (POLLIN).
                                                 */
-                                               ret = lttng_poll_add(&events, ust_cmd.sock, LPOLLIN);
+                                               ret = lttng_poll_add(&events, ust_cmd.sock,
+                                                               LPOLLERR | LPOLLHUP | LPOLLRDHUP);
                                                if (ret < 0) {
                                                        goto error;
                                                }
 
+                                               /* Set socket timeout for both receiving and sending */
+                                               (void) lttcomm_setsockopt_rcv_timeout(ust_cmd.sock,
+                                                               app_socket_timeout);
+                                               (void) lttcomm_setsockopt_snd_timeout(ust_cmd.sock,
+                                                               app_socket_timeout);
+
                                                DBG("Apps with sock %d added to poll set",
                                                                ust_cmd.sock);
                                        }
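
lttcomm_setsockopt_rcv_timeout()/lttcomm_setsockopt_snd_timeout() are defined elsewhere in the tree; assuming they wrap the usual SO_RCVTIMEO/SO_SNDTIMEO options, the conventional shape of such a setter is:

#include <sys/socket.h>
#include <sys/time.h>

/* optname is SO_RCVTIMEO or SO_SNDTIMEO; a blocked recv()/send() on
 * the socket then fails with EAGAIN instead of hanging forever. */
static int set_sock_timeout(int sock, int optname, time_t seconds)
{
	struct timeval tv = { .tv_sec = seconds, .tv_usec = 0 };

	return setsockopt(sock, SOL_SOCKET, optname, &tv, sizeof(tv));
}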
@@ -1173,11 +1286,21 @@ exit:
 error:
        lttng_poll_clean(&events);
 error_poll_create:
+error_testpoint:
+       utils_close_pipe(apps_cmd_pipe);
+       apps_cmd_pipe[0] = apps_cmd_pipe[1] = -1;
+
+       /*
+        * We don't clean the UST app hash table here since already registered
+        * applications can still be controlled, so let them be until the
+        * session daemon dies or the applications stop.
+        */
+
        if (err) {
                health_error(&health_thread_app_manage);
                ERR("Health error occurred in %s", __func__);
        }
-       health_exit(&health_thread_app_manage);
+       health_unregister();
        DBG("Application communication apps thread cleanup complete");
        rcu_thread_offline();
        rcu_unregister_thread();
@@ -1222,18 +1345,28 @@ static void *thread_dispatch_ust_registration(void *data)
                         * call is blocking so we can be assured that the data will be read
                         * at some point in time or wait to the end of the world :)
                         */
-                       ret = write(apps_cmd_pipe[1], ust_cmd,
-                                       sizeof(struct ust_command));
-                       if (ret < 0) {
-                               PERROR("write apps cmd pipe");
-                               if (errno == EBADF) {
-                                       /*
-                                        * We can't inform the application thread to process
-                                        * registration. We will exit or else application
-                                        * registration will not occur and tracing will never
-                                        * start.
-                                        */
-                                       goto error;
+                       if (apps_cmd_pipe[1] >= 0) {
+                               do {
+                                       ret = write(apps_cmd_pipe[1], ust_cmd,
+                                                       sizeof(struct ust_command));
+                               } while (ret < 0 && errno == EINTR);
+                               if (ret < 0 || ret != sizeof(struct ust_command)) {
+                                       PERROR("write apps cmd pipe");
+                                       if (errno == EBADF) {
+                                               /*
+                                                * We can't inform the application thread to process
+                                                * registration. We will exit or else application
+                                                * registration will not occur and tracing will never
+                                                * start.
+                                                */
+                                               goto error;
+                                       }
+                               }
+                       } else {
+                               /* Application manager thread is not available. */
+                               ret = close(ust_cmd->sock);
+                               if (ret < 0) {
+                                       PERROR("close ust_cmd sock");
                                }
                        }
                        free(ust_cmd);
@@ -1264,6 +1397,12 @@ static void *thread_registration_apps(void *data)
 
        DBG("[thread] Manage application registration started");
 
+       health_register(HEALTH_TYPE_APP_REG);
+
+       if (testpoint(thread_registration_apps)) {
+               goto error_testpoint;
+       }
+
        ret = lttcomm_listen_unix_sock(apps_sock);
        if (ret < 0) {
                goto error_listen;
@@ -1295,8 +1434,6 @@ static void *thread_registration_apps(void *data)
        while (1) {
                DBG("Accepting application registration");
 
-               nb_fd = LTTNG_POLL_GETNB(&events);
-
                /* Infinite blocking call, waiting for transmission */
        restart:
                health_poll_update(&health_thread_app_reg);
@@ -1312,6 +1449,8 @@ static void *thread_registration_apps(void *data)
                        goto error;
                }
 
+               nb_fd = ret;
+
                for (i = 0; i < nb_fd; i++) {
                        health_code_update(&health_thread_app_reg);
 
@@ -1337,6 +1476,12 @@ static void *thread_registration_apps(void *data)
                                                goto error;
                                        }
 
+                                       /*
+                                        * Set the CLOEXEC flag. Return code is useless because
+                                        * either way, the show must go on.
+                                        */
+                                       (void) utils_set_fd_cloexec(sock);
+
                                        /* Create UST registration command for enqueuing */
                                        ust_cmd = zmalloc(sizeof(struct ust_command));
                                        if (ust_cmd == NULL) {
@@ -1411,7 +1556,6 @@ error:
                health_error(&health_thread_app_reg);
                ERR("Health error occurred in %s", __func__);
        }
-       health_exit(&health_thread_app_reg);
 
        /* Notify that the registration thread is gone */
        notify_ust_apps(0);
@@ -1435,7 +1579,9 @@ error_poll_add:
        lttng_poll_clean(&events);
 error_listen:
 error_create_poll:
+error_testpoint:
        DBG("UST Registration thread cleanup complete");
+       health_unregister();
 
        return NULL;
 }
@@ -1446,59 +1592,110 @@ error_create_poll:
  */
 static int spawn_consumer_thread(struct consumer_data *consumer_data)
 {
-       int ret;
+       int ret, clock_ret;
        struct timespec timeout;
 
-       timeout.tv_sec = DEFAULT_SEM_WAIT_TIMEOUT;
-       timeout.tv_nsec = 0;
+       /* Make sure we set the readiness flag to 0 because we are NOT ready */
+       consumer_data->consumer_thread_is_ready = 0;
 
-       /* Setup semaphore */
-       ret = sem_init(&consumer_data->sem, 0, 0);
-       if (ret < 0) {
-               PERROR("sem_init consumer semaphore");
+       /* Setup pthread condition */
+       ret = pthread_condattr_init(&consumer_data->condattr);
+       if (ret != 0) {
+               errno = ret;
+               PERROR("pthread_condattr_init consumer data");
                goto error;
        }
 
-       ret = pthread_create(&consumer_data->thread, NULL,
-                       thread_manage_consumer, consumer_data);
+       /*
+        * Set the monotonic clock in order to make sure we DO NOT jump in time
+        * between the clock_gettime() call and the timedwait call. See bug #324
+        * for more details on how we noticed it.
+        */
+       ret = pthread_condattr_setclock(&consumer_data->condattr, CLOCK_MONOTONIC);
+       if (ret != 0) {
+               errno = ret;
+               PERROR("pthread_condattr_setclock consumer data");
+               goto error;
+       }
+
+       ret = pthread_cond_init(&consumer_data->cond, &consumer_data->condattr);
+       if (ret != 0) {
+               errno = ret;
+               PERROR("pthread_cond_init consumer data");
+               goto error;
+       }
+
+       ret = pthread_create(&consumer_data->thread, NULL, thread_manage_consumer,
+                       consumer_data);
        if (ret != 0) {
                PERROR("pthread_create consumer");
                ret = -1;
                goto error;
        }
 
+       /* We are about to wait on a pthread condition */
+       pthread_mutex_lock(&consumer_data->cond_mutex);
+
        /* Get time for the pthread_cond_timedwait() absolute timeout */
-       ret = clock_gettime(CLOCK_REALTIME, &timeout);
-       if (ret < 0) {
-               PERROR("clock_gettime spawn consumer");
-               /* Infinite wait for the kconsumerd thread to be ready */
-               ret = sem_wait(&consumer_data->sem);
-       } else {
-               /* Normal timeout if the gettime was successful */
-               timeout.tv_sec += DEFAULT_SEM_WAIT_TIMEOUT;
-               ret = sem_timedwait(&consumer_data->sem, &timeout);
+       clock_ret = clock_gettime(CLOCK_MONOTONIC, &timeout);
+       /*
+        * Set the timeout for the condition timed wait even if the clock_gettime()
+        * call fails, since we might loop on that call and we want to avoid
+        * incrementing the timeout too many times.
+        */
+       timeout.tv_sec += DEFAULT_SEM_WAIT_TIMEOUT;
+
+       /*
+        * The following loop COULD be skipped in some conditions, so set ret to
+        * 0 beforehand to make sure the ETIMEDOUT check cannot spuriously skip
+        * the first round of the loop.
+        */
+       ret = 0;
+
+       /*
+        * Loop until the condition is reached or when a timeout is reached. Note
+        * that the pthread_cond_timedwait(P) man page specifies that EINTR can NOT
+        * be returned but the pthread_cond(3), from the glibc-doc, says that it is
+        * possible. This loop does not take any chances and works with both of
+        * them.
+        */
+       while (!consumer_data->consumer_thread_is_ready && ret != ETIMEDOUT) {
+               if (clock_ret < 0) {
+                       PERROR("clock_gettime spawn consumer");
+                       /* Infinite wait for the consumerd thread to be ready */
+                       ret = pthread_cond_wait(&consumer_data->cond,
+                                       &consumer_data->cond_mutex);
+               } else {
+                       ret = pthread_cond_timedwait(&consumer_data->cond,
+                                       &consumer_data->cond_mutex, &timeout);
+               }
        }
 
-       if (ret < 0) {
-               if (errno == ETIMEDOUT) {
+       /* Release the pthread condition */
+       pthread_mutex_unlock(&consumer_data->cond_mutex);
+
+       if (ret != 0) {
+               errno = ret;
+               if (ret == ETIMEDOUT) {
                        /*
                         * Call has timed out so we kill the kconsumerd_thread and return
                         * an error.
                         */
-                       ERR("The consumer thread was never ready. Killing it");
+                       ERR("Condition timed out. The consumer thread was never ready."
+                                       " Killing it");
                        ret = pthread_cancel(consumer_data->thread);
                        if (ret < 0) {
                                PERROR("pthread_cancel consumer thread");
                        }
                } else {
-                       PERROR("semaphore wait failed consumer thread");
+                       PERROR("pthread_cond_wait failed consumer thread");
                }
                goto error;
        }
 
        pthread_mutex_lock(&consumer_data->pid_mutex);
        if (consumer_data->pid == 0) {
-               ERR("Kconsumerd did not start");
+               ERR("Consumerd did not start");
                pthread_mutex_unlock(&consumer_data->pid_mutex);
                goto error;
        }
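
The semaphore-to-condvar conversion above is the core of this patch. Condensed into a self-contained signaler/waiter pair (names hypothetical; in the patch the condvar is initialized before the thread is spawned, which is what makes the signal safe):

#include <errno.h>
#include <pthread.h>
#include <time.h>

static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond;
static int thread_is_ready;

/* Signaler: publish the state under the mutex, then signal. */
static void announce(int state)
{
	pthread_mutex_lock(&mtx);
	thread_is_ready = state;
	(void) pthread_cond_signal(&cond);
	pthread_mutex_unlock(&mtx);
}

/* Waiter: bind the condvar to CLOCK_MONOTONIC so the timeout cannot
 * be stretched or shrunk by wall-clock (NTP, date(1)) adjustments. */
static int wait_ready(time_t sec)
{
	int ret = 0;
	struct timespec ts;
	pthread_condattr_t attr;

	pthread_condattr_init(&attr);
	pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
	pthread_cond_init(&cond, &attr);

	clock_gettime(CLOCK_MONOTONIC, &ts);
	ts.tv_sec += sec;

	pthread_mutex_lock(&mtx);
	while (!thread_is_ready && ret != ETIMEDOUT) {
		ret = pthread_cond_timedwait(&cond, &mtx, &ts);
	}
	pthread_mutex_unlock(&mtx);

	return thread_is_ready > 0 ? 0 : -1;
}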
@@ -1516,10 +1713,10 @@ error:
 static int join_consumer_thread(struct consumer_data *consumer_data)
 {
        void *status;
-       int ret;
 
        /* Consumer pid must be a real one. */
        if (consumer_data->pid > 0) {
+               int ret;
                ret = kill(consumer_data->pid, SIGTERM);
                if (ret) {
                        ERR("Error killing consumer daemon");
@@ -1700,6 +1897,16 @@ static int start_consumerd(struct consumer_data *consumer_data)
 {
        int ret;
 
+       /*
+        * Set the listen() state on the socket since there is a possible race
+        * between the exec() of the consumer daemon and this call if placed in
+        * the consumer thread. See bug #366 for more details.
+        * consumer thread. See bug #366 for more details.
+        */
+       ret = lttcomm_listen_unix_sock(consumer_data->err_sock);
+       if (ret < 0) {
+               goto error;
+       }
+
        pthread_mutex_lock(&consumer_data->pid_mutex);
        if (consumer_data->pid != 0) {
                pthread_mutex_unlock(&consumer_data->pid_mutex);
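
The listen()-before-spawn ordering closes the race from bug #366: if the freshly exec'd consumerd connect()s before the monitoring thread reaches listen(), the connection is refused. A stripped-down sketch of the safe ordering (error handling elided; err_sock is assumed to be an already bound AF_UNIX socket):

#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

static int start_daemon(int err_sock)
{
	pid_t pid;

	/* Listening before fork()/exec() guarantees the child's
	 * connect() finds a backlog waiting for it. */
	if (listen(err_sock, 10) < 0) {
		return -1;
	}

	pid = fork();
	if (pid == 0) {
		execlp("lttng-consumerd", "lttng-consumerd", (char *) NULL);
		_exit(127);	/* only reached if exec failed */
	}

	return pid < 0 ? -1 : 0;
}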
@@ -1729,6 +1936,15 @@ end:
        return 0;
 
 error:
+       /* Cleanup already created socket on error. */
+       if (consumer_data->err_sock >= 0) {
+               int err;
+
+               err = close(consumer_data->err_sock);
+               if (err < 0) {
+                       PERROR("close consumer data error socket");
+               }
+       }
        return ret;
 }
 
@@ -1740,9 +1956,7 @@ static int check_consumer_health(void)
 {
        int ret;
 
-       ret = health_check_state(&kconsumer_data.health) &&
-               health_check_state(&ustconsumer32_data.health) &&
-               health_check_state(&ustconsumer64_data.health);
+       ret = health_check_state(HEALTH_TYPE_CONSUMER);
 
        DBG3("Health consumer check %d", ret);
 
@@ -1791,7 +2005,7 @@ error_version:
                PERROR("close");
        }
        kernel_tracer_fd = -1;
-       return LTTCOMM_KERN_VERSION;
+       return LTTNG_ERR_KERN_VERSION;
 
 error_modules:
        ret = close(kernel_tracer_fd);
@@ -1806,9 +2020,9 @@ error:
        WARN("No kernel tracer available");
        kernel_tracer_fd = -1;
        if (!is_root) {
-               return LTTCOMM_NEED_ROOT_SESSIOND;
+               return LTTNG_ERR_NEED_ROOT_SESSIOND;
        } else {
-               return LTTCOMM_KERN_NA;
+               return LTTNG_ERR_KERN_NA;
        }
 }
 
@@ -1830,6 +2044,15 @@ static int copy_session_consumer(int domain, struct ltt_session *session)
        switch (domain) {
        case LTTNG_DOMAIN_KERNEL:
                DBG3("Copying tracing session consumer output in kernel session");
+               /*
+                * XXX: We should audit session creation and the "extra" work
+                * this function does in order to avoid this destroy, since this
+                * function is only used by domain session creation (kernel and
+                * UST). The same applies to the UST domain below.
+                */
+               if (session->kernel_session->consumer) {
+                       consumer_destroy_output(session->kernel_session->consumer);
+               }
                session->kernel_session->consumer =
                        consumer_copy_output(session->consumer);
                /* Ease our life a bit for the next part */
@@ -1838,6 +2061,9 @@ static int copy_session_consumer(int domain, struct ltt_session *session)
                break;
        case LTTNG_DOMAIN_UST:
                DBG3("Copying tracing session consumer output in UST session");
+               if (session->ust_session->consumer) {
+                       consumer_destroy_output(session->ust_session->consumer);
+               }
                session->ust_session->consumer =
                        consumer_copy_output(session->consumer);
                /* Ease our life a bit for the next part */
@@ -1845,21 +2071,16 @@ static int copy_session_consumer(int domain, struct ltt_session *session)
                dir_name = DEFAULT_UST_TRACE_DIR;
                break;
        default:
-               ret = LTTCOMM_UNKNOWN_DOMAIN;
-               goto error;
-       }
-
-       ret = consumer_set_subdir(session->consumer, session->name);
-       if (ret < 0) {
-               ret = LTTCOMM_FATAL;
+               ret = LTTNG_ERR_UNKNOWN_DOMAIN;
                goto error;
        }
 
        /* Append correct directory to subdir */
-       strncat(consumer->subdir, dir_name, sizeof(consumer->subdir));
+       strncat(consumer->subdir, dir_name,
+                       sizeof(consumer->subdir) - strlen(consumer->subdir) - 1);
        DBG3("Copy session consumer subdir %s", consumer->subdir);
 
-       ret = LTTCOMM_OK;
+       ret = LTTNG_OK;
 
 error:
        return ret;
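
The strncat() bound above is the textbook-correct one: the third argument is the room left in the destination, excluding the terminating NUL, not the destination's total size. Illustrated on a hypothetical struct:

#include <string.h>

struct output {
	char subdir[64];
};

/* strncat() appends at most n bytes and always NUL-terminates, so the
 * safe bound is sizeof(buffer) - strlen(buffer) - 1. */
static void append_subdir(struct output *o, const char *dir_name)
{
	strncat(o->subdir, dir_name,
			sizeof(o->subdir) - strlen(o->subdir) - 1);
}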
@@ -1883,15 +2104,15 @@ static int create_ust_session(struct ltt_session *session,
                break;
        default:
                ERR("Unknown UST domain on create session %d", domain->type);
-               ret = LTTCOMM_UNKNOWN_DOMAIN;
+               ret = LTTNG_ERR_UNKNOWN_DOMAIN;
                goto error;
        }
 
        DBG("Creating UST session");
 
-       lus = trace_ust_create_session(session->path, session->id, domain);
+       lus = trace_ust_create_session(session->path, session->id);
        if (lus == NULL) {
-               ret = LTTCOMM_UST_SESS_FAIL;
+               ret = LTTNG_ERR_UST_SESS_FAIL;
                goto error;
        }
 
@@ -1901,11 +2122,11 @@ static int create_ust_session(struct ltt_session *session,
 
        /* Copy session output to the newly created UST session */
        ret = copy_session_consumer(domain->type, session);
-       if (ret != LTTCOMM_OK) {
+       if (ret != LTTNG_OK) {
                goto error;
        }
 
-       return LTTCOMM_OK;
+       return LTTNG_OK;
 
 error:
        free(lus);
@@ -1924,7 +2145,7 @@ static int create_kernel_session(struct ltt_session *session)
 
        ret = kernel_create_session(session, kernel_tracer_fd);
        if (ret < 0) {
-               ret = LTTCOMM_KERN_SESS_FAIL;
+               ret = LTTNG_ERR_KERN_SESS_FAIL;
                goto error;
        }
 
@@ -1933,7 +2154,7 @@ static int create_kernel_session(struct ltt_session *session)
 
        /* Copy session output to the newly created Kernel session */
        ret = copy_session_consumer(LTTNG_DOMAIN_KERNEL, session);
-       if (ret != LTTCOMM_OK) {
+       if (ret != LTTNG_OK) {
                goto error;
        }
 
@@ -1954,7 +2175,7 @@ static int create_kernel_session(struct ltt_session *session)
        session->kernel_session->uid = session->uid;
        session->kernel_session->gid = session->gid;
 
-       return LTTCOMM_OK;
+       return LTTNG_OK;
 
 error:
        trace_kernel_destroy_session(session->kernel_session);
@@ -1996,7 +2217,7 @@ static unsigned int lttng_sessions_count(uid_t uid, gid_t gid)
 static int process_client_msg(struct command_ctx *cmd_ctx, int sock,
                int *sock_error)
 {
-       int ret = LTTCOMM_OK;
+       int ret = LTTNG_OK;
        int need_tracing_session = 1;
        int need_domain;
 
@@ -2011,6 +2232,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock,
        case LTTNG_LIST_DOMAINS:
        case LTTNG_START_TRACE:
        case LTTNG_STOP_TRACE:
+       case LTTNG_DATA_PENDING:
                need_domain = 0;
                break;
        default:
@@ -2020,9 +2242,9 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock,
        if (opt_no_kernel && need_domain
                        && cmd_ctx->lsm->domain.type == LTTNG_DOMAIN_KERNEL) {
                if (!is_root) {
-                       ret = LTTCOMM_NEED_ROOT_SESSIOND;
+                       ret = LTTNG_ERR_NEED_ROOT_SESSIOND;
                } else {
-                       ret = LTTCOMM_KERN_NA;
+                       ret = LTTNG_ERR_KERN_NA;
                }
                goto error;
        }
@@ -2031,7 +2253,8 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock,
        if (cmd_ctx->lsm->cmd_type == LTTNG_REGISTER_CONSUMER) {
                pthread_mutex_lock(&kconsumer_data.pid_mutex);
                if (kconsumer_data.pid > 0) {
-                       ret = LTTCOMM_KERN_CONSUMER_FAIL;
+                       ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
+                       pthread_mutex_unlock(&kconsumer_data.pid_mutex);
                        goto error;
                }
                pthread_mutex_unlock(&kconsumer_data.pid_mutex);
@@ -2079,10 +2302,10 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock,
                cmd_ctx->session = session_find_by_name(cmd_ctx->lsm->session.name);
                if (cmd_ctx->session == NULL) {
                        if (cmd_ctx->lsm->session.name != NULL) {
-                               ret = LTTCOMM_SESS_NOT_FOUND;
+                               ret = LTTNG_ERR_SESS_NOT_FOUND;
                        } else {
                                /* If no session name specified */
-                               ret = LTTCOMM_SELECT_SESS;
+                               ret = LTTNG_ERR_SELECT_SESS;
                        }
                        goto error;
                } else {
@@ -2102,7 +2325,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock,
        switch (cmd_ctx->lsm->domain.type) {
        case LTTNG_DOMAIN_KERNEL:
                if (!is_root) {
-                       ret = LTTCOMM_NEED_ROOT_SESSIOND;
+                       ret = LTTNG_ERR_NEED_ROOT_SESSIOND;
                        goto error;
                }
 
@@ -2117,7 +2340,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock,
 
                /* Consumer is in an ERROR state. Report back to client */
                if (uatomic_read(&kernel_consumerd_state) == CONSUMER_ERROR) {
-                       ret = LTTCOMM_NO_KERNCONSUMERD;
+                       ret = LTTNG_ERR_NO_KERNCONSUMERD;
                        goto error;
                }
 
@@ -2126,7 +2349,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock,
                        if (cmd_ctx->session->kernel_session == NULL) {
                                ret = create_kernel_session(cmd_ctx->session);
                                if (ret < 0) {
-                                       ret = LTTCOMM_KERN_SESS_FAIL;
+                                       ret = LTTNG_ERR_KERN_SESS_FAIL;
                                        goto error;
                                }
                        }
@@ -2139,7 +2362,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock,
                                pthread_mutex_unlock(&kconsumer_data.pid_mutex);
                                ret = start_consumerd(&kconsumer_data);
                                if (ret < 0) {
-                                       ret = LTTCOMM_KERN_CONSUMER_FAIL;
+                                       ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
                                        goto error;
                                }
                                uatomic_set(&kernel_consumerd_state, CONSUMER_STARTED);
@@ -2163,7 +2386,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock,
        {
                /* Consumer is in an ERROR state. Report back to client */
                if (uatomic_read(&ust_consumerd_state) == CONSUMER_ERROR) {
-                       ret = LTTCOMM_NO_USTCONSUMERD;
+                       ret = LTTNG_ERR_NO_USTCONSUMERD;
                        goto error;
                }
 
@@ -2172,7 +2395,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock,
                        if (cmd_ctx->session->ust_session == NULL) {
                                ret = create_ust_session(cmd_ctx->session,
                                                &cmd_ctx->lsm->domain);
-                               if (ret != LTTCOMM_OK) {
+                               if (ret != LTTNG_OK) {
                                        goto error;
                                }
                        }
@@ -2187,7 +2410,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock,
                                pthread_mutex_unlock(&ustconsumer64_data.pid_mutex);
                                ret = start_consumerd(&ustconsumer64_data);
                                if (ret < 0) {
-                                       ret = LTTCOMM_UST_CONSUMER64_FAIL;
+                                       ret = LTTNG_ERR_UST_CONSUMER64_FAIL;
                                        uatomic_set(&ust_consumerd64_fd, -EINVAL);
                                        goto error;
                                }
@@ -2216,7 +2439,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock,
                                pthread_mutex_unlock(&ustconsumer32_data.pid_mutex);
                                ret = start_consumerd(&ustconsumer32_data);
                                if (ret < 0) {
-                                       ret = LTTCOMM_UST_CONSUMER32_FAIL;
+                                       ret = LTTNG_ERR_UST_CONSUMER32_FAIL;
                                        uatomic_set(&ust_consumerd32_fd, -EINVAL);
                                        goto error;
                                }
@@ -2250,13 +2473,13 @@ skip_domain:
                switch (cmd_ctx->lsm->domain.type) {
                case LTTNG_DOMAIN_UST:
                        if (uatomic_read(&ust_consumerd_state) != CONSUMER_STARTED) {
-                               ret = LTTCOMM_NO_USTCONSUMERD;
+                               ret = LTTNG_ERR_NO_USTCONSUMERD;
                                goto error;
                        }
                        break;
                case LTTNG_DOMAIN_KERNEL:
                        if (uatomic_read(&kernel_consumerd_state) != CONSUMER_STARTED) {
-                               ret = LTTCOMM_NO_KERNCONSUMERD;
+                               ret = LTTNG_ERR_NO_KERNCONSUMERD;
                                goto error;
                        }
                        break;
@@ -2271,7 +2494,7 @@ skip_domain:
                if (!session_access_ok(cmd_ctx->session,
                                LTTNG_SOCK_GET_UID_CRED(&cmd_ctx->creds),
                                LTTNG_SOCK_GET_GID_CRED(&cmd_ctx->creds))) {
-                       ret = LTTCOMM_EPERM;
+                       ret = LTTNG_ERR_EPERM;
                        goto error;
                }
        }
@@ -2282,8 +2505,7 @@ skip_domain:
        {
                ret = cmd_add_context(cmd_ctx->session, cmd_ctx->lsm->domain.type,
                                cmd_ctx->lsm->u.context.channel_name,
-                               cmd_ctx->lsm->u.context.event_name,
-                               &cmd_ctx->lsm->u.context.ctx);
+                               &cmd_ctx->lsm->u.context.ctx, kernel_poll_pipe[1]);
                break;
        }
        case LTTNG_DISABLE_CHANNEL:
@@ -2325,7 +2547,7 @@ skip_domain:
                 * be a DOMAIN enum.
                 */
                ret = cmd_enable_consumer(cmd_ctx->lsm->domain.type, cmd_ctx->session);
-               if (ret != LTTCOMM_OK) {
+               if (ret != LTTNG_OK) {
                        goto error;
                }
 
@@ -2346,7 +2568,7 @@ skip_domain:
        {
                ret = cmd_enable_event(cmd_ctx->session, cmd_ctx->lsm->domain.type,
                                cmd_ctx->lsm->u.enable.channel_name,
-                               &cmd_ctx->lsm->u.enable.event, kernel_poll_pipe[1]);
+                               &cmd_ctx->lsm->u.enable.event, NULL, kernel_poll_pipe[1]);
                break;
        }
        case LTTNG_ENABLE_ALL_EVENT:
@@ -2355,7 +2577,7 @@ skip_domain:
 
                ret = cmd_enable_event_all(cmd_ctx->session, cmd_ctx->lsm->domain.type,
                                cmd_ctx->lsm->u.enable.channel_name,
-                               cmd_ctx->lsm->u.enable.event.type, kernel_poll_pipe[1]);
+                               cmd_ctx->lsm->u.enable.event.type, NULL, kernel_poll_pipe[1]);
                break;
        }
        case LTTNG_LIST_TRACEPOINTS:
@@ -2365,6 +2587,7 @@ skip_domain:
 
                nb_events = cmd_list_tracepoints(cmd_ctx->lsm->domain.type, &events);
                if (nb_events < 0) {
+                       /* Return value is a negative lttng_error_code. */
                        ret = -nb_events;
                        goto error;
                }
@@ -2385,7 +2608,7 @@ skip_domain:
 
                free(events);
 
-               ret = LTTCOMM_OK;
+               ret = LTTNG_OK;
                break;
        }
        case LTTNG_LIST_TRACEPOINT_FIELDS:
@@ -2396,6 +2619,7 @@ skip_domain:
                nb_fields = cmd_list_tracepoint_fields(cmd_ctx->lsm->domain.type,
                                &fields);
                if (nb_fields < 0) {
+                       /* Return value is a negative lttng_error_code. */
                        ret = -nb_fields;
                        goto error;
                }
@@ -2417,7 +2641,7 @@ skip_domain:
 
                free(fields);
 
-               ret = LTTCOMM_OK;
+               ret = LTTNG_OK;
                break;
        }
        case LTTNG_SET_CONSUMER_URI:
@@ -2429,13 +2653,13 @@ skip_domain:
                len = nb_uri * sizeof(struct lttng_uri);
 
                if (nb_uri == 0) {
-                       ret = LTTCOMM_INVALID;
+                       ret = LTTNG_ERR_INVALID;
                        goto error;
                }
 
                uris = zmalloc(len);
                if (uris == NULL) {
-                       ret = LTTCOMM_FATAL;
+                       ret = LTTNG_ERR_FATAL;
                        goto error;
                }
 
@@ -2445,13 +2669,15 @@ skip_domain:
                if (ret <= 0) {
                        DBG("No URIs received from client... continuing");
                        *sock_error = 1;
-                       ret = LTTCOMM_SESSION_FAIL;
+                       ret = LTTNG_ERR_SESSION_FAIL;
+                       free(uris);
                        goto error;
                }
 
                ret = cmd_set_consumer_uri(cmd_ctx->lsm->domain.type, cmd_ctx->session,
                                nb_uri, uris);
-               if (ret != LTTCOMM_OK) {
+               if (ret != LTTNG_OK) {
+                       free(uris);
                        goto error;
                }
 
@@ -2472,6 +2698,8 @@ skip_domain:
                        }
                }
 
+               free(uris);
+
                break;
        }
        case LTTNG_START_TRACE:
@@ -2495,7 +2723,7 @@ skip_domain:
                if (nb_uri > 0) {
                        uris = zmalloc(len);
                        if (uris == NULL) {
-                               ret = LTTCOMM_FATAL;
+                               ret = LTTNG_ERR_FATAL;
                                goto error;
                        }
 
@@ -2505,13 +2733,15 @@ skip_domain:
                        if (ret <= 0) {
                                DBG("No URIs received from client... continuing");
                                *sock_error = 1;
-                               ret = LTTCOMM_SESSION_FAIL;
+                               ret = LTTNG_ERR_SESSION_FAIL;
+                               free(uris);
                                goto error;
                        }
 
                        if (nb_uri == 1 && uris[0].dtype != LTTNG_DST_PATH) {
                                DBG("Creating session with ONE network URI is a bad call");
-                               ret = LTTCOMM_SESSION_FAIL;
+                               ret = LTTNG_ERR_SESSION_FAIL;
+                               free(uris);
                                goto error;
                        }
                }
@@ -2519,6 +2749,8 @@ skip_domain:
                ret = cmd_create_session_uri(cmd_ctx->lsm->session.name, uris, nb_uri,
                        &cmd_ctx->creds);
 
+               free(uris);
+
                break;
        }
        case LTTNG_DESTROY_SESSION:
@@ -2536,6 +2768,7 @@ skip_domain:
 
                nb_dom = cmd_list_domains(cmd_ctx->session, &domains);
                if (nb_dom < 0) {
+                       /* Return value is a negative lttng_error_code. */
                        ret = -nb_dom;
                        goto error;
                }
@@ -2551,7 +2784,7 @@ skip_domain:
 
                free(domains);
 
-               ret = LTTCOMM_OK;
+               ret = LTTNG_OK;
                break;
        }
        case LTTNG_LIST_CHANNELS:
@@ -2562,6 +2795,7 @@ skip_domain:
                nb_chan = cmd_list_channels(cmd_ctx->lsm->domain.type,
                                cmd_ctx->session, &channels);
                if (nb_chan < 0) {
+                       /* Return value is a negative lttng_error_code. */
                        ret = -nb_chan;
                        goto error;
                }
@@ -2577,7 +2811,7 @@ skip_domain:
 
                free(channels);
 
-               ret = LTTCOMM_OK;
+               ret = LTTNG_OK;
                break;
        }
        case LTTNG_LIST_EVENTS:
@@ -2588,6 +2822,7 @@ skip_domain:
                nb_event = cmd_list_events(cmd_ctx->lsm->domain.type, cmd_ctx->session,
                                cmd_ctx->lsm->u.list.channel_name, &events);
                if (nb_event < 0) {
+                       /* Return value is a negative lttng_error_code. */
                        ret = -nb_event;
                        goto error;
                }
@@ -2603,7 +2838,7 @@ skip_domain:
 
                free(events);
 
-               ret = LTTCOMM_OK;
+               ret = LTTNG_OK;
                break;
        }
        case LTTNG_LIST_SESSIONS:
@@ -2628,7 +2863,7 @@ skip_domain:
 
                session_unlock_list();
 
-               ret = LTTCOMM_OK;
+               ret = LTTNG_OK;
                break;
        }
        case LTTNG_CALIBRATE:
@@ -2646,7 +2881,7 @@ skip_domain:
                        cdata = &kconsumer_data;
                        break;
                default:
-                       ret = LTTCOMM_UND;
+                       ret = LTTNG_ERR_UND;
                        goto error;
                }
 
@@ -2654,45 +2889,53 @@ skip_domain:
                                cmd_ctx->lsm->u.reg.path, cdata);
                break;
        }
-       case LTTNG_SET_FILTER:
+       case LTTNG_ENABLE_EVENT_WITH_FILTER:
        {
                struct lttng_filter_bytecode *bytecode;
 
-               if (cmd_ctx->lsm->u.filter.bytecode_len > 65336) {
-                       ret = LTTCOMM_FILTER_INVAL;
+               if (cmd_ctx->lsm->u.enable.bytecode_len > LTTNG_FILTER_MAX_LEN) {
+                       ret = LTTNG_ERR_FILTER_INVAL;
+                       goto error;
+               }
+               if (cmd_ctx->lsm->u.enable.bytecode_len == 0) {
+                       ret = LTTNG_ERR_FILTER_INVAL;
                        goto error;
                }
-               bytecode = zmalloc(cmd_ctx->lsm->u.filter.bytecode_len);
+               bytecode = zmalloc(cmd_ctx->lsm->u.enable.bytecode_len);
                if (!bytecode) {
-                       ret = LTTCOMM_FILTER_NOMEM;
+                       ret = LTTNG_ERR_FILTER_NOMEM;
                        goto error;
                }
                /* Receive var. len. data */
                DBG("Receiving var len data from client ...");
                ret = lttcomm_recv_unix_sock(sock, bytecode,
-                               cmd_ctx->lsm->u.filter.bytecode_len);
+                               cmd_ctx->lsm->u.enable.bytecode_len);
                if (ret <= 0) {
                        DBG("Nothing recv() from client var len data... continuing");
                        *sock_error = 1;
-                       ret = LTTCOMM_FILTER_INVAL;
+                       ret = LTTNG_ERR_FILTER_INVAL;
                        goto error;
                }
 
                if (bytecode->len + sizeof(*bytecode)
-                               != cmd_ctx->lsm->u.filter.bytecode_len) {
+                               != cmd_ctx->lsm->u.enable.bytecode_len) {
                        free(bytecode);
-                       ret = LTTCOMM_FILTER_INVAL;
+                       ret = LTTNG_ERR_FILTER_INVAL;
                        goto error;
                }
 
-               ret = cmd_set_filter(cmd_ctx->session, cmd_ctx->lsm->domain.type,
-                               cmd_ctx->lsm->u.filter.channel_name,
-                               cmd_ctx->lsm->u.filter.event_name,
-                               bytecode);
+               ret = cmd_enable_event(cmd_ctx->session, cmd_ctx->lsm->domain.type,
+                               cmd_ctx->lsm->u.enable.channel_name,
+                               &cmd_ctx->lsm->u.enable.event, bytecode, kernel_poll_pipe[1]);
+               break;
+       }
+       case LTTNG_DATA_PENDING:
+       {
+               ret = cmd_data_pending(cmd_ctx->session);
                break;
        }
        default:
-               ret = LTTCOMM_UND;
+               ret = LTTNG_ERR_UND;
                break;
        }
 
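[Editor's note] The LTTNG_ENABLE_EVENT_WITH_FILTER handler above validates the client-supplied length twice: against a hard bound and a zero check before allocating, then against the length embedded in the header that arrives inside the payload itself. A minimal standalone sketch of that double check follows; the struct layout and the bound are stand-ins for illustration, not the real lttng_filter_bytecode definitions.

    #include <stddef.h>
    #include <stdint.h>

    #define FILTER_MAX_LEN 65536            /* stand-in for LTTNG_FILTER_MAX_LEN */

    /* Simplified length-prefixed blob, standing in for lttng_filter_bytecode. */
    struct bytecode {
            uint32_t len;                   /* length of data[], in bytes */
            char data[];
    };

    /*
     * Check a client-advertised length before allocation, then cross-check
     * it against the header that arrived in the payload itself. Returns 0
     * if the two agree and the bound holds, -1 otherwise.
     */
    static int bytecode_len_ok(const struct bytecode *bc, size_t advertised)
    {
            if (advertised == 0 || advertised > FILTER_MAX_LEN) {
                    return -1;              /* reject before zmalloc()/recv() */
            }
            if ((size_t) bc->len + sizeof(*bc) != advertised) {
                    return -1;              /* embedded header disagrees with claim */
            }
            return 0;
    }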
@@ -2739,6 +2982,12 @@ static void *thread_manage_health(void *data)
                goto error;
        }
 
+       /*
+        * Set the CLOEXEC flag. Return code is useless because either way, the
+        * show must go on.
+        */
+       (void) utils_set_fd_cloexec(sock);
+
        ret = lttcomm_listen_unix_sock(sock);
        if (ret < 0) {
                goto error;
@@ -2762,8 +3011,6 @@ static void *thread_manage_health(void *data)
        while (1) {
                DBG("Health check ready");
 
-               nb_fd = LTTNG_POLL_GETNB(&events);
-
                /* Infinite blocking call, waiting for transmission */
 restart:
                ret = lttng_poll_wait(&events, -1);
@@ -2777,6 +3024,8 @@ restart:
                        goto error;
                }
 
+               nb_fd = ret;
+
                for (i = 0; i < nb_fd; i++) {
                        /* Fetch the poll data once */
                        revents = LTTNG_POLL_GETEV(&events, i);
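[Editor's note] Both event loops in this patch stop reading nb_fd from LTTNG_POLL_GETNB() before the wait and instead take the return value of lttng_poll_wait(), which is the count of *ready* descriptors rather than the size of the whole set. The same pattern with raw epoll, which the lttng wrapper is built on under Linux (illustrative sketch only):

    #include <errno.h>
    #include <stdio.h>
    #include <sys/epoll.h>

    /*
     * epoll-based sketch of the pattern: the number of entries to walk is
     * the return value of the wait call (ready fds only), not the number
     * of fds registered in the set.
     */
    static int handle_ready(int epfd, struct epoll_event *events, int maxevents)
    {
            int nb_fd, i;

    restart:
            nb_fd = epoll_wait(epfd, events, maxevents, -1);  /* infinite wait */
            if (nb_fd < 0) {
                    if (errno == EINTR) {
                            goto restart;   /* mirrors the restart: label above */
                    }
                    perror("epoll_wait");
                    return -1;
            }

            for (i = 0; i < nb_fd; i++) {
                    /* events[0..nb_fd) hold the ready descriptors only. */
                    printf("fd %d ready (events 0x%x)\n",
                                    events[i].data.fd, events[i].events);
            }
            return nb_fd;
    }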
@@ -2803,6 +3052,12 @@ restart:
                        goto error;
                }
 
+               /*
+                * Set the CLOEXEC flag. Return code is useless because either way, the
+                * show must go on.
+                */
+               (void) utils_set_fd_cloexec(new_sock);
+
                DBG("Receiving data from client for health...");
                ret = lttcomm_recv_unix_sock(new_sock, (void *)&msg, sizeof(msg));
                if (ret <= 0) {
@@ -2819,30 +3074,30 @@ restart:
 
                switch (msg.component) {
                case LTTNG_HEALTH_CMD:
-                       reply.ret_code = health_check_state(&health_thread_cmd);
+                       reply.ret_code = health_check_state(HEALTH_TYPE_CMD);
                        break;
                case LTTNG_HEALTH_APP_MANAGE:
-                       reply.ret_code = health_check_state(&health_thread_app_manage);
+                       reply.ret_code = health_check_state(HEALTH_TYPE_APP_MANAGE);
                        break;
                case LTTNG_HEALTH_APP_REG:
-                       reply.ret_code = health_check_state(&health_thread_app_reg);
+                       reply.ret_code = health_check_state(HEALTH_TYPE_APP_REG);
                        break;
                case LTTNG_HEALTH_KERNEL:
-                       reply.ret_code = health_check_state(&health_thread_kernel);
+                       reply.ret_code = health_check_state(HEALTH_TYPE_KERNEL);
                        break;
                case LTTNG_HEALTH_CONSUMER:
                        reply.ret_code = check_consumer_health();
                        break;
                case LTTNG_HEALTH_ALL:
                        reply.ret_code =
-                               health_check_state(&health_thread_app_manage) &&
-                               health_check_state(&health_thread_app_reg) &&
-                               health_check_state(&health_thread_cmd) &&
-                               health_check_state(&health_thread_kernel) &&
+                               health_check_state(HEALTH_TYPE_APP_MANAGE) &&
+                               health_check_state(HEALTH_TYPE_APP_REG) &&
+                               health_check_state(HEALTH_TYPE_CMD) &&
+                               health_check_state(HEALTH_TYPE_KERNEL) &&
                                check_consumer_health();
                        break;
                default:
-                       reply.ret_code = LTTCOMM_UND;
+                       reply.ret_code = LTTNG_ERR_UND;
                        break;
                }
 
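[Editor's note] With the shared health code, callers identify a thread by a HEALTH_TYPE_* enum instead of holding a pointer to its state struct, and the LTTNG_HEALTH_ALL case is just the AND of the per-component checks. A toy model of that dispatch; the enum values and flag array are invented for the sketch, and the real health_check_state() compares per-thread progress counters rather than reading a boolean.

    #include <stdbool.h>

    /* Hypothetical component ids mirroring the HEALTH_TYPE_* enum. */
    enum health_type {
            HEALTH_CMD,
            HEALTH_APP_MANAGE,
            HEALTH_APP_REG,
            HEALTH_KERNEL,
            HEALTH_NR_TYPE,
    };

    /* One flag per component; a stand-in for the real per-thread counters. */
    static bool health_ok[HEALTH_NR_TYPE] = { true, true, true, true };

    static bool check_state(enum health_type t)
    {
            return health_ok[t];
    }

    /* "ALL" is simply the conjunction of every component check. */
    static bool check_all(void)
    {
            bool ok = true;
            int i;

            for (i = 0; i < HEALTH_NR_TYPE; i++) {
                    ok = ok && check_state((enum health_type) i);
            }
            return ok;
    }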
@@ -2913,11 +3168,17 @@ static void *thread_manage_clients(void *data)
 
        rcu_register_thread();
 
+       health_register(HEALTH_TYPE_CMD);
+
+       if (testpoint(thread_manage_clients)) {
+               goto error_testpoint;
+       }
+
        health_code_update(&health_thread_cmd);
 
        ret = lttcomm_listen_unix_sock(client_sock);
        if (ret < 0) {
-               goto error;
+               goto error_listen;
        }
 
        /*
@@ -2926,7 +3187,7 @@ static void *thread_manage_clients(void *data)
         */
        ret = create_thread_poll_set(&events, 2);
        if (ret < 0) {
-               goto error;
+               goto error_create_poll;
        }
 
        /* Add the application registration socket */
@@ -2942,13 +3203,15 @@ static void *thread_manage_clients(void *data)
                kill(ppid, SIGUSR1);
        }
 
+       if (testpoint(thread_manage_clients_before_loop)) {
+               goto error;
+       }
+
        health_code_update(&health_thread_cmd);
 
        while (1) {
                DBG("Accepting client command ...");
 
-               nb_fd = LTTNG_POLL_GETNB(&events);
-
                /* Infinite blocking call, waiting for transmission */
        restart:
                health_poll_update(&health_thread_cmd);
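[Editor's note] The testpoint() calls give test builds a named hook at thread startup and again just before the main loop; a non-zero return aborts the thread through the matching error label. One plausible implementation of such a hook, assuming an activation environment variable and symbols supplied by a preloaded test library; this is a guess at the mechanism, not the contents of testpoint.h.

    #define _GNU_SOURCE             /* for RTLD_DEFAULT */
    #include <dlfcn.h>
    #include <stdlib.h>

    /*
     * Hypothetical testpoint lookup: a no-op unless the (invented)
     * activation variable is set, in which case a symbol named after the
     * testpoint is resolved from a preloaded test library and executed.
     * A non-zero return tells the caller to bail out, simulating a
     * failure at this exact spot.
     */
    static int testpoint_hit(const char *symbol_name)
    {
            int (*tp)(void);

            if (!getenv("SKETCH_TESTPOINT_ACTIVE")) {   /* invented name */
                    return 0;
            }
            tp = (int (*)(void)) dlsym(RTLD_DEFAULT, symbol_name);
            return tp ? tp() : 0;
    }

A caller would then write something like "if (testpoint_hit("thread_manage_clients")) goto error_testpoint;", mirroring the hunk above (the symbol naming is an assumption).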
@@ -2964,6 +3227,8 @@ static void *thread_manage_clients(void *data)
                        goto error;
                }
 
+               nb_fd = ret;
+
                for (i = 0; i < nb_fd; i++) {
                        /* Fetch the poll data once */
                        revents = LTTNG_POLL_GETEV(&events, i);
@@ -2996,6 +3261,12 @@ static void *thread_manage_clients(void *data)
                        goto error;
                }
 
+               /*
+                * Set the CLOEXEC flag. Return code is useless because either way, the
+                * show must go on.
+                */
+               (void) utils_set_fd_cloexec(sock);
+
                /* Set socket option for credentials retrieval */
                ret = lttcomm_setsockopt_creds_unix_sock(sock);
                if (ret < 0) {
@@ -3097,13 +3368,19 @@ static void *thread_manage_clients(void *data)
 
 exit:
 error:
-       if (err) {
-               health_error(&health_thread_cmd);
-               ERR("Health error occurred in %s", __func__);
+       if (sock >= 0) {
+               ret = close(sock);
+               if (ret) {
+                       PERROR("close");
+               }
        }
-       health_exit(&health_thread_cmd);
 
-       DBG("Client thread dying");
+       lttng_poll_clean(&events);
+       clean_command_ctx(&cmd_ctx);
+
+error_listen:
+error_create_poll:
+error_testpoint:
        unlink(client_unix_sock_path);
        if (client_sock >= 0) {
                ret = close(client_sock);
@@ -3111,15 +3388,15 @@ error:
                        PERROR("close");
                }
        }
-       if (sock >= 0) {
-               ret = close(sock);
-               if (ret) {
-                       PERROR("close");
-               }
+
+       if (err) {
+               health_error(&health_thread_cmd);
+               ERR("Health error occurred in %s", __func__);
        }
 
-       lttng_poll_clean(&events);
-       clean_command_ctx(&cmd_ctx);
+       health_unregister();
+
+       DBG("Client thread dying");
 
        rcu_unregister_thread();
        return NULL;
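[Editor's note] The exit path is rebuilt as a label ladder: the per-connection socket, poll set and command context are cleaned up only on the paths that created them, the listen socket teardown and health bookkeeping run unconditionally, and health_unregister() is the final health call. The idiom in isolation (schematic; the helpers are dummies so the sketch compiles):

    #include <stdio.h>
    #include <unistd.h>

    /* Dummy helpers so the sketch stands alone. */
    static int  acquire_listen_socket(void) { return dup(0); }
    static int  setup_poll_set(void)        { return 0; }
    static void teardown_poll_set(void)     { }

    /*
     * Label-ladder cleanup as rearranged above: resources are released in
     * reverse order of acquisition, and each error label skips the
     * teardown of anything that was never created.
     */
    static int thread_body(void)
    {
            int sock = -1;              /* per-connection fd, may stay unused */
            int listen_sock;

            listen_sock = acquire_listen_socket();
            if (listen_sock < 0) {
                    goto error_listen;
            }
            if (setup_poll_set() < 0) {
                    goto error_create_poll;
            }

            /* ... accept into sock and serve requests here ... */

            if (sock >= 0 && close(sock)) {
                    perror("close");
            }
            teardown_poll_set();
    error_create_poll:
    error_listen:
            if (listen_sock >= 0 && close(listen_sock)) {
                    perror("close");
            }
            return 0;
    }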
@@ -3151,6 +3428,7 @@ static void usage(void)
        fprintf(stderr, "  -S, --sig-parent                   Send SIGCHLD to parent pid to notify readiness.\n");
        fprintf(stderr, "  -q, --quiet                        No output at all.\n");
        fprintf(stderr, "  -v, --verbose                      Verbose mode. Activate DBG() macro.\n");
+       fprintf(stderr, "  -p, --pidfile FILE                 Write a pid to FILE name overriding the default value.\n");
        fprintf(stderr, "      --verbose-consumer             Verbose mode for consumer. Activate DBG() macro.\n");
        fprintf(stderr, "      --no-kernel                    Disable kernel tracer\n");
 }
@@ -3184,12 +3462,13 @@ static int parse_args(int argc, char **argv)
                { "verbose", 0, 0, 'v' },
                { "verbose-consumer", 0, 0, 'Z' },
                { "no-kernel", 0, 0, 'N' },
+               { "pidfile", 1, 0, 'p' },
                { NULL, 0, 0, 0 }
        };
 
        while (1) {
                int option_index = 0;
-               c = getopt_long(argc, argv, "dhqvVSN" "a:c:g:s:C:E:D:F:Z:u:t",
+               c = getopt_long(argc, argv, "dhqvVSN" "a:c:g:s:C:E:D:F:Z:u:t:p:",
                                long_options, &option_index);
                if (c == -1) {
                        break;
@@ -3266,6 +3545,9 @@ static int parse_args(int argc, char **argv)
                case 'T':
                        consumerd64_libdir = optarg;
                        break;
+               case 'p':
+                       opt_pidfile = optarg;
+                       break;
                default:
                        /* Unknown option or other error.
                         * Error is printed by getopt, just return */
@@ -3296,6 +3578,14 @@ static int init_daemon_socket(void)
                goto end;
        }
 
+       /* Set the cloexec flag */
+       ret = utils_set_fd_cloexec(client_sock);
+       if (ret < 0) {
+               ERR("Unable to set CLOEXEC flag to the client Unix socket (fd: %d). "
+                               "Continuing but note that the consumer daemon will have a "
+                               "reference to this socket on exec()", client_sock);
+       }
+
        /* File permission MUST be 660 */
        ret = chmod(client_unix_sock_path, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
        if (ret < 0) {
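[Editor's note] utils_set_fd_cloexec() is presumably the usual fcntl() dance; something along these lines (an assumption about the helper, which lives elsewhere in the tree):

    #include <fcntl.h>

    /*
     * Probable shape of utils_set_fd_cloexec() (assumption): set
     * FD_CLOEXEC while preserving any other fd flags already present.
     */
    static int set_fd_cloexec(int fd)
    {
            int flags;

            flags = fcntl(fd, F_GETFD);
            if (flags < 0) {
                    return -1;
            }
            if (fcntl(fd, F_SETFD, flags | FD_CLOEXEC) < 0) {
                    return -1;
            }
            return 0;
    }

With the flag set, the kernel closes the descriptor atomically across exec(), so the spawned consumer daemon never inherits it.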
@@ -3312,6 +3602,14 @@ static int init_daemon_socket(void)
                goto end;
        }
 
+       /* Set the cloexec flag */
+       ret = utils_set_fd_cloexec(apps_sock);
+       if (ret < 0) {
+               ERR("Unable to set CLOEXEC flag to the app Unix socket (fd: %d). "
+                               "Continuing but note that the consumer daemon will have a "
+                               "reference to this socket on exec()", apps_sock);
+       }
+
        /* File permission MUST be 666 */
        ret = chmod(apps_unix_sock_path,
                        S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
@@ -3321,6 +3619,9 @@ static int init_daemon_socket(void)
                goto end;
        }
 
+       DBG3("Session daemon client socket %d and application socket %d created",
+                       client_sock, apps_sock);
+
 end:
        umask(old_umask);
        return ret;
@@ -3573,6 +3874,38 @@ static void set_ulimit(void)
        }
 }
 
+/*
+ * Write the pid file to the --pidfile path, or to the default in rundir.
+ */
+static void write_pidfile(void)
+{
+       int ret;
+       char pidfile_path[PATH_MAX];
+
+       assert(rundir);
+
+       if (opt_pidfile) {
+               (void) snprintf(pidfile_path, sizeof(pidfile_path), "%s", opt_pidfile);
+       } else {
+               /* Build the pidfile path from rundir and the default file name. */
+               ret = snprintf(pidfile_path, sizeof(pidfile_path), "%s/"
+                               DEFAULT_LTTNG_SESSIOND_PIDFILE, rundir);
+               if (ret < 0) {
+                       PERROR("snprintf pidfile path");
+                       goto error;
+               }
+       }
+
+       /*
+        * Create the pid file. The return value is ignored; execution
+        * continues even if the file cannot be written.
+        */
+       (void) utils_create_pid_file(getpid(), pidfile_path);
+
+error:
+       return;
+}
+
 /*
  * main
  */
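[Editor's note] utils_create_pid_file() is taken on faith in the hunk above; a plausible body, writing the decimal pid plus a newline and reporting failure without aborting, could look like this (sketch, not the real helper):

    #include <stdio.h>
    #include <sys/types.h>

    /*
     * Plausible stand-in for utils_create_pid_file() (assumption): write
     * "PID\n" to the given path; the caller may ignore the return value,
     * matching the (void) cast above.
     */
    static int create_pid_file(pid_t pid, const char *path)
    {
            FILE *fp;

            fp = fopen(path, "w");
            if (!fp) {
                    perror("open pidfile");
                    return -1;
            }
            fprintf(fp, "%d\n", (int) pid);
            if (fclose(fp)) {
                    perror("close pidfile");
                    return -1;
            }
            return 0;
    }

It would be invoked as create_pid_file(getpid(), pidfile_path), as the hunk suggests.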
@@ -3580,7 +3913,7 @@ int main(int argc, char **argv)
 {
        int ret = 0;
        void *status;
-       const char *home_path;
+       const char *home_path, *env_app_timeout;
 
        init_kernel_workarounds();
 
@@ -3590,7 +3923,7 @@ int main(int argc, char **argv)
 
        /* Parse arguments */
        progname = argv[0];
-       if ((ret = parse_args(argc, argv) < 0)) {
+       if ((ret = parse_args(argc, argv)) < 0) {
                goto error;
        }
 
@@ -3817,8 +4150,10 @@ int main(int argc, char **argv)
        }
 
        /* Setup the kernel pipe for waking up the kernel thread */
-       if ((ret = utils_create_pipe_cloexec(kernel_poll_pipe)) < 0) {
-               goto exit;
+       if (is_root && !opt_no_kernel) {
+               if ((ret = utils_create_pipe_cloexec(kernel_poll_pipe)) < 0) {
+                       goto exit;
+               }
        }
 
        /* Setup the thread apps communication pipe. */
@@ -3840,23 +4175,15 @@ int main(int argc, char **argv)
 
        cmd_init();
 
-       /* Init all health thread counters. */
-       health_init(&health_thread_cmd);
-       health_init(&health_thread_kernel);
-       health_init(&health_thread_app_manage);
-       health_init(&health_thread_app_reg);
+       /* Check for the application socket timeout env variable. */
+       env_app_timeout = getenv(DEFAULT_APP_SOCKET_TIMEOUT_ENV);
+       if (env_app_timeout) {
+               app_socket_timeout = atoi(env_app_timeout);
+       } else {
+               app_socket_timeout = DEFAULT_APP_SOCKET_RW_TIMEOUT;
+       }
 
-       /*
-        * Init health counters of the consumer thread. We do a quick hack here to
-        * the state of the consumer health is fine even if the thread is not
-        * started.  This is simply to ease our life and has no cost what so ever.
-        */
-       health_init(&kconsumer_data.health);
-       health_poll_update(&kconsumer_data.health);
-       health_init(&ustconsumer32_data.health);
-       health_poll_update(&ustconsumer32_data.health);
-       health_init(&ustconsumer64_data.health);
-       health_poll_update(&ustconsumer64_data.health);
+       write_pidfile();
 
        /* Create thread to manage the client socket */
        ret = pthread_create(&health_thread, NULL,
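[Editor's note] atoi() returns 0 on non-numeric input, indistinguishable from a literal "0". If stricter parsing of the same environment variable were wanted, a strtol()-based variant of the lookup above could look like this; the macro value and the "-1 means no timeout" convention are assumptions for the sketch.

    #include <errno.h>
    #include <limits.h>
    #include <stdlib.h>

    #define DEFAULT_TIMEOUT 5   /* stand-in for DEFAULT_APP_SOCKET_RW_TIMEOUT */

    /*
     * Stricter variant of the getenv()/atoi() pair: fall back to the
     * default on missing, non-numeric or out-of-range values.
     */
    static int app_timeout_from_env(const char *name)
    {
            const char *val = getenv(name);
            char *end;
            long n;

            if (!val || *val == '\0') {
                    return DEFAULT_TIMEOUT;
            }
            errno = 0;
            n = strtol(val, &end, 10);
            if (errno || *end != '\0' || n < -1 || n > INT_MAX) {
                    return DEFAULT_TIMEOUT;     /* -1 taken to mean "no timeout" */
            }
            return (int) n;
    }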
@@ -3898,18 +4225,21 @@ int main(int argc, char **argv)
                goto exit_apps;
        }
 
-       /* Create kernel thread to manage kernel event */
-       ret = pthread_create(&kernel_thread, NULL,
-                       thread_manage_kernel, (void *) NULL);
-       if (ret != 0) {
-               PERROR("pthread_create kernel");
-               goto exit_kernel;
-       }
+       /* Don't start this thread unless root and kernel tracing is enabled. */
+       if (is_root && !opt_no_kernel) {
+               /* Create kernel thread to manage kernel event */
+               ret = pthread_create(&kernel_thread, NULL,
+                               thread_manage_kernel, (void *) NULL);
+               if (ret != 0) {
+                       PERROR("pthread_create kernel");
+                       goto exit_kernel;
+               }
 
-       ret = pthread_join(kernel_thread, &status);
-       if (ret != 0) {
-               PERROR("pthread_join");
-               goto error;     /* join error, exit without cleanup */
+               ret = pthread_join(kernel_thread, &status);
+               if (ret != 0) {
+                       PERROR("pthread_join");
+                       goto error;     /* join error, exit without cleanup */
+               }
        }
 
 exit_kernel:
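[Editor's note] Guarding pthread_join() with the same is_root && !opt_no_kernel test as pthread_create() matters: joining a pthread_t that was never initialized by a successful create is undefined behaviour. The pattern in isolation:

    #include <pthread.h>
    #include <stdio.h>

    static void *worker(void *data)
    {
            (void) data;
            return NULL;
    }

    /*
     * Create and join under one condition, as the hunk above does: if the
     * thread was never created, the join must be skipped as well.
     */
    static int run_optional_thread(int enabled)
    {
            pthread_t tid;
            void *status;
            int ret;

            if (!enabled) {
                    return 0;   /* thread never created: nothing to join */
            }
            ret = pthread_create(&tid, NULL, worker, NULL);
            if (ret) {
                    fprintf(stderr, "pthread_create: %d\n", ret);
                    return -1;
            }
            ret = pthread_join(tid, &status);
            if (ret) {
                    fprintf(stderr, "pthread_join: %d\n", ret);
                    return -1;
            }
            return 0;
    }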
@@ -3946,7 +4276,25 @@ exit_dispatch:
                goto error;     /* join error, exit without cleanup */
        }
 
+       ret = join_consumer_thread(&ustconsumer32_data);
+       if (ret != 0) {
+               PERROR("join_consumer ust32");
+               goto error;     /* join error, exit without cleanup */
+       }
+
+       ret = join_consumer_thread(&ustconsumer64_data);
+       if (ret != 0) {
+               PERROR("join_consumer ust64");
+               goto error;     /* join error, exit without cleanup */
+       }
+
 exit_client:
+       ret = pthread_join(health_thread, &status);
+       if (ret != 0) {
+               PERROR("pthread_join health thread");
+               goto error;     /* join error, exit without cleanup */
+       }
+
 exit_health:
 exit:
        /*