Launch the consumer management thread using lttng_thread
author Jérémie Galarneau <jeremie.galarneau@efficios.com>
Wed, 5 Dec 2018 04:19:25 +0000 (23:19 -0500)
committer Jérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 6 Dec 2018 21:34:34 +0000 (16:34 -0500)
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-sessiond/Makefile.am
src/bin/lttng-sessiond/client.c
src/bin/lttng-sessiond/consumer.h
src/bin/lttng-sessiond/globals.c
src/bin/lttng-sessiond/main.c
src/bin/lttng-sessiond/manage-consumer.c [new file with mode: 0644]
src/bin/lttng-sessiond/manage-consumer.h [new file with mode: 0644]

index e1e5a00e8764c230d510835e679ebf29a4ac348d..fba2a5a1c40919141e3112569aee5e15f6bb393d 100644 (file)
@@ -48,7 +48,8 @@ lttng_sessiond_SOURCES = utils.c utils.h \
                        dispatch.c dispatch.h \
                        register.c register.h \
                        manage-apps.c manage-apps.h \
-                       manage-kernel.c manage-kernel.h
+                       manage-kernel.c manage-kernel.h \
+                       manage-consumer.c manage-consumer.h
 
 if HAVE_LIBLTTNG_UST_CTL
 lttng_sessiond_SOURCES += trace-ust.c ust-registry.c ust-app.c \
index 119062c15c648f2dfcea8e395751e69d396080e3..ea8ffd45fc066c3cfa16b4ac8017400361888e8d 100644 (file)
@@ -35,6 +35,7 @@
 #include "health-sessiond.h"
 #include "testpoint.h"
 #include "utils.h"
+#include "manage-consumer.h"
 
 static bool is_root;
 
@@ -116,133 +117,11 @@ end:
 
 /*
  * Start the thread_manage_consumer. This must be done after a lttng-consumerd
- * exec or it will fails.
+ * exec or it will fail.
  */
 static int spawn_consumer_thread(struct consumer_data *consumer_data)
 {
-       int ret, clock_ret;
-       struct timespec timeout;
-
-       /*
-        * Make sure we set the readiness flag to 0 because we are NOT ready.
-        * This access to consumer_thread_is_ready does not need to be
-        * protected by consumer_data.cond_mutex (yet) since the consumer
-        * management thread has not been started at this point.
-        */
-       consumer_data->consumer_thread_is_ready = 0;
-
-       /* Setup pthread condition */
-       ret = pthread_condattr_init(&consumer_data->condattr);
-       if (ret) {
-               errno = ret;
-               PERROR("pthread_condattr_init consumer data");
-               goto error;
-       }
-
-       /*
-        * Set the monotonic clock in order to make sure we DO NOT jump in time
-        * between the clock_gettime() call and the timedwait call. See bug #324
-        * for a more details and how we noticed it.
-        */
-       ret = pthread_condattr_setclock(&consumer_data->condattr, CLOCK_MONOTONIC);
-       if (ret) {
-               errno = ret;
-               PERROR("pthread_condattr_setclock consumer data");
-               goto error;
-       }
-
-       ret = pthread_cond_init(&consumer_data->cond, &consumer_data->condattr);
-       if (ret) {
-               errno = ret;
-               PERROR("pthread_cond_init consumer data");
-               goto error;
-       }
-
-       ret = pthread_create(&consumer_data->thread, default_pthread_attr(),
-                       thread_manage_consumer, consumer_data);
-       if (ret) {
-               errno = ret;
-               PERROR("pthread_create consumer");
-               ret = -1;
-               goto error;
-       }
-
-       /* We are about to wait on a pthread condition */
-       pthread_mutex_lock(&consumer_data->cond_mutex);
-
-       /* Get time for sem_timedwait absolute timeout */
-       clock_ret = lttng_clock_gettime(CLOCK_MONOTONIC, &timeout);
-       /*
-        * Set the timeout for the condition timed wait even if the clock gettime
-        * call fails since we might loop on that call and we want to avoid to
-        * increment the timeout too many times.
-        */
-       timeout.tv_sec += DEFAULT_SEM_WAIT_TIMEOUT;
-
-       /*
-        * The following loop COULD be skipped in some conditions so this is why we
-        * set ret to 0 in order to make sure at least one round of the loop is
-        * done.
-        */
-       ret = 0;
-
-       /*
-        * Loop until the condition is reached or when a timeout is reached. Note
-        * that the pthread_cond_timedwait(P) man page specifies that EINTR can NOT
-        * be returned but the pthread_cond(3), from the glibc-doc, says that it is
-        * possible. This loop does not take any chances and works with both of
-        * them.
-        */
-       while (!consumer_data->consumer_thread_is_ready && ret != ETIMEDOUT) {
-               if (clock_ret < 0) {
-                       PERROR("clock_gettime spawn consumer");
-                       /* Infinite wait for the consumerd thread to be ready */
-                       ret = pthread_cond_wait(&consumer_data->cond,
-                                       &consumer_data->cond_mutex);
-               } else {
-                       ret = pthread_cond_timedwait(&consumer_data->cond,
-                                       &consumer_data->cond_mutex, &timeout);
-               }
-       }
-
-       /* Release the pthread condition */
-       pthread_mutex_unlock(&consumer_data->cond_mutex);
-
-       if (ret != 0) {
-               errno = ret;
-               if (ret == ETIMEDOUT) {
-                       int pth_ret;
-
-                       /*
-                        * Call has timed out so we kill the kconsumerd_thread and return
-                        * an error.
-                        */
-                       ERR("Condition timed out. The consumer thread was never ready."
-                                       " Killing it");
-                       pth_ret = pthread_cancel(consumer_data->thread);
-                       if (pth_ret < 0) {
-                               PERROR("pthread_cancel consumer thread");
-                       }
-               } else {
-                       PERROR("pthread_cond_wait failed consumer thread");
-               }
-               /* Caller is expecting a negative value on failure. */
-               ret = -1;
-               goto error;
-       }
-
-       pthread_mutex_lock(&consumer_data->pid_mutex);
-       if (consumer_data->pid == 0) {
-               ERR("Consumerd did not start");
-               pthread_mutex_unlock(&consumer_data->pid_mutex);
-               goto error;
-       }
-       pthread_mutex_unlock(&consumer_data->pid_mutex);
-
-       return 0;
-
-error:
-       return ret;
+       return launch_consumer_management_thread(consumer_data) ? 0 : -1;
 }
 
 /*
@@ -757,27 +636,6 @@ error:
        return ret;
 }
 
-/*
- * Join consumer thread
- */
-static int join_consumer_thread(struct consumer_data *consumer_data)
-{
-       void *status;
-
-       /* Consumer pid must be a real one. */
-       if (consumer_data->pid > 0) {
-               int ret;
-               ret = kill(consumer_data->pid, SIGTERM);
-               if (ret) {
-                       PERROR("Error killing consumer daemon");
-                       return ret;
-               }
-               return pthread_join(consumer_data->thread, &status);
-       } else {
-               return 0;
-       }
-}
-
 /*
  * Version of setup_lttng_msg() without command header.
  */
