X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fmanage-consumer.cpp;h=702bd5050cb884972ffff0dc1642ae24c96957bc;hb=cd9adb8b829564212158943a0d279bb35322ab30;hp=d9d1e670ec87e2da1f356c4dc2930db6005a1c83;hpb=8a00688e1d58cc5a2e77eba206ff23bd6105130c;p=lttng-tools.git diff --git a/src/bin/lttng-sessiond/manage-consumer.cpp b/src/bin/lttng-sessiond/manage-consumer.cpp index d9d1e670e..702bd5050 100644 --- a/src/bin/lttng-sessiond/manage-consumer.cpp +++ b/src/bin/lttng-sessiond/manage-consumer.cpp @@ -7,17 +7,17 @@ * */ -#include - -#include -#include - +#include "health-sessiond.hpp" #include "manage-consumer.hpp" #include "testpoint.hpp" -#include "health-sessiond.hpp" -#include "utils.hpp" #include "thread.hpp" #include "ust-consumer.hpp" +#include "utils.hpp" + +#include +#include + +#include namespace { struct thread_notifiers { @@ -35,8 +35,7 @@ static void mark_thread_as_ready(struct thread_notifiers *notifiers) sem_post(¬ifiers->ready); } -static void mark_thread_intialization_as_failed( - struct thread_notifiers *notifiers) +static void mark_thread_intialization_as_failed(struct thread_notifiers *notifiers) { ERR("Consumer management thread entering error state"); notifiers->initialization_result = -1; @@ -62,7 +61,7 @@ static void *thread_consumer_management(void *data) struct thread_notifiers *notifiers = (thread_notifiers *) data; struct consumer_data *consumer_data = notifiers->consumer_data; const auto thread_quit_pipe_fd = lttng_pipe_get_readfd(notifiers->quit_pipe); - struct consumer_socket *cmd_socket_wrapper = NULL; + struct consumer_socket *cmd_socket_wrapper = nullptr; DBG("[thread] Manage consumer started"); @@ -83,7 +82,7 @@ static void *thread_consumer_management(void *data) goto error_poll; } - ret = lttng_poll_add(&events, thread_quit_pipe_fd, LPOLLIN | LPOLLERR); + ret = lttng_poll_add(&events, thread_quit_pipe_fd, LPOLLIN); if (ret < 0) { mark_thread_intialization_as_failed(notifiers); goto error; @@ -165,8 +164,7 @@ static void *thread_consumer_management(void *data) DBG2("Receiving code from consumer err_sock"); /* Getting status code from kconsumerd */ - ret = lttcomm_recv_unix_sock(sock, &code, - sizeof(enum lttcomm_return_code)); + ret = lttcomm_recv_unix_sock(sock, &code, sizeof(enum lttcomm_return_code)); if (ret <= 0) { mark_thread_intialization_as_failed(notifiers); goto error; @@ -175,18 +173,14 @@ static void *thread_consumer_management(void *data) health_code_update(); if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) { ERR("consumer error when waiting for SOCK_READY : %s", - lttcomm_get_readable_code((lttcomm_return_code) -code)); + lttcomm_get_readable_code((lttcomm_return_code) -code)); mark_thread_intialization_as_failed(notifiers); goto error; } /* Connect both command and metadata sockets. */ - consumer_data->cmd_sock = - lttcomm_connect_unix_sock( - consumer_data->cmd_unix_sock_path); - consumer_data->metadata_fd = - lttcomm_connect_unix_sock( - consumer_data->cmd_unix_sock_path); + consumer_data->cmd_sock = lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path); + consumer_data->metadata_fd = lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path); if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) { PERROR("consumer connect cmd socket"); mark_thread_intialization_as_failed(notifiers); @@ -197,16 +191,15 @@ static void *thread_consumer_management(void *data) /* Create metadata socket lock. */ consumer_data->metadata_sock.lock = zmalloc(); - if (consumer_data->metadata_sock.lock == NULL) { + if (consumer_data->metadata_sock.lock == nullptr) { PERROR("zmalloc pthread mutex"); mark_thread_intialization_as_failed(notifiers); goto error; } - pthread_mutex_init(consumer_data->metadata_sock.lock, NULL); + pthread_mutex_init(consumer_data->metadata_sock.lock, nullptr); DBG("Consumer command socket ready (fd: %d)", consumer_data->cmd_sock); - DBG("Consumer metadata socket ready (fd: %d)", - consumer_data->metadata_fd); + DBG("Consumer metadata socket ready (fd: %d)", consumer_data->metadata_fd); /* * Remove the consumerd error sock since we've established a connection. @@ -225,8 +218,7 @@ static void *thread_consumer_management(void *data) } /* Add metadata socket that is successfully connected. */ - ret = lttng_poll_add(&events, consumer_data->metadata_fd, - LPOLLIN | LPOLLRDHUP); + ret = lttng_poll_add(&events, consumer_data->metadata_fd, LPOLLIN | LPOLLRDHUP); if (ret < 0) { mark_thread_intialization_as_failed(notifiers); goto error; @@ -256,7 +248,7 @@ static void *thread_consumer_management(void *data) pthread_mutex_unlock(cmd_socket_wrapper->lock); ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper, - consumer_data->channel_monitor_pipe); + consumer_data->channel_monitor_pipe); if (ret) { mark_thread_intialization_as_failed(notifiers); goto error; @@ -264,13 +256,13 @@ static void *thread_consumer_management(void *data) /* Discard the socket wrapper as it is no longer needed. */ consumer_destroy_socket(cmd_socket_wrapper); - cmd_socket_wrapper = NULL; + cmd_socket_wrapper = nullptr; /* The thread is completely initialized, signal that it is ready. */ mark_thread_as_ready(notifiers); /* Infinite blocking call, waiting for transmission */ - while (1) { + while (true) { health_code_update(); /* Exit the thread because the thread quit pipe has been triggered. */ @@ -305,33 +297,32 @@ static void *thread_consumer_management(void *data) should_quit = 1; } else if (pollfd == sock) { /* Event on the consumerd socket */ - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) - && !(revents & LPOLLIN)) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) && + !(revents & LPOLLIN)) { ERR("consumer err socket second poll error"); goto error; } health_code_update(); /* Wait for any kconsumerd error */ - ret = lttcomm_recv_unix_sock(sock, &code, - sizeof(enum lttcomm_return_code)); + ret = lttcomm_recv_unix_sock( + sock, &code, sizeof(enum lttcomm_return_code)); if (ret <= 0) { ERR("consumer closed the command socket"); goto error; } ERR("consumer return code : %s", - lttcomm_get_readable_code((lttcomm_return_code) -code)); + lttcomm_get_readable_code((lttcomm_return_code) -code)); goto exit; } else if (pollfd == consumer_data->metadata_fd) { - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) - && !(revents & LPOLLIN)) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) && + !(revents & LPOLLIN)) { ERR("consumer err metadata socket second poll error"); goto error; } /* UST metadata requests */ - ret = ust_consumer_metadata_request( - &consumer_data->metadata_sock); + ret = ust_consumer_metadata_request(&consumer_data->metadata_sock); if (ret < 0) { ERR("Handling metadata request"); goto error; @@ -355,7 +346,7 @@ error: if (consumer_data->type == LTTNG_CONSUMER_KERNEL) { uatomic_set(&the_kernel_consumerd_state, CONSUMER_ERROR); } else if (consumer_data->type == LTTNG_CONSUMER64_UST || - consumer_data->type == LTTNG_CONSUMER32_UST) { + consumer_data->type == LTTNG_CONSUMER32_UST) { uatomic_set(&the_ust_consumerd_state, CONSUMER_ERROR); } else { /* Code flow error... */ @@ -376,8 +367,7 @@ error: } consumer_data->cmd_sock = -1; } - if (consumer_data->metadata_sock.fd_ptr && - *consumer_data->metadata_sock.fd_ptr >= 0) { + if (consumer_data->metadata_sock.fd_ptr && *consumer_data->metadata_sock.fd_ptr >= 0) { ret = close(*consumer_data->metadata_sock.fd_ptr); if (ret) { PERROR("close"); @@ -415,7 +405,7 @@ error_poll: rcu_thread_offline(); rcu_unregister_thread(); - return NULL; + return nullptr; } static bool shutdown_consumer_management_thread(void *data) @@ -437,7 +427,7 @@ static void cleanup_consumer_management_thread(void *data) bool launch_consumer_management_thread(struct consumer_data *consumer_data) { struct lttng_pipe *quit_pipe; - struct thread_notifiers *notifiers = NULL; + struct thread_notifiers *notifiers = nullptr; struct lttng_thread *thread; notifiers = zmalloc(); @@ -454,10 +444,10 @@ bool launch_consumer_management_thread(struct consumer_data *consumer_data) sem_init(¬ifiers->ready, 0, 0); thread = lttng_thread_create("Consumer management", - thread_consumer_management, - shutdown_consumer_management_thread, - cleanup_consumer_management_thread, - notifiers); + thread_consumer_management, + shutdown_consumer_management_thread, + cleanup_consumer_management_thread, + notifiers); if (!thread) { goto error; }