bin: compile lttng-sessiond as C++
[lttng-tools.git] / src / bin / lttng-sessiond / manage-consumer.c
diff --git a/src/bin/lttng-sessiond/manage-consumer.c b/src/bin/lttng-sessiond/manage-consumer.c
deleted file mode 100644 (file)
index 719d42b..0000000
+++ /dev/null
@@ -1,471 +0,0 @@
-/*
- * Copyright (C) 2011 David Goulet <david.goulet@polymtl.ca>
- * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- * Copyright (C) 2013 Jérémie Galarneau <jeremie.galarneau@efficios.com>
- *
- * SPDX-License-Identifier: GPL-2.0-only
- *
- */
-
-#include <signal.h>
-
-#include <common/pipe.h>
-#include <common/utils.h>
-
-#include "manage-consumer.h"
-#include "testpoint.h"
-#include "health-sessiond.h"
-#include "utils.h"
-#include "thread.h"
-#include "ust-consumer.h"
-
-struct thread_notifiers {
-       struct lttng_pipe *quit_pipe;
-       struct consumer_data *consumer_data;
-       sem_t ready;
-       int initialization_result;
-};
-
-static void mark_thread_as_ready(struct thread_notifiers *notifiers)
-{
-       DBG("Marking consumer management thread as ready");
-       notifiers->initialization_result = 0;
-       sem_post(&notifiers->ready);
-}
-
-static void mark_thread_intialization_as_failed(
-               struct thread_notifiers *notifiers)
-{
-       ERR("Consumer management thread entering error state");
-       notifiers->initialization_result = -1;
-       sem_post(&notifiers->ready);
-}
-
-static void wait_until_thread_is_ready(struct thread_notifiers *notifiers)
-{
-       DBG("Waiting for consumer management thread to be ready");
-       sem_wait(&notifiers->ready);
-       DBG("Consumer management thread is ready");
-}
-
-/*
- * This thread manage the consumer error sent back to the session daemon.
- */
-static void *thread_consumer_management(void *data)
-{
-       int sock = -1, i, ret, pollfd, err = -1, should_quit = 0;
-       uint32_t revents, nb_fd;
-       enum lttcomm_return_code code;
-       struct lttng_poll_event events;
-       struct thread_notifiers *notifiers = data;
-       struct consumer_data *consumer_data = notifiers->consumer_data;
-       const int quit_pipe_read_fd = lttng_pipe_get_readfd(notifiers->quit_pipe);
-       struct consumer_socket *cmd_socket_wrapper = NULL;
-
-       DBG("[thread] Manage consumer started");
-
-       rcu_register_thread();
-       rcu_thread_online();
-
-       health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_CONSUMER);
-
-       health_code_update();
-
-       /*
-        * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the
-        * metadata_sock. Nothing more will be added to this poll set.
-        */
-       ret = lttng_poll_create(&events, 3, LTTNG_CLOEXEC);
-       if (ret < 0) {
-               mark_thread_intialization_as_failed(notifiers);
-               goto error_poll;
-       }
-
-       ret = lttng_poll_add(&events, quit_pipe_read_fd, LPOLLIN | LPOLLERR);
-       if (ret < 0) {
-               mark_thread_intialization_as_failed(notifiers);
-               goto error;
-       }
-
-       /*
-        * The error socket here is already in a listening state which was done
-        * just before spawning this thread to avoid a race between the consumer
-        * daemon exec trying to connect and the listen() call.
-        */
-       ret = lttng_poll_add(&events, consumer_data->err_sock, LPOLLIN | LPOLLRDHUP);
-       if (ret < 0) {
-               mark_thread_intialization_as_failed(notifiers);
-               goto error;
-       }
-
-       health_code_update();
-
-       /* Infinite blocking call, waiting for transmission */
-       health_poll_entry();
-
-       if (testpoint(sessiond_thread_manage_consumer)) {
-               mark_thread_intialization_as_failed(notifiers);
-               goto error;
-       }
-
-       ret = lttng_poll_wait(&events, -1);
-       health_poll_exit();
-       if (ret < 0) {
-               mark_thread_intialization_as_failed(notifiers);
-               goto error;
-       }
-
-       nb_fd = ret;
-
-       for (i = 0; i < nb_fd; i++) {
-               /* Fetch once the poll data */
-               revents = LTTNG_POLL_GETEV(&events, i);
-               pollfd = LTTNG_POLL_GETFD(&events, i);
-
-               health_code_update();
-
-               /* Thread quit pipe has been closed. Killing thread. */
-               if (pollfd == quit_pipe_read_fd) {
-                       err = 0;
-                       mark_thread_intialization_as_failed(notifiers);
-                       goto exit;
-               } else if (pollfd == consumer_data->err_sock) {
-                       /* Event on the registration socket */
-                       if (revents & LPOLLIN) {
-                               continue;
-                       } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                               ERR("consumer err socket poll error");
-                               mark_thread_intialization_as_failed(notifiers);
-                               goto error;
-                       } else {
-                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
-                               mark_thread_intialization_as_failed(notifiers);
-                               goto error;
-                       }
-               }
-       }
-
-       sock = lttcomm_accept_unix_sock(consumer_data->err_sock);
-       if (sock < 0) {
-               mark_thread_intialization_as_failed(notifiers);
-               goto error;
-       }
-
-       /*
-        * Set the CLOEXEC flag. Return code is useless because either way, the
-        * show must go on.
-        */
-       (void) utils_set_fd_cloexec(sock);
-
-       health_code_update();
-
-       DBG2("Receiving code from consumer err_sock");
-
-       /* Getting status code from kconsumerd */
-       ret = lttcomm_recv_unix_sock(sock, &code,
-                       sizeof(enum lttcomm_return_code));
-       if (ret <= 0) {
-               mark_thread_intialization_as_failed(notifiers);
-               goto error;
-       }
-
-       health_code_update();
-       if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) {
-               ERR("consumer error when waiting for SOCK_READY : %s",
-                               lttcomm_get_readable_code(-code));
-               mark_thread_intialization_as_failed(notifiers);
-               goto error;
-       }
-
-       /* Connect both command and metadata sockets. */
-       consumer_data->cmd_sock =
-                       lttcomm_connect_unix_sock(
-                               consumer_data->cmd_unix_sock_path);
-       consumer_data->metadata_fd =
-                       lttcomm_connect_unix_sock(
-                               consumer_data->cmd_unix_sock_path);
-       if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) {
-               PERROR("consumer connect cmd socket");
-               mark_thread_intialization_as_failed(notifiers);
-               goto error;
-       }
-
-       consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd;
-
-       /* Create metadata socket lock. */
-       consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t));
-       if (consumer_data->metadata_sock.lock == NULL) {
-               PERROR("zmalloc pthread mutex");
-               mark_thread_intialization_as_failed(notifiers);
-               goto error;
-       }
-       pthread_mutex_init(consumer_data->metadata_sock.lock, NULL);
-
-       DBG("Consumer command socket ready (fd: %d)", consumer_data->cmd_sock);
-       DBG("Consumer metadata socket ready (fd: %d)",
-                       consumer_data->metadata_fd);
-
-       /*
-        * Remove the consumerd error sock since we've established a connection.
-        */
-       ret = lttng_poll_del(&events, consumer_data->err_sock);
-       if (ret < 0) {
-               mark_thread_intialization_as_failed(notifiers);
-               goto error;
-       }
-
-       /* Add new accepted error socket. */
-       ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP);
-       if (ret < 0) {
-               mark_thread_intialization_as_failed(notifiers);
-               goto error;
-       }
-
-       /* Add metadata socket that is successfully connected. */
-       ret = lttng_poll_add(&events, consumer_data->metadata_fd,
-                       LPOLLIN | LPOLLRDHUP);
-       if (ret < 0) {
-               mark_thread_intialization_as_failed(notifiers);
-               goto error;
-       }
-
-       health_code_update();
-
-       /*
-        * Transfer the write-end of the channel monitoring pipe to the consumer
-        * by issuing a SET_CHANNEL_MONITOR_PIPE command.
-        */
-       cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock);
-       if (!cmd_socket_wrapper) {
-               mark_thread_intialization_as_failed(notifiers);
-               goto error;
-       }
-       cmd_socket_wrapper->lock = &consumer_data->lock;
-
-       pthread_mutex_lock(cmd_socket_wrapper->lock);
-       ret = consumer_init(cmd_socket_wrapper, the_sessiond_uuid);
-       if (ret) {
-               ERR("Failed to send sessiond uuid to consumer daemon");
-               mark_thread_intialization_as_failed(notifiers);
-               pthread_mutex_unlock(cmd_socket_wrapper->lock);
-               goto error;
-       }
-       pthread_mutex_unlock(cmd_socket_wrapper->lock);
-
-       ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper,
-                       consumer_data->channel_monitor_pipe);
-       if (ret) {
-               mark_thread_intialization_as_failed(notifiers);
-               goto error;
-       }
-
-       /* Discard the socket wrapper as it is no longer needed. */
-       consumer_destroy_socket(cmd_socket_wrapper);
-       cmd_socket_wrapper = NULL;
-
-       /* The thread is completely initialized, signal that it is ready. */
-       mark_thread_as_ready(notifiers);
-
-       /* Infinite blocking call, waiting for transmission */
-       while (1) {
-               health_code_update();
-
-               /* Exit the thread because the thread quit pipe has been triggered. */
-               if (should_quit) {
-                       /* Not a health error. */
-                       err = 0;
-                       goto exit;
-               }
-
-               health_poll_entry();
-               ret = lttng_poll_wait(&events, -1);
-               health_poll_exit();
-               if (ret < 0) {
-                       goto error;
-               }
-
-               nb_fd = ret;
-
-               for (i = 0; i < nb_fd; i++) {
-                       /* Fetch once the poll data */
-                       revents = LTTNG_POLL_GETEV(&events, i);
-                       pollfd = LTTNG_POLL_GETFD(&events, i);
-
-                       health_code_update();
-
-                       /*
-                        * Thread quit pipe has been triggered, flag that we should stop
-                        * but continue the current loop to handle potential data from
-                        * consumer.
-                        */
-                       if (pollfd == quit_pipe_read_fd) {
-                               should_quit = 1;
-                       } else if (pollfd == sock) {
-                               /* Event on the consumerd socket */
-                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
-                                               && !(revents & LPOLLIN)) {
-                                       ERR("consumer err socket second poll error");
-                                       goto error;
-                               }
-                               health_code_update();
-                               /* Wait for any kconsumerd error */
-                               ret = lttcomm_recv_unix_sock(sock, &code,
-                                               sizeof(enum lttcomm_return_code));
-                               if (ret <= 0) {
-                                       ERR("consumer closed the command socket");
-                                       goto error;
-                               }
-
-                               ERR("consumer return code : %s",
-                                               lttcomm_get_readable_code(-code));
-
-                               goto exit;
-                       } else if (pollfd == consumer_data->metadata_fd) {
-                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
-                                               && !(revents & LPOLLIN)) {
-                                       ERR("consumer err metadata socket second poll error");
-                                       goto error;
-                               }
-                               /* UST metadata requests */
-                               ret = ust_consumer_metadata_request(
-                                               &consumer_data->metadata_sock);
-                               if (ret < 0) {
-                                       ERR("Handling metadata request");
-                                       goto error;
-                               }
-                       }
-                       /* No need for an else branch all FDs are tested prior. */
-               }
-               health_code_update();
-       }
-
-exit:
-error:
-       /*
-        * We lock here because we are about to close the sockets and some other
-        * thread might be using them so get exclusive access which will abort all
-        * other consumer command by other threads.
-        */
-       pthread_mutex_lock(&consumer_data->lock);
-
-       /* Immediately set the consumerd state to stopped */
-       if (consumer_data->type == LTTNG_CONSUMER_KERNEL) {
-               uatomic_set(&the_kernel_consumerd_state, CONSUMER_ERROR);
-       } else if (consumer_data->type == LTTNG_CONSUMER64_UST ||
-                       consumer_data->type == LTTNG_CONSUMER32_UST) {
-               uatomic_set(&the_ust_consumerd_state, CONSUMER_ERROR);
-       } else {
-               /* Code flow error... */
-               abort();
-       }
-
-       if (consumer_data->err_sock >= 0) {
-               ret = close(consumer_data->err_sock);
-               if (ret) {
-                       PERROR("close");
-               }
-               consumer_data->err_sock = -1;
-       }
-       if (consumer_data->cmd_sock >= 0) {
-               ret = close(consumer_data->cmd_sock);
-               if (ret) {
-                       PERROR("close");
-               }
-               consumer_data->cmd_sock = -1;
-       }
-       if (consumer_data->metadata_sock.fd_ptr &&
-           *consumer_data->metadata_sock.fd_ptr >= 0) {
-               ret = close(*consumer_data->metadata_sock.fd_ptr);
-               if (ret) {
-                       PERROR("close");
-               }
-       }
-       if (sock >= 0) {
-               ret = close(sock);
-               if (ret) {
-                       PERROR("close");
-               }
-       }
-
-       unlink(consumer_data->err_unix_sock_path);
-       unlink(consumer_data->cmd_unix_sock_path);
-       pthread_mutex_unlock(&consumer_data->lock);
-
-       /* Cleanup metadata socket mutex. */
-       if (consumer_data->metadata_sock.lock) {
-               pthread_mutex_destroy(consumer_data->metadata_sock.lock);
-               free(consumer_data->metadata_sock.lock);
-       }
-       lttng_poll_clean(&events);
-
-       if (cmd_socket_wrapper) {
-               consumer_destroy_socket(cmd_socket_wrapper);
-       }
-error_poll:
-       if (err) {
-               health_error();
-               ERR("Health error occurred in %s", __func__);
-       }
-       health_unregister(the_health_sessiond);
-       DBG("consumer thread cleanup completed");
-
-       rcu_thread_offline();
-       rcu_unregister_thread();
-
-       return NULL;
-}
-
-static bool shutdown_consumer_management_thread(void *data)
-{
-       struct thread_notifiers *notifiers = data;
-       const int write_fd = lttng_pipe_get_writefd(notifiers->quit_pipe);
-
-       return notify_thread_pipe(write_fd) == 1;
-}
-
-static void cleanup_consumer_management_thread(void *data)
-{
-       struct thread_notifiers *notifiers = data;
-
-       lttng_pipe_destroy(notifiers->quit_pipe);
-       free(notifiers);
-}
-
-bool launch_consumer_management_thread(struct consumer_data *consumer_data)
-{
-       struct lttng_pipe *quit_pipe;
-       struct thread_notifiers *notifiers = NULL;
-       struct lttng_thread *thread;
-
-       notifiers = zmalloc(sizeof(*notifiers));
-       if (!notifiers) {
-               goto error_alloc;
-       }
-
-       quit_pipe = lttng_pipe_open(FD_CLOEXEC);
-       if (!quit_pipe) {
-               goto error;
-       }
-       notifiers->quit_pipe = quit_pipe;
-       notifiers->consumer_data = consumer_data;
-       sem_init(&notifiers->ready, 0, 0);
-
-       thread = lttng_thread_create("Consumer management",
-                       thread_consumer_management,
-                       shutdown_consumer_management_thread,
-                       cleanup_consumer_management_thread,
-                       notifiers);
-       if (!thread) {
-               goto error;
-       }
-       wait_until_thread_is_ready(notifiers);
-       lttng_thread_put(thread);
-       if (notifiers->initialization_result) {
-               return false;
-       }
-       return true;
-error:
-       cleanup_consumer_management_thread(notifiers);
-error_alloc:
-       return false;
-}
This page took 0.028432 seconds and 4 git commands to generate.