@@ -2490,28 +2348,6 @@ error_create_poll:
        DBG("Client thread dying");
 
        rcu_unregister_thread();
-
-       /*
-        * Since we are creating the consumer threads, we own them, so we need
-        * to join them before our thread exits.
-        */
-       ret = join_consumer_thread(&kconsumer_data);
-       if (ret) {
-               errno = ret;
-               PERROR("join_consumer");
-       }
-
-       ret = join_consumer_thread(&ustconsumer32_data);
-       if (ret) {
-               errno = ret;
-               PERROR("join_consumer ust32");
-       }
-
-       ret = join_consumer_thread(&ustconsumer64_data);
-       if (ret) {
-               errno = ret;
-               PERROR("join_consumer ust64");
-       }
        return NULL;
 }
 
index c98ebb34f8f6b98e55fccbfa15a915df6913c778..0e7ed0017a5a9d42f2b15b5ff5c8c074ba3051d9 100644 (file)
@@ -66,26 +66,6 @@ struct consumer_socket {
 struct consumer_data {
        enum lttng_consumer_type type;
 
-       pthread_t thread;       /* Worker thread interacting with the consumer */
-
-       /* Conditions used by the consumer thread to indicate readiness. */
-       pthread_cond_t cond;
-       pthread_condattr_t condattr;
-       pthread_mutex_t cond_mutex;
-
-       /*
-        * This is a flag condition indicating that the consumer thread is ready
-        * and connected to the lttng-consumerd daemon. This flag MUST only be
-        * updated by locking the condition mutex above or before spawning a
-        * consumer thread.
-        *
-        * A value of 0 means that the thread is NOT ready. A value of 1 means that
-        * the thread consumer did connect successfully to the lttng-consumerd
-        * daemon. A negative value indicates that there is been an error and the
-        * thread has likely quit.
-        */
-       int consumer_thread_is_ready;
-
        /* Mutex to control consumerd pid assignation */
        pthread_mutex_t pid_mutex;
        pid_t pid;
index 6ca5a5a712901328d53e078cc0391b42680168d4..efe80baaf2fef86a76b8ec66366d67f997c08e0e 100644 (file)
@@ -48,8 +48,6 @@ struct consumer_data kconsumer_data = {
        .channel_monitor_pipe = -1,
        .pid_mutex = PTHREAD_MUTEX_INITIALIZER,
        .lock = PTHREAD_MUTEX_INITIALIZER,
-       .cond = PTHREAD_COND_INITIALIZER,
-       .cond_mutex = PTHREAD_MUTEX_INITIALIZER,
 };
 
 struct consumer_data ustconsumer64_data = {
@@ -59,8 +57,6 @@ struct consumer_data ustconsumer64_data = {
        .channel_monitor_pipe = -1,
        .pid_mutex = PTHREAD_MUTEX_INITIALIZER,
        .lock = PTHREAD_MUTEX_INITIALIZER,
-       .cond = PTHREAD_COND_INITIALIZER,
-       .cond_mutex = PTHREAD_MUTEX_INITIALIZER,
 };
 
 struct consumer_data ustconsumer32_data = {
@@ -70,8 +66,6 @@ struct consumer_data ustconsumer32_data = {
        .channel_monitor_pipe = -1,
        .pid_mutex = PTHREAD_MUTEX_INITIALIZER,
        .lock = PTHREAD_MUTEX_INITIALIZER,
-       .cond = PTHREAD_COND_INITIALIZER,
-       .cond_mutex = PTHREAD_MUTEX_INITIALIZER,
 };
 
 enum consumerd_state ust_consumerd_state;
index 6d0fb42e393e8ab072c25c5016c9eb776073e0c6..1fbfb1b0549383cd77e5334aad25338320cb6d56 100644 (file)
@@ -372,390 +372,6 @@ static void sessiond_cleanup_options(void)
        run_as_destroy_worker();
 }
 
-/*
- * Signal pthread condition of the consumer data that the thread.
- */
-static void signal_consumer_condition(struct consumer_data *data, int state)
-{
-       pthread_mutex_lock(&data->cond_mutex);
-
-       /*
-        * The state is set before signaling. It can be any value, it's the waiter
-        * job to correctly interpret this condition variable associated to the
-        * consumer pthread_cond.
-        *
-        * A value of 0 means that the corresponding thread of the consumer data
-        * was not started. 1 indicates that the thread has started and is ready
-        * for action. A negative value means that there was an error during the
-        * thread bootstrap.
-        */
-       data->consumer_thread_is_ready = state;
-       (void) pthread_cond_signal(&data->cond);
-
-       pthread_mutex_unlock(&data->cond_mutex);
-}
-
-/*
- * This thread manage the consumer error sent back to the session daemon.
- */
-void *thread_manage_consumer(void *data)
-{
-       int sock = -1, i, ret, pollfd, err = -1, should_quit = 0;
-       uint32_t revents, nb_fd;
-       enum lttcomm_return_code code;
-       struct lttng_poll_event events;
-       struct consumer_data *consumer_data = data;
-       struct consumer_socket *cmd_socket_wrapper = NULL;
-
-       DBG("[thread] Manage consumer started");
-
-       rcu_register_thread();
-       rcu_thread_online();
-
-       health_register(health_sessiond, HEALTH_SESSIOND_TYPE_CONSUMER);
-
-       health_code_update();
-
-       /*
-        * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the
-        * metadata_sock. Nothing more will be added to this poll set.
-        */
-       ret = sessiond_set_thread_pollset(&events, 3);
-       if (ret < 0) {
-               goto error_poll;
-       }
-
-       /*
-        * The error socket here is already in a listening state which was done
-        * just before spawning this thread to avoid a race between the consumer
-        * daemon exec trying to connect and the listen() call.
-        */
-       ret = lttng_poll_add(&events, consumer_data->err_sock, LPOLLIN | LPOLLRDHUP);
-       if (ret < 0) {
-               goto error;
-       }
-
-       health_code_update();
-
-       /* Infinite blocking call, waiting for transmission */
-restart:
-       health_poll_entry();
-
-       if (testpoint(sessiond_thread_manage_consumer)) {
-               goto error;
-       }
-
-       ret = lttng_poll_wait(&events, -1);
-       health_poll_exit();
-       if (ret < 0) {
-               /*
-                * Restart interrupted system call.
-                */
-               if (errno == EINTR) {
-                       goto restart;
-               }
-               goto error;
-       }
-
-       nb_fd = ret;
-
-       for (i = 0; i < nb_fd; i++) {
-               /* Fetch once the poll data */
-               revents = LTTNG_POLL_GETEV(&events, i);
-               pollfd = LTTNG_POLL_GETFD(&events, i);
-
-               health_code_update();
-
-               if (!revents) {
-                       /* No activity for this FD (poll implementation). */
-                       continue;
-               }
-
-               /* Thread quit pipe has been closed. Killing thread. */
-               ret = sessiond_check_thread_quit_pipe(pollfd, revents);
-               if (ret) {
-                       err = 0;
-                       goto exit;
-               }
-
-               /* Event on the registration socket */
-               if (pollfd == consumer_data->err_sock) {
-                       if (revents & LPOLLIN) {
-                               continue;
-                       } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                               ERR("consumer err socket poll error");
-                               goto error;
-                       } else {
-                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
-                               goto error;
-                       }
-               }
-       }
-
-       sock = lttcomm_accept_unix_sock(consumer_data->err_sock);
-       if (sock < 0) {
-               goto error;
-       }
-
-       /*
-        * Set the CLOEXEC flag. Return code is useless because either way, the
-        * show must go on.
-        */
-       (void) utils_set_fd_cloexec(sock);
-
-       health_code_update();
-
-       DBG2("Receiving code from consumer err_sock");
-
-       /* Getting status code from kconsumerd */
-       ret = lttcomm_recv_unix_sock(sock, &code,
-                       sizeof(enum lttcomm_return_code));
-       if (ret <= 0) {
-               goto error;
-       }
-
-       health_code_update();
-       if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) {
-               ERR("consumer error when waiting for SOCK_READY : %s",
-                               lttcomm_get_readable_code(-code));
-               goto error;
-       }
-
-       /* Connect both command and metadata sockets. */
-       consumer_data->cmd_sock =
-                       lttcomm_connect_unix_sock(
-                               consumer_data->cmd_unix_sock_path);
-       consumer_data->metadata_fd =
-                       lttcomm_connect_unix_sock(
-                               consumer_data->cmd_unix_sock_path);
-       if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) {
-               PERROR("consumer connect cmd socket");
-               /* On error, signal condition and quit. */
-               signal_consumer_condition(consumer_data, -1);
-               goto error;
-       }
-
-       consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd;
-
-       /* Create metadata socket lock. */
-       consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t));
-       if (consumer_data->metadata_sock.lock == NULL) {
-               PERROR("zmalloc pthread mutex");
-               goto error;
-       }
-       pthread_mutex_init(consumer_data->metadata_sock.lock, NULL);
-
-       DBG("Consumer command socket ready (fd: %d", consumer_data->cmd_sock);
-       DBG("Consumer metadata socket ready (fd: %d)",
-                       consumer_data->metadata_fd);
-
-       /*
-        * Remove the consumerd error sock since we've established a connection.
-        */
-       ret = lttng_poll_del(&events, consumer_data->err_sock);
-       if (ret < 0) {
-               goto error;
-       }
-
-       /* Add new accepted error socket. */
-       ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP);
-       if (ret < 0) {
-               goto error;
-       }
-
-       /* Add metadata socket that is successfully connected. */
-       ret = lttng_poll_add(&events, consumer_data->metadata_fd,
-                       LPOLLIN | LPOLLRDHUP);
-       if (ret < 0) {
-               goto error;
-       }
-
-       health_code_update();
-
-       /*
-        * Transfer the write-end of the channel monitoring and rotate pipe
-        * to the consumer by issuing a SET_CHANNEL_MONITOR_PIPE command.
-        */
-       cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock);
-       if (!cmd_socket_wrapper) {
-               goto error;
-       }
-       cmd_socket_wrapper->lock = &consumer_data->lock;
-
-       ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper,
-                       consumer_data->channel_monitor_pipe);
-       if (ret) {
-               goto error;
-       }
-
-       /* Discard the socket wrapper as it is no longer needed. */
-       consumer_destroy_socket(cmd_socket_wrapper);
-       cmd_socket_wrapper = NULL;
-
-       /* The thread is completely initialized, signal that it is ready. */
-       signal_consumer_condition(consumer_data, 1);
-
-       /* Infinite blocking call, waiting for transmission */
-restart_poll:
-       while (1) {
-               health_code_update();
-
-               /* Exit the thread because the thread quit pipe has been triggered. */
-               if (should_quit) {
-                       /* Not a health error. */
-                       err = 0;
-                       goto exit;
-               }
-
-               health_poll_entry();
-               ret = lttng_poll_wait(&events, -1);
-               health_poll_exit();
-               if (ret < 0) {
-                       /*
-                        * Restart interrupted system call.
-                        */
-                       if (errno == EINTR) {
-                               goto restart_poll;
-                       }
-                       goto error;
-               }
-
-               nb_fd = ret;
-
-               for (i = 0; i < nb_fd; i++) {
-                       /* Fetch once the poll data */
-                       revents = LTTNG_POLL_GETEV(&events, i);
-                       pollfd = LTTNG_POLL_GETFD(&events, i);
-
-                       health_code_update();
-
-                       if (!revents) {
-                               /* No activity for this FD (poll implementation). */
-                               continue;
-                       }
-
-                       /*
-                        * Thread quit pipe has been triggered, flag that we should stop
-                        * but continue the current loop to handle potential data from
-                        * consumer.
-                        */
-                       should_quit = sessiond_check_thread_quit_pipe(pollfd, revents);
-
-                       if (pollfd == sock) {
-                               /* Event on the consumerd socket */
-                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
-                                               && !(revents & LPOLLIN)) {
-                                       ERR("consumer err socket second poll error");
-                                       goto error;
-                               }
-                               health_code_update();
-                               /* Wait for any kconsumerd error */
-                               ret = lttcomm_recv_unix_sock(sock, &code,
-                                               sizeof(enum lttcomm_return_code));
-                               if (ret <= 0) {
-                                       ERR("consumer closed the command socket");
-                                       goto error;
-                               }
-
-                               ERR("consumer return code : %s",
-                                               lttcomm_get_readable_code(-code));
-
-                               goto exit;
-                       } else if (pollfd == consumer_data->metadata_fd) {
-                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
-                                               && !(revents & LPOLLIN)) {
-                                       ERR("consumer err metadata socket second poll error");
-                                       goto error;
-                               }
-                               /* UST metadata requests */
-                               ret = ust_consumer_metadata_request(
-                                               &consumer_data->metadata_sock);
-                               if (ret < 0) {
-                                       ERR("Handling metadata request");
-                                       goto error;
-                               }
-                       }
-                       /* No need for an else branch all FDs are tested prior. */
-               }
-               health_code_update();
-       }
-
-exit:
-error:
-       /*
-        * We lock here because we are about to close the sockets and some other
-        * thread might be using them so get exclusive access which will abort all
-        * other consumer command by other threads.
-        */
-       pthread_mutex_lock(&consumer_data->lock);
-
-       /* Immediately set the consumerd state to stopped */
-       if (consumer_data->type == LTTNG_CONSUMER_KERNEL) {
-               uatomic_set(&kernel_consumerd_state, CONSUMER_ERROR);
-       } else if (consumer_data->type == LTTNG_CONSUMER64_UST ||
-                       consumer_data->type == LTTNG_CONSUMER32_UST) {
-               uatomic_set(&ust_consumerd_state, CONSUMER_ERROR);
-       } else {
-               /* Code flow error... */
-               assert(0);
-       }
-
-       if (consumer_data->err_sock >= 0) {
-               ret = close(consumer_data->err_sock);
-               if (ret) {
-                       PERROR("close");
-               }
-               consumer_data->err_sock = -1;
-       }
-       if (consumer_data->cmd_sock >= 0) {
-               ret = close(consumer_data->cmd_sock);
-               if (ret) {
-                       PERROR("close");
-               }
-               consumer_data->cmd_sock = -1;
-       }
-       if (consumer_data->metadata_sock.fd_ptr &&
-           *consumer_data->metadata_sock.fd_ptr >= 0) {
-               ret = close(*consumer_data->metadata_sock.fd_ptr);
-               if (ret) {
-                       PERROR("close");
-               }
-       }
-       if (sock >= 0) {
-               ret = close(sock);
-               if (ret) {
-                       PERROR("close");
-               }
-       }
-
-       unlink(consumer_data->err_unix_sock_path);
-       unlink(consumer_data->cmd_unix_sock_path);
-       pthread_mutex_unlock(&consumer_data->lock);
-
-       /* Cleanup metadata socket mutex. */
-       if (consumer_data->metadata_sock.lock) {
-               pthread_mutex_destroy(consumer_data->metadata_sock.lock);
-               free(consumer_data->metadata_sock.lock);
-       }
-       lttng_poll_clean(&events);
-
-       if (cmd_socket_wrapper) {
-               consumer_destroy_socket(cmd_socket_wrapper);
-       }
-error_poll:
-       if (err) {
-               health_error();
-               ERR("Health error occurred in %s", __func__);
-       }
-       health_unregister(health_sessiond);
-       DBG("consumer thread cleanup completed");
-
-       rcu_thread_offline();
-       rcu_unregister_thread();
-
-       return NULL;
-}
-
 /*
  * Setup necessary data for kernel tracer action.
  */
