X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fmanage-consumer.cpp;h=702bd5050cb884972ffff0dc1642ae24c96957bc;hb=cd9adb8b829564212158943a0d279bb35322ab30;hp=cce48526e681309de3cb17bc0ea111b83abcba54;hpb=21cf9b6b1843774306a76f4dccddddd706b64f79;p=lttng-tools.git diff --git a/src/bin/lttng-sessiond/manage-consumer.cpp b/src/bin/lttng-sessiond/manage-consumer.cpp index cce48526e..702bd5050 100644 --- a/src/bin/lttng-sessiond/manage-consumer.cpp +++ b/src/bin/lttng-sessiond/manage-consumer.cpp @@ -7,24 +7,26 @@ * */ -#include +#include "health-sessiond.hpp" +#include "manage-consumer.hpp" +#include "testpoint.hpp" +#include "thread.hpp" +#include "ust-consumer.hpp" +#include "utils.hpp" -#include -#include +#include +#include -#include "manage-consumer.h" -#include "testpoint.h" -#include "health-sessiond.h" -#include "utils.h" -#include "thread.h" -#include "ust-consumer.h" +#include +namespace { struct thread_notifiers { struct lttng_pipe *quit_pipe; struct consumer_data *consumer_data; sem_t ready; int initialization_result; }; +} /* namespace */ static void mark_thread_as_ready(struct thread_notifiers *notifiers) { @@ -33,8 +35,7 @@ static void mark_thread_as_ready(struct thread_notifiers *notifiers) sem_post(¬ifiers->ready); } -static void mark_thread_intialization_as_failed( - struct thread_notifiers *notifiers) +static void mark_thread_intialization_as_failed(struct thread_notifiers *notifiers) { ERR("Consumer management thread entering error state"); notifiers->initialization_result = -1; @@ -53,14 +54,14 @@ static void wait_until_thread_is_ready(struct thread_notifiers *notifiers) */ static void *thread_consumer_management(void *data) { - int sock = -1, i, ret, pollfd, err = -1, should_quit = 0; - uint32_t revents, nb_fd; + int sock = -1, i, ret, err = -1, should_quit = 0; + uint32_t nb_fd; enum lttcomm_return_code code; struct lttng_poll_event events; struct thread_notifiers *notifiers = (thread_notifiers *) data; struct consumer_data *consumer_data = notifiers->consumer_data; - const int quit_pipe_read_fd = lttng_pipe_get_readfd(notifiers->quit_pipe); - struct consumer_socket *cmd_socket_wrapper = NULL; + const auto thread_quit_pipe_fd = lttng_pipe_get_readfd(notifiers->quit_pipe); + struct consumer_socket *cmd_socket_wrapper = nullptr; DBG("[thread] Manage consumer started"); @@ -81,7 +82,7 @@ static void *thread_consumer_management(void *data) goto error_poll; } - ret = lttng_poll_add(&events, quit_pipe_read_fd, LPOLLIN | LPOLLERR); + ret = lttng_poll_add(&events, thread_quit_pipe_fd, LPOLLIN); if (ret < 0) { mark_thread_intialization_as_failed(notifiers); goto error; @@ -119,13 +120,14 @@ static void *thread_consumer_management(void *data) for (i = 0; i < nb_fd; i++) { /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); health_code_update(); - /* Thread quit pipe has been closed. Killing thread. */ - if (pollfd == quit_pipe_read_fd) { + /* Activity on thread quit pipe, exiting. */ + if (pollfd == thread_quit_pipe_fd) { + DBG("Activity on thread quit pipe"); err = 0; mark_thread_intialization_as_failed(notifiers); goto exit; @@ -162,8 +164,7 @@ static void *thread_consumer_management(void *data) DBG2("Receiving code from consumer err_sock"); /* Getting status code from kconsumerd */ - ret = lttcomm_recv_unix_sock(sock, &code, - sizeof(enum lttcomm_return_code)); + ret = lttcomm_recv_unix_sock(sock, &code, sizeof(enum lttcomm_return_code)); if (ret <= 0) { mark_thread_intialization_as_failed(notifiers); goto error; @@ -172,18 +173,14 @@ static void *thread_consumer_management(void *data) health_code_update(); if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) { ERR("consumer error when waiting for SOCK_READY : %s", - lttcomm_get_readable_code((lttcomm_return_code) -code)); + lttcomm_get_readable_code((lttcomm_return_code) -code)); mark_thread_intialization_as_failed(notifiers); goto error; } /* Connect both command and metadata sockets. */ - consumer_data->cmd_sock = - lttcomm_connect_unix_sock( - consumer_data->cmd_unix_sock_path); - consumer_data->metadata_fd = - lttcomm_connect_unix_sock( - consumer_data->cmd_unix_sock_path); + consumer_data->cmd_sock = lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path); + consumer_data->metadata_fd = lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path); if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) { PERROR("consumer connect cmd socket"); mark_thread_intialization_as_failed(notifiers); @@ -193,17 +190,16 @@ static void *thread_consumer_management(void *data) consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd; /* Create metadata socket lock. */ - consumer_data->metadata_sock.lock = (pthread_mutex_t *) zmalloc(sizeof(pthread_mutex_t)); - if (consumer_data->metadata_sock.lock == NULL) { + consumer_data->metadata_sock.lock = zmalloc(); + if (consumer_data->metadata_sock.lock == nullptr) { PERROR("zmalloc pthread mutex"); mark_thread_intialization_as_failed(notifiers); goto error; } - pthread_mutex_init(consumer_data->metadata_sock.lock, NULL); + pthread_mutex_init(consumer_data->metadata_sock.lock, nullptr); DBG("Consumer command socket ready (fd: %d)", consumer_data->cmd_sock); - DBG("Consumer metadata socket ready (fd: %d)", - consumer_data->metadata_fd); + DBG("Consumer metadata socket ready (fd: %d)", consumer_data->metadata_fd); /* * Remove the consumerd error sock since we've established a connection. @@ -222,8 +218,7 @@ static void *thread_consumer_management(void *data) } /* Add metadata socket that is successfully connected. */ - ret = lttng_poll_add(&events, consumer_data->metadata_fd, - LPOLLIN | LPOLLRDHUP); + ret = lttng_poll_add(&events, consumer_data->metadata_fd, LPOLLIN | LPOLLRDHUP); if (ret < 0) { mark_thread_intialization_as_failed(notifiers); goto error; @@ -253,7 +248,7 @@ static void *thread_consumer_management(void *data) pthread_mutex_unlock(cmd_socket_wrapper->lock); ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper, - consumer_data->channel_monitor_pipe); + consumer_data->channel_monitor_pipe); if (ret) { mark_thread_intialization_as_failed(notifiers); goto error; @@ -261,13 +256,13 @@ static void *thread_consumer_management(void *data) /* Discard the socket wrapper as it is no longer needed. */ consumer_destroy_socket(cmd_socket_wrapper); - cmd_socket_wrapper = NULL; + cmd_socket_wrapper = nullptr; /* The thread is completely initialized, signal that it is ready. */ mark_thread_as_ready(notifiers); /* Infinite blocking call, waiting for transmission */ - while (1) { + while (true) { health_code_update(); /* Exit the thread because the thread quit pipe has been triggered. */ @@ -288,8 +283,8 @@ static void *thread_consumer_management(void *data) for (i = 0; i < nb_fd; i++) { /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); health_code_update(); @@ -298,37 +293,36 @@ static void *thread_consumer_management(void *data) * but continue the current loop to handle potential data from * consumer. */ - if (pollfd == quit_pipe_read_fd) { + if (pollfd == thread_quit_pipe_fd) { should_quit = 1; } else if (pollfd == sock) { /* Event on the consumerd socket */ - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) - && !(revents & LPOLLIN)) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) && + !(revents & LPOLLIN)) { ERR("consumer err socket second poll error"); goto error; } health_code_update(); /* Wait for any kconsumerd error */ - ret = lttcomm_recv_unix_sock(sock, &code, - sizeof(enum lttcomm_return_code)); + ret = lttcomm_recv_unix_sock( + sock, &code, sizeof(enum lttcomm_return_code)); if (ret <= 0) { ERR("consumer closed the command socket"); goto error; } ERR("consumer return code : %s", - lttcomm_get_readable_code((lttcomm_return_code) -code)); + lttcomm_get_readable_code((lttcomm_return_code) -code)); goto exit; } else if (pollfd == consumer_data->metadata_fd) { - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) - && !(revents & LPOLLIN)) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) && + !(revents & LPOLLIN)) { ERR("consumer err metadata socket second poll error"); goto error; } /* UST metadata requests */ - ret = ust_consumer_metadata_request( - &consumer_data->metadata_sock); + ret = ust_consumer_metadata_request(&consumer_data->metadata_sock); if (ret < 0) { ERR("Handling metadata request"); goto error; @@ -352,7 +346,7 @@ error: if (consumer_data->type == LTTNG_CONSUMER_KERNEL) { uatomic_set(&the_kernel_consumerd_state, CONSUMER_ERROR); } else if (consumer_data->type == LTTNG_CONSUMER64_UST || - consumer_data->type == LTTNG_CONSUMER32_UST) { + consumer_data->type == LTTNG_CONSUMER32_UST) { uatomic_set(&the_ust_consumerd_state, CONSUMER_ERROR); } else { /* Code flow error... */ @@ -373,8 +367,7 @@ error: } consumer_data->cmd_sock = -1; } - if (consumer_data->metadata_sock.fd_ptr && - *consumer_data->metadata_sock.fd_ptr >= 0) { + if (consumer_data->metadata_sock.fd_ptr && *consumer_data->metadata_sock.fd_ptr >= 0) { ret = close(*consumer_data->metadata_sock.fd_ptr); if (ret) { PERROR("close"); @@ -412,7 +405,7 @@ error_poll: rcu_thread_offline(); rcu_unregister_thread(); - return NULL; + return nullptr; } static bool shutdown_consumer_management_thread(void *data) @@ -434,10 +427,10 @@ static void cleanup_consumer_management_thread(void *data) bool launch_consumer_management_thread(struct consumer_data *consumer_data) { struct lttng_pipe *quit_pipe; - struct thread_notifiers *notifiers = NULL; + struct thread_notifiers *notifiers = nullptr; struct lttng_thread *thread; - notifiers = (thread_notifiers *) zmalloc(sizeof(*notifiers)); + notifiers = zmalloc(); if (!notifiers) { goto error_alloc; } @@ -451,10 +444,10 @@ bool launch_consumer_management_thread(struct consumer_data *consumer_data) sem_init(¬ifiers->ready, 0, 0); thread = lttng_thread_create("Consumer management", - thread_consumer_management, - shutdown_consumer_management_thread, - cleanup_consumer_management_thread, - notifiers); + thread_consumer_management, + shutdown_consumer_management_thread, + cleanup_consumer_management_thread, + notifiers); if (!thread) { goto error; }