X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fmanage-consumer.cpp;h=e1d67f5bb0e3126cf9fc0ea61a3fb8931e56fa82;hb=e0252788784c4c7392e5105aed0eaf745798482e;hp=a3bb816f9459457ab0a1a74379bab796283a228b;hpb=f149493493fbd8a3efa4748832c03278c96c38ca;p=lttng-tools.git diff --git a/src/bin/lttng-sessiond/manage-consumer.cpp b/src/bin/lttng-sessiond/manage-consumer.cpp index a3bb816f9..e1d67f5bb 100644 --- a/src/bin/lttng-sessiond/manage-consumer.cpp +++ b/src/bin/lttng-sessiond/manage-consumer.cpp @@ -7,17 +7,17 @@ * */ -#include - -#include -#include - +#include "health-sessiond.hpp" #include "manage-consumer.hpp" #include "testpoint.hpp" -#include "health-sessiond.hpp" -#include "utils.hpp" #include "thread.hpp" #include "ust-consumer.hpp" +#include "utils.hpp" + +#include +#include + +#include namespace { struct thread_notifiers { @@ -35,8 +35,7 @@ static void mark_thread_as_ready(struct thread_notifiers *notifiers) sem_post(¬ifiers->ready); } -static void mark_thread_intialization_as_failed( - struct thread_notifiers *notifiers) +static void mark_thread_intialization_as_failed(struct thread_notifiers *notifiers) { ERR("Consumer management thread entering error state"); notifiers->initialization_result = -1; @@ -55,14 +54,14 @@ static void wait_until_thread_is_ready(struct thread_notifiers *notifiers) */ static void *thread_consumer_management(void *data) { - int sock = -1, i, ret, pollfd, err = -1, should_quit = 0; - uint32_t revents, nb_fd; + int sock = -1, i, ret, err = -1, should_quit = 0; + uint32_t nb_fd; enum lttcomm_return_code code; struct lttng_poll_event events; struct thread_notifiers *notifiers = (thread_notifiers *) data; struct consumer_data *consumer_data = notifiers->consumer_data; - const int quit_pipe_read_fd = lttng_pipe_get_readfd(notifiers->quit_pipe); - struct consumer_socket *cmd_socket_wrapper = NULL; + const auto thread_quit_pipe_fd = lttng_pipe_get_readfd(notifiers->quit_pipe); + struct consumer_socket *cmd_socket_wrapper = nullptr; DBG("[thread] Manage consumer started"); @@ -83,7 +82,7 @@ static void *thread_consumer_management(void *data) goto error_poll; } - ret = lttng_poll_add(&events, quit_pipe_read_fd, LPOLLIN | LPOLLERR); + ret = lttng_poll_add(&events, thread_quit_pipe_fd, LPOLLIN); if (ret < 0) { mark_thread_intialization_as_failed(notifiers); goto error; @@ -121,13 +120,14 @@ static void *thread_consumer_management(void *data) for (i = 0; i < nb_fd; i++) { /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); health_code_update(); - /* Thread quit pipe has been closed. Killing thread. */ - if (pollfd == quit_pipe_read_fd) { + /* Activity on thread quit pipe, exiting. */ + if (pollfd == thread_quit_pipe_fd) { + DBG("Activity on thread quit pipe"); err = 0; mark_thread_intialization_as_failed(notifiers); goto exit; @@ -164,8 +164,7 @@ static void *thread_consumer_management(void *data) DBG2("Receiving code from consumer err_sock"); /* Getting status code from kconsumerd */ - ret = lttcomm_recv_unix_sock(sock, &code, - sizeof(enum lttcomm_return_code)); + ret = lttcomm_recv_unix_sock(sock, &code, sizeof(enum lttcomm_return_code)); if (ret <= 0) { mark_thread_intialization_as_failed(notifiers); goto error; @@ -174,18 +173,14 @@ static void *thread_consumer_management(void *data) health_code_update(); if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) { ERR("consumer error when waiting for SOCK_READY : %s", - lttcomm_get_readable_code((lttcomm_return_code) -code)); + lttcomm_get_readable_code((lttcomm_return_code) -code)); mark_thread_intialization_as_failed(notifiers); goto error; } /* Connect both command and metadata sockets. */ - consumer_data->cmd_sock = - lttcomm_connect_unix_sock( - consumer_data->cmd_unix_sock_path); - consumer_data->metadata_fd = - lttcomm_connect_unix_sock( - consumer_data->cmd_unix_sock_path); + consumer_data->cmd_sock = lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path); + consumer_data->metadata_fd = lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path); if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) { PERROR("consumer connect cmd socket"); mark_thread_intialization_as_failed(notifiers); @@ -196,16 +191,15 @@ static void *thread_consumer_management(void *data) /* Create metadata socket lock. */ consumer_data->metadata_sock.lock = zmalloc(); - if (consumer_data->metadata_sock.lock == NULL) { + if (consumer_data->metadata_sock.lock == nullptr) { PERROR("zmalloc pthread mutex"); mark_thread_intialization_as_failed(notifiers); goto error; } - pthread_mutex_init(consumer_data->metadata_sock.lock, NULL); + pthread_mutex_init(consumer_data->metadata_sock.lock, nullptr); DBG("Consumer command socket ready (fd: %d)", consumer_data->cmd_sock); - DBG("Consumer metadata socket ready (fd: %d)", - consumer_data->metadata_fd); + DBG("Consumer metadata socket ready (fd: %d)", consumer_data->metadata_fd); /* * Remove the consumerd error sock since we've established a connection. @@ -224,8 +218,7 @@ static void *thread_consumer_management(void *data) } /* Add metadata socket that is successfully connected. */ - ret = lttng_poll_add(&events, consumer_data->metadata_fd, - LPOLLIN | LPOLLRDHUP); + ret = lttng_poll_add(&events, consumer_data->metadata_fd, LPOLLIN | LPOLLRDHUP); if (ret < 0) { mark_thread_intialization_as_failed(notifiers); goto error; @@ -255,7 +248,7 @@ static void *thread_consumer_management(void *data) pthread_mutex_unlock(cmd_socket_wrapper->lock); ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper, - consumer_data->channel_monitor_pipe); + consumer_data->channel_monitor_pipe); if (ret) { mark_thread_intialization_as_failed(notifiers); goto error; @@ -263,13 +256,13 @@ static void *thread_consumer_management(void *data) /* Discard the socket wrapper as it is no longer needed. */ consumer_destroy_socket(cmd_socket_wrapper); - cmd_socket_wrapper = NULL; + cmd_socket_wrapper = nullptr; /* The thread is completely initialized, signal that it is ready. */ mark_thread_as_ready(notifiers); /* Infinite blocking call, waiting for transmission */ - while (1) { + while (true) { health_code_update(); /* Exit the thread because the thread quit pipe has been triggered. */ @@ -290,8 +283,8 @@ static void *thread_consumer_management(void *data) for (i = 0; i < nb_fd; i++) { /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); health_code_update(); @@ -300,37 +293,36 @@ static void *thread_consumer_management(void *data) * but continue the current loop to handle potential data from * consumer. */ - if (pollfd == quit_pipe_read_fd) { + if (pollfd == thread_quit_pipe_fd) { should_quit = 1; } else if (pollfd == sock) { /* Event on the consumerd socket */ - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) - && !(revents & LPOLLIN)) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) && + !(revents & LPOLLIN)) { ERR("consumer err socket second poll error"); goto error; } health_code_update(); /* Wait for any kconsumerd error */ - ret = lttcomm_recv_unix_sock(sock, &code, - sizeof(enum lttcomm_return_code)); + ret = lttcomm_recv_unix_sock( + sock, &code, sizeof(enum lttcomm_return_code)); if (ret <= 0) { ERR("consumer closed the command socket"); goto error; } ERR("consumer return code : %s", - lttcomm_get_readable_code((lttcomm_return_code) -code)); + lttcomm_get_readable_code((lttcomm_return_code) -code)); goto exit; } else if (pollfd == consumer_data->metadata_fd) { - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) - && !(revents & LPOLLIN)) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) && + !(revents & LPOLLIN)) { ERR("consumer err metadata socket second poll error"); goto error; } /* UST metadata requests */ - ret = ust_consumer_metadata_request( - &consumer_data->metadata_sock); + ret = ust_consumer_metadata_request(&consumer_data->metadata_sock); if (ret < 0) { ERR("Handling metadata request"); goto error; @@ -354,7 +346,7 @@ error: if (consumer_data->type == LTTNG_CONSUMER_KERNEL) { uatomic_set(&the_kernel_consumerd_state, CONSUMER_ERROR); } else if (consumer_data->type == LTTNG_CONSUMER64_UST || - consumer_data->type == LTTNG_CONSUMER32_UST) { + consumer_data->type == LTTNG_CONSUMER32_UST) { uatomic_set(&the_ust_consumerd_state, CONSUMER_ERROR); } else { /* Code flow error... */ @@ -375,8 +367,7 @@ error: } consumer_data->cmd_sock = -1; } - if (consumer_data->metadata_sock.fd_ptr && - *consumer_data->metadata_sock.fd_ptr >= 0) { + if (consumer_data->metadata_sock.fd_ptr && *consumer_data->metadata_sock.fd_ptr >= 0) { ret = close(*consumer_data->metadata_sock.fd_ptr); if (ret) { PERROR("close"); @@ -414,7 +405,7 @@ error_poll: rcu_thread_offline(); rcu_unregister_thread(); - return NULL; + return nullptr; } static bool shutdown_consumer_management_thread(void *data) @@ -436,7 +427,7 @@ static void cleanup_consumer_management_thread(void *data) bool launch_consumer_management_thread(struct consumer_data *consumer_data) { struct lttng_pipe *quit_pipe; - struct thread_notifiers *notifiers = NULL; + struct thread_notifiers *notifiers = nullptr; struct lttng_thread *thread; notifiers = zmalloc(); @@ -453,19 +444,16 @@ bool launch_consumer_management_thread(struct consumer_data *consumer_data) sem_init(¬ifiers->ready, 0, 0); thread = lttng_thread_create("Consumer management", - thread_consumer_management, - shutdown_consumer_management_thread, - cleanup_consumer_management_thread, - notifiers); + thread_consumer_management, + shutdown_consumer_management_thread, + cleanup_consumer_management_thread, + notifiers); if (!thread) { goto error; } wait_until_thread_is_ready(notifiers); lttng_thread_put(thread); - if (notifiers->initialization_result) { - return false; - } - return true; + return notifiers->initialization_result == 0; error: cleanup_consumer_management_thread(notifiers); error_alloc: