X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fagent-thread.cpp;h=c3d76896cdbf54b0fdf2e55acb44519e6a30af17;hp=77470b3ee1a25446a9a8f23e2be3c9f4f4a7e134;hb=28ab034a2c3582d07d3423d2d746731f87d3969f;hpb=c9e313bc594f40a86eed237dce222c0fc99c957f diff --git a/src/bin/lttng-sessiond/agent-thread.cpp b/src/bin/lttng-sessiond/agent-thread.cpp index 77470b3ee..c3d76896c 100644 --- a/src/bin/lttng-sessiond/agent-thread.cpp +++ b/src/bin/lttng-sessiond/agent-thread.cpp @@ -7,21 +7,21 @@ #define _LGPL_SOURCE -#include -#include -#include -#include - -#include - -#include "fd-limit.hpp" #include "agent-thread.hpp" #include "agent.hpp" +#include "fd-limit.hpp" #include "lttng-sessiond.hpp" #include "session.hpp" -#include "utils.hpp" #include "thread.hpp" +#include "utils.hpp" + +#include +#include +#include +#include +#include +namespace { struct thread_notifiers { struct lttng_pipe *quit_pipe; sem_t ready; @@ -36,15 +36,15 @@ struct agent_protocol_version { unsigned int major, minor; }; -static int agent_tracing_enabled = -1; +int agent_tracing_enabled = -1; /* * Note that there is not port here. It's set after this URI is parsed so we * can let the user define a custom one. However, localhost is ALWAYS the * default listening address. */ -static const char *default_reg_uri = - "tcp://" DEFAULT_NETWORK_VIEWER_BIND_ADDRESS; +const char *default_reg_uri = "tcp://" DEFAULT_NETWORK_VIEWER_BIND_ADDRESS; +} /* namespace */ /* * Update agent application using the given socket. This is done just after @@ -64,7 +64,7 @@ static void update_agent_app(const struct agent_app *app) list = session_get_list(); LTTNG_ASSERT(list); - cds_list_for_each_entry_safe(session, stmp, &list->head, list) { + cds_list_for_each_entry_safe (session, stmp, &list->head, list) { if (!session_get(session)) { continue; } @@ -89,8 +89,8 @@ static void update_agent_app(const struct agent_app *app) * We are protected against the addition of new events by the session * list lock being held. */ - cds_lfht_for_each_entry(the_trigger_agents_ht_by_domain->ht, - &iter.iter, trigger_agent, node.node) { + cds_lfht_for_each_entry ( + the_trigger_agents_ht_by_domain->ht, &iter.iter, trigger_agent, node.node) { agent_update(trigger_agent, app); } rcu_read_unlock(); @@ -128,12 +128,11 @@ static struct lttcomm_sock *init_tcp_socket(void) goto error; } - for (port = the_config.agent_tcp_port.begin; - port <= the_config.agent_tcp_port.end; port++) { + for (port = the_config.agent_tcp_port.begin; port <= the_config.agent_tcp_port.end; + port++) { ret = lttcomm_sock_set_port(sock, (uint16_t) port); if (ret) { - ERR("Failed to set port %u on socket", - port); + ERR("Failed to set port %u on socket", port); goto error; } DBG3("Trying to bind on port %u", port); @@ -144,8 +143,7 @@ static struct lttcomm_sock *init_tcp_socket(void) } if (errno == EADDRINUSE) { - DBG("Failed to bind to port %u since it is already in use", - port); + DBG("Failed to bind to port %u since it is already in use", port); } else { PERROR("Failed to bind to port %u", port); goto error; @@ -153,17 +151,16 @@ static struct lttcomm_sock *init_tcp_socket(void) } if (!bind_succeeded) { - if (the_config.agent_tcp_port.begin == - the_config.agent_tcp_port.end) { + if (the_config.agent_tcp_port.begin == the_config.agent_tcp_port.end) { WARN("Another process is already using the agent port %i. " "Agent support will be deactivated.", - the_config.agent_tcp_port.begin); + the_config.agent_tcp_port.begin); goto error; } else { WARN("All ports in the range [%i, %i] are already in use. " "Agent support will be deactivated.", - the_config.agent_tcp_port.begin, - the_config.agent_tcp_port.end); + the_config.agent_tcp_port.begin, + the_config.agent_tcp_port.end); goto error; } } @@ -173,8 +170,7 @@ static struct lttcomm_sock *init_tcp_socket(void) goto error; } - DBG("Listening on TCP port %u and socket %d", - port, sock->fd); + DBG("Listening on TCP port %u and socket %d", port, sock->fd); return sock; @@ -201,8 +197,7 @@ static void destroy_tcp_socket(struct lttcomm_sock *sock) port = 0; } - DBG3("Destroy TCP socket on port %" PRIu16, - port); + DBG3("Destroy TCP socket on port %" PRIu16, port); /* This will return gracefully if fd is invalid. */ sock->ops->close(sock); @@ -229,16 +224,17 @@ static const char *domain_type_str(enum lttng_domain_type domain_type) } } -static bool is_agent_protocol_version_supported( - const struct agent_protocol_version *version) +static bool is_agent_protocol_version_supported(const struct agent_protocol_version *version) { const bool is_supported = version->major == AGENT_MAJOR_VERSION && - version->minor == AGENT_MINOR_VERSION; + version->minor == AGENT_MINOR_VERSION; if (!is_supported) { WARN("Refusing agent connection: unsupported protocol version %ui.%ui, expected %i.%i", - version->major, version->minor, - AGENT_MAJOR_VERSION, AGENT_MINOR_VERSION); + version->major, + version->minor, + AGENT_MAJOR_VERSION, + AGENT_MINOR_VERSION); } return is_supported; @@ -251,10 +247,9 @@ static bool is_agent_protocol_version_supported( * On success, the resulting socket is returned through `agent_app_socket` * and the application's reported id is updated through `agent_app_id`. */ -static int accept_agent_connection( - struct lttcomm_sock *reg_sock, - struct agent_app_id *agent_app_id, - struct lttcomm_sock **agent_app_socket) +static int accept_agent_connection(struct lttcomm_sock *reg_sock, + struct agent_app_id *agent_app_id, + struct lttcomm_sock **agent_app_socket) { int ret; struct agent_protocol_version agent_version; @@ -276,7 +271,8 @@ static int accept_agent_connection( PERROR("Failed to register new agent application"); } else if (size != 0) { ERR("Failed to register new agent application: invalid registration message length: expected length = %zu, message length = %zd", - sizeof(msg), size); + sizeof(msg), + size); } else { DBG("Failed to register new agent application: connection closed"); } @@ -284,7 +280,7 @@ static int accept_agent_connection( goto error_close_socket; } - agent_version = (struct agent_protocol_version) { + agent_version = (struct agent_protocol_version){ be32toh(msg.major_version), be32toh(msg.minor_version), }; @@ -295,14 +291,15 @@ static int accept_agent_connection( goto error_close_socket; } - *agent_app_id = (struct agent_app_id) { + *agent_app_id = (struct agent_app_id){ .pid = (pid_t) be32toh(msg.pid), .domain = (lttng_domain_type) be32toh(msg.domain), }; DBG2("New registration for agent application: pid = %ld, domain = %s, socket fd = %d", - (long) agent_app_id->pid, - domain_type_str(agent_app_id->domain), new_sock->fd); + (long) agent_app_id->pid, + domain_type_str(agent_app_id->domain), + new_sock->fd); *agent_app_socket = new_sock; new_sock = NULL; @@ -330,19 +327,16 @@ bool agent_tracing_is_enabled(void) */ static int write_agent_port(uint16_t port) { - return utils_create_pid_file( - (pid_t) port, the_config.agent_port_file_path.value); + return utils_create_pid_file((pid_t) port, the_config.agent_port_file_path.value); } -static -void mark_thread_as_ready(struct thread_notifiers *notifiers) +static void mark_thread_as_ready(struct thread_notifiers *notifiers) { DBG("Marking agent management thread as ready"); sem_post(¬ifiers->ready); } -static -void wait_until_thread_is_ready(struct thread_notifiers *notifiers) +static void wait_until_thread_is_ready(struct thread_notifiers *notifiers) { DBG("Waiting for agent management thread to be ready"); sem_wait(¬ifiers->ready); @@ -354,13 +348,12 @@ void wait_until_thread_is_ready(struct thread_notifiers *notifiers) */ static void *thread_agent_management(void *data) { - int i, ret, pollfd; - uint32_t revents, nb_fd; + int i, ret; + uint32_t nb_fd; struct lttng_poll_event events; struct lttcomm_sock *reg_sock; struct thread_notifiers *notifiers = (thread_notifiers *) data; - const int quit_pipe_read_fd = lttng_pipe_get_readfd( - notifiers->quit_pipe); + const auto thread_quit_pipe_fd = lttng_pipe_get_readfd(notifiers->quit_pipe); DBG("Manage agent application registration."); @@ -376,8 +369,7 @@ static void *thread_agent_management(void *data) goto error_poll_create; } - ret = lttng_poll_add(&events, quit_pipe_read_fd, - LPOLLIN | LPOLLERR); + ret = lttng_poll_add(&events, thread_quit_pipe_fd, LPOLLIN); if (ret < 0) { goto error_tcp_socket; } @@ -410,8 +402,7 @@ static void *thread_agent_management(void *data) mark_thread_as_ready(notifiers); /* Add TCP socket to the poll set. */ - ret = lttng_poll_add(&events, reg_sock->fd, - LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP); + ret = lttng_poll_add(&events, reg_sock->fd, LPOLLIN | LPOLLRDHUP); if (ret < 0) { goto error; } @@ -420,10 +411,9 @@ static void *thread_agent_management(void *data) DBG3("Manage agent polling"); /* Inifinite blocking call, waiting for transmission */ -restart: + restart: ret = lttng_poll_wait(&events, -1); - DBG3("Manage agent return from poll on %d fds", - LTTNG_POLL_GETNB(&events)); + DBG3("Manage agent return from poll on %d fds", LTTNG_POLL_GETNB(&events)); if (ret < 0) { /* * Restart interrupted system call. @@ -438,11 +428,12 @@ restart: for (i = 0; i < nb_fd; i++) { /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); - /* Thread quit pipe has been closed. Killing thread. */ - if (pollfd == quit_pipe_read_fd) { + /* Activity on thread quit pipe, exiting. */ + if (pollfd == thread_quit_pipe_fd) { + DBG("Activity on thread quit pipe"); goto exit; } @@ -466,12 +457,10 @@ restart: * new_app_socket's ownership has been * transferred to the new agent app. */ - new_app = agent_create_app(new_app_id.pid, - new_app_id.domain, - new_app_socket); + new_app = agent_create_app( + new_app_id.pid, new_app_id.domain, new_app_socket); if (!new_app) { - new_app_socket->ops->close( - new_app_socket); + new_app_socket->ops->close(new_app_socket); continue; } new_app_socket_fd = new_app_socket->fd; @@ -482,8 +471,7 @@ restart: * read), only add poll error event to only * detect shutdown. */ - ret = lttng_poll_add(&events, new_app_socket_fd, - LPOLLERR | LPOLLHUP | LPOLLRDHUP); + ret = lttng_poll_add(&events, new_app_socket_fd, LPOLLRDHUP); if (ret < 0) { agent_destroy_app(new_app); continue; @@ -506,8 +494,7 @@ restart: if (ret < 0) { agent_destroy_app(new_app); /* Removing from the poll set. */ - ret = lttng_poll_del(&events, - new_app_socket_fd); + ret = lttng_poll_del(&events, new_app_socket_fd); if (ret < 0) { session_unlock_list(); goto error; @@ -570,7 +557,7 @@ bool launch_agent_management_thread(void) struct thread_notifiers *notifiers; struct lttng_thread *thread; - notifiers = (thread_notifiers *) zmalloc(sizeof(*notifiers)); + notifiers = zmalloc(); if (!notifiers) { goto error_alloc; } @@ -581,10 +568,10 @@ bool launch_agent_management_thread(void) goto error; } thread = lttng_thread_create("Agent management", - thread_agent_management, - shutdown_agent_management_thread, - cleanup_agent_management_thread, - notifiers); + thread_agent_management, + shutdown_agent_management_thread, + cleanup_agent_management_thread, + notifiers); if (!thread) { goto error; }