X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fagent-thread.cpp;h=f641ed78e12aecf0b16a67680b1ac54f0dc3ded2;hp=5e158b49bea408d2c161121f90eed7fbe4294359;hb=HEAD;hpb=7966af5763c4aaca39df9bbfa9277ff15715c720 diff --git a/src/bin/lttng-sessiond/agent-thread.cpp b/src/bin/lttng-sessiond/agent-thread.cpp index 5e158b49b..23a26a806 100644 --- a/src/bin/lttng-sessiond/agent-thread.cpp +++ b/src/bin/lttng-sessiond/agent-thread.cpp @@ -7,21 +7,24 @@ #define _LGPL_SOURCE -#include -#include -#include -#include - -#include - -#include "fd-limit.h" -#include "agent-thread.h" -#include "agent.h" -#include "lttng-sessiond.h" -#include "session.h" -#include "utils.h" -#include "thread.h" - +#include "agent-thread.hpp" +#include "agent.hpp" +#include "fd-limit.hpp" +#include "lttng-sessiond.hpp" +#include "session.hpp" +#include "thread.hpp" +#include "utils.hpp" + +#include +#include +#include +#include +#include +#include + +#include + +namespace { struct thread_notifiers { struct lttng_pipe *quit_pipe; sem_t ready; @@ -36,15 +39,15 @@ struct agent_protocol_version { unsigned int major, minor; }; -static int agent_tracing_enabled = -1; +int agent_tracing_enabled = -1; /* * Note that there is not port here. It's set after this URI is parsed so we * can let the user define a custom one. However, localhost is ALWAYS the * default listening address. */ -static const char *default_reg_uri = - "tcp://" DEFAULT_NETWORK_VIEWER_BIND_ADDRESS; +const char *default_reg_uri = "tcp://" DEFAULT_NETWORK_VIEWER_BIND_ADDRESS; +} /* namespace */ /* * Update agent application using the given socket. This is done just after @@ -64,7 +67,7 @@ static void update_agent_app(const struct agent_app *app) list = session_get_list(); LTTNG_ASSERT(list); - cds_list_for_each_entry_safe(session, stmp, &list->head, list) { + cds_list_for_each_entry_safe (session, stmp, &list->head, list) { if (!session_get(session)) { continue; } @@ -73,37 +76,38 @@ static void update_agent_app(const struct agent_app *app) if (session->ust_session) { const struct agent *agt; - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; agt = trace_ust_find_agent(session->ust_session, app->domain); if (agt) { agent_update(agt, app); } - rcu_read_unlock(); } session_unlock(session); session_put(session); } - rcu_read_lock(); - /* - * We are protected against the addition of new events by the session - * list lock being held. - */ - cds_lfht_for_each_entry(the_trigger_agents_ht_by_domain->ht, - &iter.iter, trigger_agent, node.node) { - agent_update(trigger_agent, app); + { + /* + * We are protected against the addition of new events by the session + * list lock being held. + */ + lttng::urcu::read_lock_guard read_lock; + + cds_lfht_for_each_entry ( + the_trigger_agents_ht_by_domain->ht, &iter.iter, trigger_agent, node.node) { + agent_update(trigger_agent, app); + } } - rcu_read_unlock(); } /* * Create and init socket from uri. */ -static struct lttcomm_sock *init_tcp_socket(void) +static struct lttcomm_sock *init_tcp_socket() { int ret; - struct lttng_uri *uri = NULL; - struct lttcomm_sock *sock = NULL; + struct lttng_uri *uri = nullptr; + struct lttcomm_sock *sock = nullptr; unsigned int port; bool bind_succeeded = false; @@ -118,7 +122,7 @@ static struct lttcomm_sock *init_tcp_socket(void) sock = lttcomm_alloc_sock_from_uri(uri); uri_free(uri); - if (sock == NULL) { + if (sock == nullptr) { ERR("agent allocating TCP socket"); goto error; } @@ -128,12 +132,11 @@ static struct lttcomm_sock *init_tcp_socket(void) goto error; } - for (port = the_config.agent_tcp_port.begin; - port <= the_config.agent_tcp_port.end; port++) { + for (port = the_config.agent_tcp_port.begin; port <= the_config.agent_tcp_port.end; + port++) { ret = lttcomm_sock_set_port(sock, (uint16_t) port); if (ret) { - ERR("Failed to set port %u on socket", - port); + ERR("Failed to set port %u on socket", port); goto error; } DBG3("Trying to bind on port %u", port); @@ -144,8 +147,7 @@ static struct lttcomm_sock *init_tcp_socket(void) } if (errno == EADDRINUSE) { - DBG("Failed to bind to port %u since it is already in use", - port); + DBG("Failed to bind to port %u since it is already in use", port); } else { PERROR("Failed to bind to port %u", port); goto error; @@ -153,17 +155,16 @@ static struct lttcomm_sock *init_tcp_socket(void) } if (!bind_succeeded) { - if (the_config.agent_tcp_port.begin == - the_config.agent_tcp_port.end) { + if (the_config.agent_tcp_port.begin == the_config.agent_tcp_port.end) { WARN("Another process is already using the agent port %i. " "Agent support will be deactivated.", - the_config.agent_tcp_port.begin); + the_config.agent_tcp_port.begin); goto error; } else { WARN("All ports in the range [%i, %i] are already in use. " "Agent support will be deactivated.", - the_config.agent_tcp_port.begin, - the_config.agent_tcp_port.end); + the_config.agent_tcp_port.begin, + the_config.agent_tcp_port.end); goto error; } } @@ -173,8 +174,7 @@ static struct lttcomm_sock *init_tcp_socket(void) goto error; } - DBG("Listening on TCP port %u and socket %d", - port, sock->fd); + DBG("Listening on TCP port %u and socket %d", port, sock->fd); return sock; @@ -182,7 +182,7 @@ error: if (sock) { lttcomm_destroy_sock(sock); } - return NULL; + return nullptr; } /* @@ -201,8 +201,7 @@ static void destroy_tcp_socket(struct lttcomm_sock *sock) port = 0; } - DBG3("Destroy TCP socket on port %" PRIu16, - port); + DBG3("Destroy TCP socket on port %" PRIu16, port); /* This will return gracefully if fd is invalid. */ sock->ops->close(sock); @@ -229,16 +228,17 @@ static const char *domain_type_str(enum lttng_domain_type domain_type) } } -static bool is_agent_protocol_version_supported( - const struct agent_protocol_version *version) +static bool is_agent_protocol_version_supported(const struct agent_protocol_version *version) { const bool is_supported = version->major == AGENT_MAJOR_VERSION && - version->minor == AGENT_MINOR_VERSION; + version->minor == AGENT_MINOR_VERSION; if (!is_supported) { WARN("Refusing agent connection: unsupported protocol version %ui.%ui, expected %i.%i", - version->major, version->minor, - AGENT_MAJOR_VERSION, AGENT_MINOR_VERSION); + version->major, + version->minor, + AGENT_MAJOR_VERSION, + AGENT_MINOR_VERSION); } return is_supported; @@ -251,10 +251,9 @@ static bool is_agent_protocol_version_supported( * On success, the resulting socket is returned through `agent_app_socket` * and the application's reported id is updated through `agent_app_id`. */ -static int accept_agent_connection( - struct lttcomm_sock *reg_sock, - struct agent_app_id *agent_app_id, - struct lttcomm_sock **agent_app_socket) +static int accept_agent_connection(struct lttcomm_sock *reg_sock, + struct agent_app_id *agent_app_id, + struct lttcomm_sock **agent_app_socket) { int ret; struct agent_protocol_version agent_version; @@ -276,7 +275,8 @@ static int accept_agent_connection( PERROR("Failed to register new agent application"); } else if (size != 0) { ERR("Failed to register new agent application: invalid registration message length: expected length = %zu, message length = %zd", - sizeof(msg), size); + sizeof(msg), + size); } else { DBG("Failed to register new agent application: connection closed"); } @@ -284,7 +284,7 @@ static int accept_agent_connection( goto error_close_socket; } - agent_version = (struct agent_protocol_version) { + agent_version = (struct agent_protocol_version){ be32toh(msg.major_version), be32toh(msg.minor_version), }; @@ -295,17 +295,18 @@ static int accept_agent_connection( goto error_close_socket; } - *agent_app_id = (struct agent_app_id) { + *agent_app_id = (struct agent_app_id){ .pid = (pid_t) be32toh(msg.pid), .domain = (lttng_domain_type) be32toh(msg.domain), }; DBG2("New registration for agent application: pid = %ld, domain = %s, socket fd = %d", - (long) agent_app_id->pid, - domain_type_str(agent_app_id->domain), new_sock->fd); + (long) agent_app_id->pid, + domain_type_str(agent_app_id->domain), + new_sock->fd); *agent_app_socket = new_sock; - new_sock = NULL; + new_sock = nullptr; ret = 0; goto end; @@ -316,7 +317,7 @@ end: return ret; } -bool agent_tracing_is_enabled(void) +bool agent_tracing_is_enabled() { int enabled; @@ -330,19 +331,16 @@ bool agent_tracing_is_enabled(void) */ static int write_agent_port(uint16_t port) { - return utils_create_pid_file( - (pid_t) port, the_config.agent_port_file_path.value); + return utils_create_pid_file((pid_t) port, the_config.agent_port_file_path.value); } -static -void mark_thread_as_ready(struct thread_notifiers *notifiers) +static void mark_thread_as_ready(struct thread_notifiers *notifiers) { DBG("Marking agent management thread as ready"); sem_post(¬ifiers->ready); } -static -void wait_until_thread_is_ready(struct thread_notifiers *notifiers) +static void wait_until_thread_is_ready(struct thread_notifiers *notifiers) { DBG("Waiting for agent management thread to be ready"); sem_wait(¬ifiers->ready); @@ -354,13 +352,12 @@ void wait_until_thread_is_ready(struct thread_notifiers *notifiers) */ static void *thread_agent_management(void *data) { - int i, ret, pollfd; - uint32_t revents, nb_fd; + int i, ret; + uint32_t nb_fd; struct lttng_poll_event events; struct lttcomm_sock *reg_sock; struct thread_notifiers *notifiers = (thread_notifiers *) data; - const int quit_pipe_read_fd = lttng_pipe_get_readfd( - notifiers->quit_pipe); + const auto thread_quit_pipe_fd = lttng_pipe_get_readfd(notifiers->quit_pipe); DBG("Manage agent application registration."); @@ -376,8 +373,7 @@ static void *thread_agent_management(void *data) goto error_poll_create; } - ret = lttng_poll_add(&events, quit_pipe_read_fd, - LPOLLIN | LPOLLERR); + ret = lttng_poll_add(&events, thread_quit_pipe_fd, LPOLLIN); if (ret < 0) { goto error_tcp_socket; } @@ -410,20 +406,18 @@ static void *thread_agent_management(void *data) mark_thread_as_ready(notifiers); /* Add TCP socket to the poll set. */ - ret = lttng_poll_add(&events, reg_sock->fd, - LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP); + ret = lttng_poll_add(&events, reg_sock->fd, LPOLLIN | LPOLLRDHUP); if (ret < 0) { goto error; } - while (1) { + while (true) { DBG3("Manage agent polling"); /* Inifinite blocking call, waiting for transmission */ -restart: + restart: ret = lttng_poll_wait(&events, -1); - DBG3("Manage agent return from poll on %d fds", - LTTNG_POLL_GETNB(&events)); + DBG3("Manage agent return from poll on %d fds", LTTNG_POLL_GETNB(&events)); if (ret < 0) { /* * Restart interrupted system call. @@ -438,18 +432,19 @@ restart: for (i = 0; i < nb_fd; i++) { /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); - /* Thread quit pipe has been closed. Killing thread. */ - if (pollfd == quit_pipe_read_fd) { + /* Activity on thread quit pipe, exiting. */ + if (pollfd == thread_quit_pipe_fd) { + DBG("Activity on thread quit pipe"); goto exit; } /* Activity on the registration socket. */ if (revents & LPOLLIN) { struct agent_app_id new_app_id; - struct agent_app *new_app = NULL; + struct agent_app *new_app = nullptr; struct lttcomm_sock *new_app_socket; int new_app_socket_fd; @@ -466,24 +461,21 @@ restart: * new_app_socket's ownership has been * transferred to the new agent app. */ - new_app = agent_create_app(new_app_id.pid, - new_app_id.domain, - new_app_socket); + new_app = agent_create_app( + new_app_id.pid, new_app_id.domain, new_app_socket); if (!new_app) { - new_app_socket->ops->close( - new_app_socket); + new_app_socket->ops->close(new_app_socket); continue; } new_app_socket_fd = new_app_socket->fd; - new_app_socket = NULL; + new_app_socket = nullptr; /* * Since this is a command socket (write then * read), only add poll error event to only * detect shutdown. */ - ret = lttng_poll_add(&events, new_app_socket_fd, - LPOLLERR | LPOLLHUP | LPOLLRDHUP); + ret = lttng_poll_add(&events, new_app_socket_fd, LPOLLRDHUP); if (ret < 0) { agent_destroy_app(new_app); continue; @@ -506,8 +498,7 @@ restart: if (ret < 0) { agent_destroy_app(new_app); /* Removing from the poll set. */ - ret = lttng_poll_del(&events, - new_app_socket_fd); + ret = lttng_poll_del(&events, new_app_socket_fd); if (ret < 0) { session_unlock_list(); goto error; @@ -545,7 +536,7 @@ error_poll_create: DBG("Cleaning up and stopping."); rcu_thread_offline(); rcu_unregister_thread(); - return NULL; + return nullptr; } static bool shutdown_agent_management_thread(void *data) @@ -565,12 +556,12 @@ static void cleanup_agent_management_thread(void *data) free(notifiers); } -bool launch_agent_management_thread(void) +bool launch_agent_management_thread() { struct thread_notifiers *notifiers; struct lttng_thread *thread; - notifiers = (thread_notifiers *) zmalloc(sizeof(*notifiers)); + notifiers = zmalloc(); if (!notifiers) { goto error_alloc; } @@ -581,10 +572,10 @@ bool launch_agent_management_thread(void) goto error; } thread = lttng_thread_create("Agent management", - thread_agent_management, - shutdown_agent_management_thread, - cleanup_agent_management_thread, - notifiers); + thread_agent_management, + shutdown_agent_management_thread, + cleanup_agent_management_thread, + notifiers); if (!thread) { goto error; }