*
*/
-#include <signal.h>
-
-#include <common/pipe.hpp>
-#include <common/utils.hpp>
-
+#include "health-sessiond.hpp"
#include "manage-consumer.hpp"
#include "testpoint.hpp"
-#include "health-sessiond.hpp"
-#include "utils.hpp"
#include "thread.hpp"
#include "ust-consumer.hpp"
+#include "utils.hpp"
+
+#include <common/pipe.hpp>
+#include <common/utils.hpp>
+
+#include <fcntl.h>
+#include <signal.h>
namespace {
struct thread_notifiers {
sem_post(¬ifiers->ready);
}
-static void mark_thread_intialization_as_failed(
- struct thread_notifiers *notifiers)
+static void mark_thread_intialization_as_failed(struct thread_notifiers *notifiers)
{
ERR("Consumer management thread entering error state");
notifiers->initialization_result = -1;
*/
static void *thread_consumer_management(void *data)
{
- int sock = -1, i, ret, pollfd, err = -1, should_quit = 0;
- uint32_t revents, nb_fd;
+ int sock = -1, i, ret, err = -1, should_quit = 0;
+ uint32_t nb_fd;
enum lttcomm_return_code code;
struct lttng_poll_event events;
struct thread_notifiers *notifiers = (thread_notifiers *) data;
struct consumer_data *consumer_data = notifiers->consumer_data;
- const int quit_pipe_read_fd = lttng_pipe_get_readfd(notifiers->quit_pipe);
- struct consumer_socket *cmd_socket_wrapper = NULL;
+ const auto thread_quit_pipe_fd = lttng_pipe_get_readfd(notifiers->quit_pipe);
+ struct consumer_socket *cmd_socket_wrapper = nullptr;
DBG("[thread] Manage consumer started");
goto error_poll;
}
- ret = lttng_poll_add(&events, quit_pipe_read_fd, LPOLLIN | LPOLLERR);
+ ret = lttng_poll_add(&events, thread_quit_pipe_fd, LPOLLIN);
if (ret < 0) {
mark_thread_intialization_as_failed(notifiers);
goto error;
for (i = 0; i < nb_fd; i++) {
/* Fetch once the poll data */
- revents = LTTNG_POLL_GETEV(&events, i);
- pollfd = LTTNG_POLL_GETFD(&events, i);
+ const auto revents = LTTNG_POLL_GETEV(&events, i);
+ const auto pollfd = LTTNG_POLL_GETFD(&events, i);
health_code_update();
- /* Thread quit pipe has been closed. Killing thread. */
- if (pollfd == quit_pipe_read_fd) {
+ /* Activity on thread quit pipe, exiting. */
+ if (pollfd == thread_quit_pipe_fd) {
+ DBG("Activity on thread quit pipe");
err = 0;
mark_thread_intialization_as_failed(notifiers);
goto exit;
DBG2("Receiving code from consumer err_sock");
/* Getting status code from kconsumerd */
- ret = lttcomm_recv_unix_sock(sock, &code,
- sizeof(enum lttcomm_return_code));
+ ret = lttcomm_recv_unix_sock(sock, &code, sizeof(enum lttcomm_return_code));
if (ret <= 0) {
mark_thread_intialization_as_failed(notifiers);
goto error;
health_code_update();
if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) {
ERR("consumer error when waiting for SOCK_READY : %s",
- lttcomm_get_readable_code((lttcomm_return_code) -code));
+ lttcomm_get_readable_code((lttcomm_return_code) -code));
mark_thread_intialization_as_failed(notifiers);
goto error;
}
/* Connect both command and metadata sockets. */
- consumer_data->cmd_sock =
- lttcomm_connect_unix_sock(
- consumer_data->cmd_unix_sock_path);
- consumer_data->metadata_fd =
- lttcomm_connect_unix_sock(
- consumer_data->cmd_unix_sock_path);
+ consumer_data->cmd_sock = lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path);
+ consumer_data->metadata_fd = lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path);
if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) {
PERROR("consumer connect cmd socket");
mark_thread_intialization_as_failed(notifiers);
/* Create metadata socket lock. */
consumer_data->metadata_sock.lock = zmalloc<pthread_mutex_t>();
- if (consumer_data->metadata_sock.lock == NULL) {
+ if (consumer_data->metadata_sock.lock == nullptr) {
PERROR("zmalloc pthread mutex");
mark_thread_intialization_as_failed(notifiers);
goto error;
}
- pthread_mutex_init(consumer_data->metadata_sock.lock, NULL);
+ pthread_mutex_init(consumer_data->metadata_sock.lock, nullptr);
DBG("Consumer command socket ready (fd: %d)", consumer_data->cmd_sock);
- DBG("Consumer metadata socket ready (fd: %d)",
- consumer_data->metadata_fd);
+ DBG("Consumer metadata socket ready (fd: %d)", consumer_data->metadata_fd);
/*
* Remove the consumerd error sock since we've established a connection.
}
/* Add metadata socket that is successfully connected. */
- ret = lttng_poll_add(&events, consumer_data->metadata_fd,
- LPOLLIN | LPOLLRDHUP);
+ ret = lttng_poll_add(&events, consumer_data->metadata_fd, LPOLLIN | LPOLLRDHUP);
if (ret < 0) {
mark_thread_intialization_as_failed(notifiers);
goto error;
pthread_mutex_unlock(cmd_socket_wrapper->lock);
ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper,
- consumer_data->channel_monitor_pipe);
+ consumer_data->channel_monitor_pipe);
if (ret) {
mark_thread_intialization_as_failed(notifiers);
goto error;
/* Discard the socket wrapper as it is no longer needed. */
consumer_destroy_socket(cmd_socket_wrapper);
- cmd_socket_wrapper = NULL;
+ cmd_socket_wrapper = nullptr;
/* The thread is completely initialized, signal that it is ready. */
mark_thread_as_ready(notifiers);
/* Infinite blocking call, waiting for transmission */
- while (1) {
+ while (true) {
health_code_update();
/* Exit the thread because the thread quit pipe has been triggered. */
for (i = 0; i < nb_fd; i++) {
/* Fetch once the poll data */
- revents = LTTNG_POLL_GETEV(&events, i);
- pollfd = LTTNG_POLL_GETFD(&events, i);
+ const auto revents = LTTNG_POLL_GETEV(&events, i);
+ const auto pollfd = LTTNG_POLL_GETFD(&events, i);
health_code_update();
* but continue the current loop to handle potential data from
* consumer.
*/
- if (pollfd == quit_pipe_read_fd) {
+ if (pollfd == thread_quit_pipe_fd) {
should_quit = 1;
} else if (pollfd == sock) {
/* Event on the consumerd socket */
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
- && !(revents & LPOLLIN)) {
+ if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) &&
+ !(revents & LPOLLIN)) {
ERR("consumer err socket second poll error");
goto error;
}
health_code_update();
/* Wait for any kconsumerd error */
- ret = lttcomm_recv_unix_sock(sock, &code,
- sizeof(enum lttcomm_return_code));
+ ret = lttcomm_recv_unix_sock(
+ sock, &code, sizeof(enum lttcomm_return_code));
if (ret <= 0) {
ERR("consumer closed the command socket");
goto error;
}
ERR("consumer return code : %s",
- lttcomm_get_readable_code((lttcomm_return_code) -code));
+ lttcomm_get_readable_code((lttcomm_return_code) -code));
goto exit;
} else if (pollfd == consumer_data->metadata_fd) {
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
- && !(revents & LPOLLIN)) {
+ if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) &&
+ !(revents & LPOLLIN)) {
ERR("consumer err metadata socket second poll error");
goto error;
}
/* UST metadata requests */
- ret = ust_consumer_metadata_request(
- &consumer_data->metadata_sock);
+ ret = ust_consumer_metadata_request(&consumer_data->metadata_sock);
if (ret < 0) {
ERR("Handling metadata request");
goto error;
if (consumer_data->type == LTTNG_CONSUMER_KERNEL) {
uatomic_set(&the_kernel_consumerd_state, CONSUMER_ERROR);
} else if (consumer_data->type == LTTNG_CONSUMER64_UST ||
- consumer_data->type == LTTNG_CONSUMER32_UST) {
+ consumer_data->type == LTTNG_CONSUMER32_UST) {
uatomic_set(&the_ust_consumerd_state, CONSUMER_ERROR);
} else {
/* Code flow error... */
}
consumer_data->cmd_sock = -1;
}
- if (consumer_data->metadata_sock.fd_ptr &&
- *consumer_data->metadata_sock.fd_ptr >= 0) {
+ if (consumer_data->metadata_sock.fd_ptr && *consumer_data->metadata_sock.fd_ptr >= 0) {
ret = close(*consumer_data->metadata_sock.fd_ptr);
if (ret) {
PERROR("close");
rcu_thread_offline();
rcu_unregister_thread();
- return NULL;
+ return nullptr;
}
static bool shutdown_consumer_management_thread(void *data)
bool launch_consumer_management_thread(struct consumer_data *consumer_data)
{
struct lttng_pipe *quit_pipe;
- struct thread_notifiers *notifiers = NULL;
+ struct thread_notifiers *notifiers = nullptr;
struct lttng_thread *thread;
notifiers = zmalloc<thread_notifiers>();
sem_init(¬ifiers->ready, 0, 0);
thread = lttng_thread_create("Consumer management",
- thread_consumer_management,
- shutdown_consumer_management_thread,
- cleanup_consumer_management_thread,
- notifiers);
+ thread_consumer_management,
+ shutdown_consumer_management_thread,
+ cleanup_consumer_management_thread,
+ notifiers);
if (!thread) {
goto error;
}
wait_until_thread_is_ready(notifiers);
lttng_thread_put(thread);
- if (notifiers->initialization_result) {
- return false;
- }
- return true;
+ return notifiers->initialization_result == 0;
error:
cleanup_consumer_management_thread(notifiers);
error_alloc: