X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fagent-thread.c;h=575f6aee934a504ddb6248d4e87610d622d0959c;hp=2b6f776dd1d2af6010dd9c3c4f550e6873d0280d;hb=1f4962443f25c371e4b54e97f9eb867d67cbf88e;hpb=03e431550191df8609f921c7b4054c57ee4644d8 diff --git a/src/bin/lttng-sessiond/agent-thread.c b/src/bin/lttng-sessiond/agent-thread.c index 2b6f776dd..575f6aee9 100644 --- a/src/bin/lttng-sessiond/agent-thread.c +++ b/src/bin/lttng-sessiond/agent-thread.c @@ -15,7 +15,6 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define _GNU_SOURCE #define _LGPL_SOURCE #include @@ -32,6 +31,14 @@ #include "lttng-sessiond.h" #include "session.h" #include "utils.h" +#include "thread.h" + +struct thread_notifiers { + struct lttng_pipe *quit_pipe; + sem_t ready; +}; + +static int agent_tracing_enabled = -1; /* * Note that there is not port here. It's set after this URI is parsed so we @@ -58,6 +65,10 @@ static void update_agent_app(struct agent_app *app) session_lock_list(); cds_list_for_each_entry_safe(session, stmp, &list->head, list) { + if (!session_get(session)) { + continue; + } + session_lock(session); if (session->ust_session) { struct agent *agt; @@ -70,6 +81,7 @@ static void update_agent_app(struct agent_app *app) rcu_read_unlock(); } session_unlock(session); + session_put(session); } session_unlock_list(); } @@ -82,6 +94,8 @@ static struct lttcomm_sock *init_tcp_socket(void) int ret; struct lttng_uri *uri = NULL; struct lttcomm_sock *sock = NULL; + unsigned int port; + bool bind_succeeded = false; /* * This should never fail since the URI is hardcoded and the port is set @@ -89,8 +103,8 @@ static struct lttcomm_sock *init_tcp_socket(void) */ ret = uri_parse(default_reg_uri, &uri); assert(ret); - assert(agent_tcp_port); - uri->port = agent_tcp_port; + assert(config.agent_tcp_port.begin > 0); + uri->port = config.agent_tcp_port.begin; sock = lttcomm_alloc_sock_from_uri(uri); uri_free(uri); @@ -104,11 +118,43 @@ static struct lttcomm_sock *init_tcp_socket(void) goto error; } - ret = sock->ops->bind(sock); - if (ret < 0) { - WARN("Another session daemon is using this agent port. Agent support " - "will be deactivated to prevent interfering with the tracing."); - goto error; + for (port = config.agent_tcp_port.begin; + port <= config.agent_tcp_port.end; port++) { + ret = lttcomm_sock_set_port(sock, (uint16_t) port); + if (ret) { + ERR("[agent-thread] Failed to set port %u on socket", + port); + goto error; + } + DBG3("[agent-thread] Trying to bind on port %u", port); + ret = sock->ops->bind(sock); + if (!ret) { + bind_succeeded = true; + break; + } + + if (errno == EADDRINUSE) { + DBG("Failed to bind to port %u since it is already in use", + port); + } else { + PERROR("Failed to bind to port %u", port); + goto error; + } + } + + if (!bind_succeeded) { + if (config.agent_tcp_port.begin == config.agent_tcp_port.end) { + WARN("Another process is already using the agent port %i. " + "Agent support will be deactivated.", + config.agent_tcp_port.begin); + goto error; + } else { + WARN("All ports in the range [%i, %i] are already in use. " + "Agent support will be deactivated.", + config.agent_tcp_port.begin, + config.agent_tcp_port.end); + goto error; + } } ret = sock->ops->listen(sock, -1); @@ -117,7 +163,7 @@ static struct lttcomm_sock *init_tcp_socket(void) } DBG("[agent-thread] Listening on TCP port %u and socket %d", - agent_tcp_port, sock->fd); + port, sock->fd); return sock; @@ -133,9 +179,19 @@ error: */ static void destroy_tcp_socket(struct lttcomm_sock *sock) { + int ret; + uint16_t port; + assert(sock); - DBG3("[agent-thread] Destroy TCP socket on port %u", agent_tcp_port); + ret = lttcomm_sock_get_port(sock, &port); + if (ret) { + ERR("[agent-thread] Failed to get port of agent TCP socket"); + port = 0; + } + + DBG3("[agent-thread] Destroy TCP socket on port %" PRIu16, + port); /* This will return gracefully if fd is invalid. */ sock->ops->close(sock); @@ -224,15 +280,51 @@ error: return ret; } +bool agent_tracing_is_enabled(void) +{ + int enabled; + + enabled = uatomic_read(&agent_tracing_enabled); + assert(enabled != -1); + return enabled == 1; +} + +/* + * Write agent TCP port using the rundir. + */ +static int write_agent_port(uint16_t port) +{ + return utils_create_pid_file((pid_t) port, + config.agent_port_file_path.value); +} + +static +void mark_thread_as_ready(struct thread_notifiers *notifiers) +{ + DBG("Marking agent management thread as ready"); + sem_post(¬ifiers->ready); +} + +static +void wait_until_thread_is_ready(struct thread_notifiers *notifiers) +{ + DBG("Waiting for agent management thread to be ready"); + sem_wait(¬ifiers->ready); + DBG("Agent management thread is ready"); +} + /* * This thread manage application notify communication. */ -void *agent_thread_manage_registration(void *data) +static void *thread_agent_management(void *data) { int i, ret, pollfd; uint32_t revents, nb_fd; struct lttng_poll_event events; struct lttcomm_sock *reg_sock; + struct thread_notifiers *notifiers = data; + const int quit_pipe_read_fd = lttng_pipe_get_readfd( + notifiers->quit_pipe); DBG("[agent-thread] Manage agent application registration."); @@ -242,18 +334,45 @@ void *agent_thread_manage_registration(void *data) /* Agent initialization call MUST be called before starting the thread. */ assert(agent_apps_ht_by_sock); - /* Create pollset with size 2, quit pipe and socket. */ - ret = sessiond_set_thread_pollset(&events, 2); + /* Create pollset with size 2, quit pipe and registration socket. */ + ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC); if (ret < 0) { goto error_poll_create; } + ret = lttng_poll_add(&events, quit_pipe_read_fd, + LPOLLIN | LPOLLERR); + if (ret < 0) { + goto error_tcp_socket; + } + reg_sock = init_tcp_socket(); - if (!reg_sock) { + if (reg_sock) { + uint16_t port; + + assert(lttcomm_sock_get_port(reg_sock, &port) == 0); + + ret = write_agent_port(port); + if (ret) { + ERR("[agent-thread] Failed to create agent port file: agent tracing will be unavailable"); + /* Don't prevent the launch of the sessiond on error. */ + mark_thread_as_ready(notifiers); + goto error; + } + } else { + /* Don't prevent the launch of the sessiond on error. */ + mark_thread_as_ready(notifiers); goto error_tcp_socket; } - /* Add create valid TCP socket to poll set. */ + /* + * Signal that the agent thread is ready. The command thread + * may start to query whether or not agent tracing is enabled. + */ + uatomic_set(&agent_tracing_enabled, 1); + mark_thread_as_ready(notifiers); + + /* Add TCP socket to poll set. */ ret = lttng_poll_add(&events, reg_sock->fd, LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP); if (ret < 0) { @@ -285,14 +404,8 @@ restart: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - if (!revents) { - /* No activity for this FD (poll implementation). */ - continue; - } - /* Thread quit pipe has been closed. Killing thread. */ - ret = sessiond_check_thread_quit_pipe(pollfd, revents); - if (ret) { + if (pollfd == quit_pipe_read_fd) { goto exit; } @@ -355,9 +468,58 @@ error: error_tcp_socket: lttng_poll_clean(&events); error_poll_create: - DBG("[agent-thread] is cleaning up and stopping."); - + uatomic_set(&agent_tracing_enabled, 0); + DBG("[agent-thread] Cleaning up and stopping."); rcu_thread_offline(); rcu_unregister_thread(); return NULL; } + +static bool shutdown_agent_management_thread(void *data) +{ + struct thread_notifiers *notifiers = data; + const int write_fd = lttng_pipe_get_writefd(notifiers->quit_pipe); + + return notify_thread_pipe(write_fd) == 1; +} + +static void cleanup_agent_management_thread(void *data) +{ + struct thread_notifiers *notifiers = data; + + lttng_pipe_destroy(notifiers->quit_pipe); + sem_destroy(¬ifiers->ready); + free(notifiers); +} + +bool launch_agent_management_thread(void) +{ + struct thread_notifiers *notifiers; + struct lttng_thread *thread; + + notifiers = zmalloc(sizeof(*notifiers)); + if (!notifiers) { + goto error_alloc; + } + + sem_init(¬ifiers->ready, 0, 0); + notifiers->quit_pipe = lttng_pipe_open(FD_CLOEXEC); + if (!notifiers->quit_pipe) { + goto error; + } + thread = lttng_thread_create("Agent management", + thread_agent_management, + shutdown_agent_management_thread, + cleanup_agent_management_thread, + notifiers); + if (!thread) { + goto error; + } + wait_until_thread_is_ready(notifiers); + lttng_thread_put(thread); + return true; +error: + cleanup_agent_management_thread(notifiers); +error_alloc: + return false; +}