diff --git a/src/bin/lttng-sessiond/manage-consumer.c b/src/bin/lttng-sessiond/manage-consumer.c
new file mode 100644 (file)
index 0000000..47bfe52
--- /dev/null
@@ -0,0 +1,480 @@
+/*
+ * Copyright (C) 2011 - David Goulet <david.goulet@polymtl.ca>
+ *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *               2013 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License, version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include <signal.h>
+
+#include <common/pipe.h>
+#include <common/utils.h>
+
+#include "manage-consumer.h"
+#include "testpoint.h"
+#include "health-sessiond.h"
+#include "utils.h"
+#include "thread.h"
+#include "ust-consumer.h"
+
+struct thread_notifiers { /* State handed to the consumer management thread as its argument. */
+       struct lttng_pipe *quit_pipe; /* Read end is polled by the thread to detect a quit request. */
+       struct consumer_data *consumer_data; /* Consumer data the thread operates on. */
+        sem_t ready; /* Posted by the thread when its initialization succeeds or fails. */
+       int initialization_result; /* 0 on success, -1 on failure; written before 'ready' is posted. */
+};
+
+static void mark_thread_as_ready(struct thread_notifiers *notifiers)
+{ /* Report a successful initialization to whoever waits on 'ready'. */
+       DBG("Marking consumer management thread as ready");
+       notifiers->initialization_result = 0; /* Record success before waking the waiter. */
+       sem_post(&notifiers->ready); /* Releases wait_until_thread_is_ready(). */
+}
+
+static void mark_thread_intialization_as_failed(
+               struct thread_notifiers *notifiers)
+{ /* Report a failed initialization to whoever waits on 'ready'. */
+        ERR("Consumer management thread entering error state");
+       notifiers->initialization_result = -1; /* Record failure before waking the waiter. */
+       sem_post(&notifiers->ready); /* Releases wait_until_thread_is_ready() even on failure. */
+}
+
+static void wait_until_thread_is_ready(struct thread_notifiers *notifiers)
+{ /* Block until the thread posts 'ready'; the caller must then check initialization_result. */
+       DBG("Waiting for consumer management thread to be ready");
+       sem_wait(&notifiers->ready); /* NOTE(review): return value ignored; sem_wait() may fail with EINTR. */
+       DBG("Consumer management thread is ready"); /* NOTE(review): logged without checking initialization_result. */
+}
+
+/*
+ * This thread manages the consumer errors sent back to the session daemon.
+ */
+void *thread_consumer_management(void *data)
+{
+       int sock = -1, i, ret, pollfd, err = -1, should_quit = 0;
+       uint32_t revents, nb_fd;
+       enum lttcomm_return_code code;
+       struct lttng_poll_event events;
+       struct thread_notifiers *notifiers = data;
+       struct consumer_data *consumer_data = notifiers->consumer_data;
+       const int quit_pipe_read_fd = lttng_pipe_get_readfd(notifiers->quit_pipe);
+       struct consumer_socket *cmd_socket_wrapper = NULL;
+
+       DBG("[thread] Manage consumer started");
+
+       rcu_register_thread();
+       rcu_thread_online();
+
+       health_register(health_sessiond, HEALTH_SESSIOND_TYPE_CONSUMER);
+
+       health_code_update();
+
+       /*
+        * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the
+        * metadata_sock. Nothing more will be added to this poll set.
+        */
+       ret = lttng_poll_create(&events, 3, LTTNG_CLOEXEC);
+       if (ret < 0) {
+               mark_thread_intialization_as_failed(notifiers);
+               goto error_poll;
+       }
+
+       ret = lttng_poll_add(&events, quit_pipe_read_fd, LPOLLIN | LPOLLERR);
+       if (ret < 0) {
+               mark_thread_intialization_as_failed(notifiers);
+               goto error;
+       }
+
+       /*
+        * The error socket here is already in a listening state which was done
+        * just before spawning this thread to avoid a race between the consumer
+        * daemon exec trying to connect and the listen() call.
+        */
+       ret = lttng_poll_add(&events, consumer_data->err_sock, LPOLLIN | LPOLLRDHUP);
+       if (ret < 0) {
+               mark_thread_intialization_as_failed(notifiers);
+               goto error;
+       }
+
+       health_code_update();
+
+       /* Infinite blocking call, waiting for transmission */
+       health_poll_entry();
+
+       if (testpoint(sessiond_thread_manage_consumer)) {
+               mark_thread_intialization_as_failed(notifiers);
+               goto error;
+       }
+
+       ret = lttng_poll_wait(&events, -1);
+       health_poll_exit();
+       if (ret < 0) {
+               mark_thread_intialization_as_failed(notifiers);
+               goto error;
+       }
+
+       nb_fd = ret;
+
+       for (i = 0; i < nb_fd; i++) {
+               /* Fetch once the poll data */
+               revents = LTTNG_POLL_GETEV(&events, i);
+               pollfd = LTTNG_POLL_GETFD(&events, i);
+
+               health_code_update();
+
+               if (!revents) {
+                       /* No activity for this FD (poll implementation). */
+                       continue;
+               }
+
+               /* Thread quit pipe has been closed. Killing thread. */
+               if (pollfd == quit_pipe_read_fd) {
+                       err = 0;
+                       mark_thread_intialization_as_failed(notifiers);
+                       goto exit;
+               } else if (pollfd == consumer_data->err_sock) {
+                       /* Event on the registration socket */
+                       if (revents & LPOLLIN) {
+                               continue;
+                       } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                               ERR("consumer err socket poll error");
+                               mark_thread_intialization_as_failed(notifiers);
+                               goto error;
+                       } else {
+                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                               mark_thread_intialization_as_failed(notifiers);
+                               goto error;
+                       }
+               }
+       }
+
+       sock = lttcomm_accept_unix_sock(consumer_data->err_sock);
+       if (sock < 0) {
+               mark_thread_intialization_as_failed(notifiers);
+               goto error;
+       }
+
+       /*
+        * Set the CLOEXEC flag. Return code is useless because either way, the
+        * show must go on.
+        */
+       (void) utils_set_fd_cloexec(sock);
+
+       health_code_update();
+
+       DBG2("Receiving code from consumer err_sock");
+
+       /* Getting status code from kconsumerd */
+       ret = lttcomm_recv_unix_sock(sock, &code,
+                       sizeof(enum lttcomm_return_code));
+       if (ret <= 0) {
+               mark_thread_intialization_as_failed(notifiers);
+               goto error;
+       }
+
+       health_code_update();
+       if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) {
+               ERR("consumer error when waiting for SOCK_READY : %s",
+                               lttcomm_get_readable_code(-code));
+               mark_thread_intialization_as_failed(notifiers);
+               goto error;
+       }
+
+       /* Connect both command and metadata sockets. */
+       consumer_data->cmd_sock =
+                       lttcomm_connect_unix_sock(
+                               consumer_data->cmd_unix_sock_path);
+       consumer_data->metadata_fd =
+                       lttcomm_connect_unix_sock(
+                               consumer_data->cmd_unix_sock_path);
+       if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) {
+               PERROR("consumer connect cmd socket");
+               mark_thread_intialization_as_failed(notifiers);
+               goto error;
+       }
+
+       consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd;
+
+       /* Create metadata socket lock. */
+       consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t));
+       if (consumer_data->metadata_sock.lock == NULL) {
+               PERROR("zmalloc pthread mutex");
+               mark_thread_intialization_as_failed(notifiers);
+               goto error;
+       }
+       pthread_mutex_init(consumer_data->metadata_sock.lock, NULL);
+
+       DBG("Consumer command socket ready (fd: %d)", consumer_data->cmd_sock);
+       DBG("Consumer metadata socket ready (fd: %d)",
+                       consumer_data->metadata_fd);
+
+       /*
+        * Remove the consumerd error sock since we've established a connection.
+        */
+       ret = lttng_poll_del(&events, consumer_data->err_sock);
+       if (ret < 0) {
+               mark_thread_intialization_as_failed(notifiers);
+               goto error;
+       }
+
+       /* Add new accepted error socket. */
+       ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP);
+       if (ret < 0) {
+               mark_thread_intialization_as_failed(notifiers);
+               goto error;
+       }
+
+       /* Add metadata socket that is successfully connected. */
+       ret = lttng_poll_add(&events, consumer_data->metadata_fd,
+                       LPOLLIN | LPOLLRDHUP);
+       if (ret < 0) {
+               mark_thread_intialization_as_failed(notifiers);
+               goto error;
+       }
+
+       health_code_update();
+
+       /*
+        * Transfer the write-end of the channel monitoring and rotate pipe
+        * to the consumer by issuing a SET_CHANNEL_MONITOR_PIPE command.
+        */
+       cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock);
+       if (!cmd_socket_wrapper) {
+               mark_thread_intialization_as_failed(notifiers);
+               goto error;
+       }
+       cmd_socket_wrapper->lock = &consumer_data->lock;
+
+       ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper,
+                       consumer_data->channel_monitor_pipe);
+       if (ret) {
+               mark_thread_intialization_as_failed(notifiers);
+               goto error;
+       }
+
+       /* Discard the socket wrapper as it is no longer needed. */
+       consumer_destroy_socket(cmd_socket_wrapper);
+       cmd_socket_wrapper = NULL;
+
+       /* The thread is completely initialized, signal that it is ready. */
+       mark_thread_as_ready(notifiers);
+
+       /* Infinite blocking call, waiting for transmission */
+       while (1) {
+               health_code_update();
+
+               /* Exit the thread because the thread quit pipe has been triggered. */
+               if (should_quit) {
+                       /* Not a health error. */
+                       err = 0;
+                       goto exit;
+               }
+
+               health_poll_entry();
+               ret = lttng_poll_wait(&events, -1);
+               health_poll_exit();
+               if (ret < 0) {
+                       goto error;
+               }
+
+               nb_fd = ret;
+
+               for (i = 0; i < nb_fd; i++) {
+                       /* Fetch once the poll data */
+                       revents = LTTNG_POLL_GETEV(&events, i);
+                       pollfd = LTTNG_POLL_GETFD(&events, i);
+
+                       health_code_update();
+
+                       if (!revents) {
+                               /* No activity for this FD (poll implementation). */
+                               continue;
+                       }
+
+                       /*
+                        * Thread quit pipe has been triggered, flag that we should stop
+                        * but continue the current loop to handle potential data from
+                        * consumer.
+                        */
+                       if (pollfd == quit_pipe_read_fd) {
+                               should_quit = 1;
+                       } else if (pollfd == sock) {
+                               /* Event on the consumerd socket */
+                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
+                                               && !(revents & LPOLLIN)) {
+                                       ERR("consumer err socket second poll error");
+                                       goto error;
+                               }
+                               health_code_update();
+                               /* Wait for any kconsumerd error */
+                               ret = lttcomm_recv_unix_sock(sock, &code,
+                                               sizeof(enum lttcomm_return_code));
+                               if (ret <= 0) {
+                                       ERR("consumer closed the command socket");
+                                       goto error;
+                               }
+
+                               ERR("consumer return code : %s",
+                                               lttcomm_get_readable_code(-code));
+
+                               goto exit;
+                       } else if (pollfd == consumer_data->metadata_fd) {
+                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
+                                               && !(revents & LPOLLIN)) {
+                                       ERR("consumer err metadata socket second poll error");
+                                       goto error;
+                               }
+                               /* UST metadata requests */
+                               ret = ust_consumer_metadata_request(
+                                               &consumer_data->metadata_sock);
+                               if (ret < 0) {
+                                       ERR("Handling metadata request");
+                                       goto error;
+                               }
+                       }
+                       /* No need for an else branch all FDs are tested prior. */
+               }
+               health_code_update();
+       }
+
+exit:
+error:
+       /*
+        * We lock here because we are about to close the sockets and some other
+        * thread might be using them. Getting exclusive access will abort all
+        * other consumer commands issued by other threads.
+        */
+       pthread_mutex_lock(&consumer_data->lock);
+
+       /* Immediately flag the consumerd state as being in error. */
+       if (consumer_data->type == LTTNG_CONSUMER_KERNEL) {
+               uatomic_set(&kernel_consumerd_state, CONSUMER_ERROR);
+       } else if (consumer_data->type == LTTNG_CONSUMER64_UST ||
+                       consumer_data->type == LTTNG_CONSUMER32_UST) {
+               uatomic_set(&ust_consumerd_state, CONSUMER_ERROR);
+       } else {
+               /* Code flow error... */
+               assert(0);
+       }
+
+       if (consumer_data->err_sock >= 0) {
+               ret = close(consumer_data->err_sock);
+               if (ret) {
+                       PERROR("close");
+               }
+               consumer_data->err_sock = -1;
+       }
+       if (consumer_data->cmd_sock >= 0) {
+               ret = close(consumer_data->cmd_sock);
+               if (ret) {
+                       PERROR("close");
+               }
+               consumer_data->cmd_sock = -1;
+       }
+       if (consumer_data->metadata_sock.fd_ptr &&
+           *consumer_data->metadata_sock.fd_ptr >= 0) {
+               ret = close(*consumer_data->metadata_sock.fd_ptr);
+               if (ret) {
+                       PERROR("close");
+               }
+       }
+       if (sock >= 0) {
+               ret = close(sock);
+               if (ret) {
+                       PERROR("close");
+               }
+       }
+
+       unlink(consumer_data->err_unix_sock_path);
+       unlink(consumer_data->cmd_unix_sock_path);
+       pthread_mutex_unlock(&consumer_data->lock);
+
+       /* Cleanup metadata socket mutex. */
+       if (consumer_data->metadata_sock.lock) {
+               pthread_mutex_destroy(consumer_data->metadata_sock.lock);
+               free(consumer_data->metadata_sock.lock);
+       }
+       lttng_poll_clean(&events);
+
+       if (cmd_socket_wrapper) {
+               consumer_destroy_socket(cmd_socket_wrapper);
+       }
+error_poll:
+       if (err) {
+               health_error();
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_unregister(health_sessiond);
+       DBG("consumer thread cleanup completed");
+
+       rcu_thread_offline();
+       rcu_unregister_thread();
+
+       return NULL;
+}
+
+/*
+ * Shutdown callback of the consumer management thread.
+ *
+ * Notifies the thread through the write end of the quit pipe held by the
+ * notifiers passed as `data`. Returns true only when notify_thread_pipe()
+ * reports 1.
+ */
+static bool shutdown_consumer_management_thread(void *data)
+{
+       struct thread_notifiers *thread_notifiers = data;
+       int notify_ret;
+
+       notify_ret = notify_thread_pipe(
+                       lttng_pipe_get_writefd(thread_notifiers->quit_pipe));
+       return notify_ret == 1;
+}
+
+/*
+ * Cleanup callback of the consumer management thread: destroys the quit pipe
+ * and frees the notifiers passed as `data`.
+ *
+ * A NULL `data` is tolerated (no-op) so that this function can safely be
+ * invoked from an error path reached before the notifiers were allocated.
+ */
+static void cleanup_consumer_management_thread(void *data)
+{
+       struct thread_notifiers *notifiers = data;
+
+       if (!notifiers) {
+               return;
+       }
+
+       lttng_pipe_destroy(notifiers->quit_pipe);
+       free(notifiers);
+}
+
+/*
+ * Launch the consumer management thread for `consumer_data` and wait until
+ * the thread reports the outcome of its initialization.
+ *
+ * Returns true when the thread was created and reported a successful
+ * initialization, false otherwise.
+ *
+ * Resource handling on failure:
+ *   - quit pipe creation failed: nothing to release;
+ *   - notifiers allocation failed: only the quit pipe is destroyed;
+ *   - thread creation failed: the pipe and notifiers are released through
+ *     cleanup_consumer_management_thread();
+ *   - the thread was launched but its initialization failed: nothing is
+ *     released here, see the comment at that check.
+ */
+bool launch_consumer_management_thread(struct consumer_data *consumer_data)
+{
+       struct lttng_pipe *quit_pipe;
+       struct thread_notifiers *notifiers;
+       struct lttng_thread *thread;
+
+       quit_pipe = lttng_pipe_open(FD_CLOEXEC);
+       if (!quit_pipe) {
+               goto error;
+       }
+
+       notifiers = zmalloc(sizeof(*notifiers));
+       if (!notifiers) {
+               goto error_destroy_pipe;
+       }
+       notifiers->quit_pipe = quit_pipe;
+       notifiers->consumer_data = consumer_data;
+       sem_init(&notifiers->ready, 0, 0);
+
+       thread = lttng_thread_create("Consumer management",
+                       thread_consumer_management,
+                       shutdown_consumer_management_thread,
+                       cleanup_consumer_management_thread,
+                       notifiers);
+       if (!thread) {
+               goto error_cleanup;
+       }
+       wait_until_thread_is_ready(notifiers);
+       lttng_thread_put(thread);
+       if (notifiers->initialization_result) {
+               /*
+                * The notifiers were handed to the thread as the data of its
+                * shutdown and cleanup callbacks. Releasing them here would
+                * leave those callbacks with a dangling pointer; their release
+                * is expected to happen through the registered cleanup
+                * callback.
+                */
+               goto error;
+       }
+       return true;
+
+error_cleanup:
+       /* Releases both the quit pipe and the notifiers. */
+       cleanup_consumer_management_thread(notifiers);
+       return false;
+error_destroy_pipe:
+       lttng_pipe_destroy(quit_pipe);
+error:
+       return false;
+}
diff --git a/src/bin/lttng-sessiond/manage-consumer.h b/src/bin/lttng-sessiond/manage-consumer.h
new file mode 100644 (file)
index 0000000..0a0067c
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (C) 2011 - David Goulet <david.goulet@polymtl.ca>
+ *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *               2013 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License, version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef SESSIOND_CONSUMER_MANAGEMENT_THREAD_H
+#define SESSIOND_CONSUMER_MANAGEMENT_THREAD_H
+
+#include <stdbool.h>
+#include "lttng-sessiond.h"
+
+/*
+ * Launch the consumer management thread for `consumer_data`.
+ *
+ * The call waits until the thread reports the outcome of its initialization.
+ * Returns true if the thread was launched and initialized successfully,
+ * false otherwise.
+ */
+bool launch_consumer_management_thread(struct consumer_data *consumer_data);
+
+#endif /* SESSIOND_CONSUMER_MANAGEMENT_THREAD_H */
This page took 0.059114 seconds and 4 git commands to generate.