X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fmain.c;h=25f5b4a6c2d1bc3aada2ab5c269747a24b4f9439;hp=1641ebeb876dfe2bc820ce6b0ea5d608148656c1;hb=5d1b02193b8788400b04dee1c53965357f51c52c;hpb=917a718d4ec336ca98820f3cf56a2db57fc9b1dd diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index 1641ebeb8..25f5b4a6c 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -83,6 +83,7 @@ #include "timer.h" #include "thread.h" #include "client.h" +#include "dispatch.h" static const char *help_msg = #ifdef LTTNG_EMBED_HELP @@ -133,10 +134,6 @@ static const struct option long_options[] = { /* Command line options to ignore from configuration file */ static const char *config_ignore_options[] = { "help", "version", "config" }; - -/* Shared between threads */ -static int dispatch_thread_exit; - static int apps_sock = -1; /* @@ -150,7 +147,6 @@ static pthread_t apps_thread; static pthread_t apps_notify_thread; static pthread_t reg_apps_thread; static pthread_t kernel_thread; -static pthread_t dispatch_thread; static pthread_t agent_reg_thread; static pthread_t load_session_thread; @@ -191,10 +187,6 @@ static void stop_threads(void) if (ret < 0) { ERR("write error on thread quit pipe"); } - - /* Dispatch thread */ - CMM_STORE_SHARED(dispatch_thread_exit, 1); - futex_nto1_wake(&ust_cmd_queue.futex); } /* @@ -547,55 +539,6 @@ error: return ret; } -/* - * For each tracing session, update newly registered apps. The session list - * lock MUST be acquired before calling this. - */ -static void update_ust_app(int app_sock) -{ - struct ltt_session *sess, *stmp; - const struct ltt_session_list *session_list = session_get_list(); - - /* Consumer is in an ERROR state. Stop any application update. */ - if (uatomic_read(&ust_consumerd_state) == CONSUMER_ERROR) { - /* Stop the update process since the consumer is dead. */ - return; - } - - /* For all tracing session(s) */ - cds_list_for_each_entry_safe(sess, stmp, &session_list->head, list) { - struct ust_app *app; - - if (!session_get(sess)) { - continue; - } - session_lock(sess); - if (!sess->ust_session) { - goto unlock_session; - } - - rcu_read_lock(); - assert(app_sock >= 0); - app = ust_app_find_by_sock(app_sock); - if (app == NULL) { - /* - * Application can be unregistered before so - * this is possible hence simply stopping the - * update. - */ - DBG3("UST app update failed to find app sock %d", - app_sock); - goto unlock_rcu; - } - ust_app_global_update(sess->ust_session, app); - unlock_rcu: - rcu_read_unlock(); - unlock_session: - session_unlock(sess); - session_put(sess); - } -} - /* * This thread manage event coming from the kernel. * @@ -1304,400 +1247,6 @@ error_testpoint: return NULL; } -/* - * Send a socket to a thread This is called from the dispatch UST registration - * thread once all sockets are set for the application. - * - * The sock value can be invalid, we don't really care, the thread will handle - * it and make the necessary cleanup if so. - * - * On success, return 0 else a negative value being the errno message of the - * write(). - */ -static int send_socket_to_thread(int fd, int sock) -{ - ssize_t ret; - - /* - * It's possible that the FD is set as invalid with -1 concurrently just - * before calling this function being a shutdown state of the thread. - */ - if (fd < 0) { - ret = -EBADF; - goto error; - } - - ret = lttng_write(fd, &sock, sizeof(sock)); - if (ret < sizeof(sock)) { - PERROR("write apps pipe %d", fd); - if (ret < 0) { - ret = -errno; - } - goto error; - } - - /* All good. Don't send back the write positive ret value. */ - ret = 0; -error: - return (int) ret; -} - -/* - * Sanitize the wait queue of the dispatch registration thread meaning removing - * invalid nodes from it. This is to avoid memory leaks for the case the UST - * notify socket is never received. - */ -static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue) -{ - int ret, nb_fd = 0, i; - unsigned int fd_added = 0; - struct lttng_poll_event events; - struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node; - - assert(wait_queue); - - lttng_poll_init(&events); - - /* Just skip everything for an empty queue. */ - if (!wait_queue->count) { - goto end; - } - - ret = lttng_poll_create(&events, wait_queue->count, LTTNG_CLOEXEC); - if (ret < 0) { - goto error_create; - } - - cds_list_for_each_entry_safe(wait_node, tmp_wait_node, - &wait_queue->head, head) { - assert(wait_node->app); - ret = lttng_poll_add(&events, wait_node->app->sock, - LPOLLHUP | LPOLLERR); - if (ret < 0) { - goto error; - } - - fd_added = 1; - } - - if (!fd_added) { - goto end; - } - - /* - * Poll but don't block so we can quickly identify the faulty events and - * clean them afterwards from the wait queue. - */ - ret = lttng_poll_wait(&events, 0); - if (ret < 0) { - goto error; - } - nb_fd = ret; - - for (i = 0; i < nb_fd; i++) { - /* Get faulty FD. */ - uint32_t revents = LTTNG_POLL_GETEV(&events, i); - int pollfd = LTTNG_POLL_GETFD(&events, i); - - if (!revents) { - /* No activity for this FD (poll implementation). */ - continue; - } - - cds_list_for_each_entry_safe(wait_node, tmp_wait_node, - &wait_queue->head, head) { - if (pollfd == wait_node->app->sock && - (revents & (LPOLLHUP | LPOLLERR))) { - cds_list_del(&wait_node->head); - wait_queue->count--; - ust_app_destroy(wait_node->app); - free(wait_node); - /* - * Silence warning of use-after-free in - * cds_list_for_each_entry_safe which uses - * __typeof__(*wait_node). - */ - wait_node = NULL; - break; - } else { - ERR("Unexpected poll events %u for sock %d", revents, pollfd); - goto error; - } - } - } - - if (nb_fd > 0) { - DBG("Wait queue sanitized, %d node were cleaned up", nb_fd); - } - -end: - lttng_poll_clean(&events); - return; - -error: - lttng_poll_clean(&events); -error_create: - ERR("Unable to sanitize wait queue"); - return; -} - -/* - * Dispatch request from the registration threads to the application - * communication thread. - */ -static void *thread_dispatch_ust_registration(void *data) -{ - int ret, err = -1; - struct cds_wfcq_node *node; - struct ust_command *ust_cmd = NULL; - struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node; - struct ust_reg_wait_queue wait_queue = { - .count = 0, - }; - - rcu_register_thread(); - - health_register(health_sessiond, HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH); - - if (testpoint(sessiond_thread_app_reg_dispatch)) { - goto error_testpoint; - } - - health_code_update(); - - CDS_INIT_LIST_HEAD(&wait_queue.head); - - DBG("[thread] Dispatch UST command started"); - - for (;;) { - health_code_update(); - - /* Atomically prepare the queue futex */ - futex_nto1_prepare(&ust_cmd_queue.futex); - - if (CMM_LOAD_SHARED(dispatch_thread_exit)) { - break; - } - - do { - struct ust_app *app = NULL; - ust_cmd = NULL; - - /* - * Make sure we don't have node(s) that have hung up before receiving - * the notify socket. This is to clean the list in order to avoid - * memory leaks from notify socket that are never seen. - */ - sanitize_wait_queue(&wait_queue); - - health_code_update(); - /* Dequeue command for registration */ - node = cds_wfcq_dequeue_blocking(&ust_cmd_queue.head, &ust_cmd_queue.tail); - if (node == NULL) { - DBG("Woken up but nothing in the UST command queue"); - /* Continue thread execution */ - break; - } - - ust_cmd = caa_container_of(node, struct ust_command, node); - - DBG("Dispatching UST registration pid:%d ppid:%d uid:%d" - " gid:%d sock:%d name:%s (version %d.%d)", - ust_cmd->reg_msg.pid, ust_cmd->reg_msg.ppid, - ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid, - ust_cmd->sock, ust_cmd->reg_msg.name, - ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor); - - if (ust_cmd->reg_msg.type == USTCTL_SOCKET_CMD) { - wait_node = zmalloc(sizeof(*wait_node)); - if (!wait_node) { - PERROR("zmalloc wait_node dispatch"); - ret = close(ust_cmd->sock); - if (ret < 0) { - PERROR("close ust sock dispatch %d", ust_cmd->sock); - } - lttng_fd_put(LTTNG_FD_APPS, 1); - free(ust_cmd); - goto error; - } - CDS_INIT_LIST_HEAD(&wait_node->head); - - /* Create application object if socket is CMD. */ - wait_node->app = ust_app_create(&ust_cmd->reg_msg, - ust_cmd->sock); - if (!wait_node->app) { - ret = close(ust_cmd->sock); - if (ret < 0) { - PERROR("close ust sock dispatch %d", ust_cmd->sock); - } - lttng_fd_put(LTTNG_FD_APPS, 1); - free(wait_node); - free(ust_cmd); - continue; - } - /* - * Add application to the wait queue so we can set the notify - * socket before putting this object in the global ht. - */ - cds_list_add(&wait_node->head, &wait_queue.head); - wait_queue.count++; - - free(ust_cmd); - /* - * We have to continue here since we don't have the notify - * socket and the application MUST be added to the hash table - * only at that moment. - */ - continue; - } else { - /* - * Look for the application in the local wait queue and set the - * notify socket if found. - */ - cds_list_for_each_entry_safe(wait_node, tmp_wait_node, - &wait_queue.head, head) { - health_code_update(); - if (wait_node->app->pid == ust_cmd->reg_msg.pid) { - wait_node->app->notify_sock = ust_cmd->sock; - cds_list_del(&wait_node->head); - wait_queue.count--; - app = wait_node->app; - free(wait_node); - DBG3("UST app notify socket %d is set", ust_cmd->sock); - break; - } - } - - /* - * With no application at this stage the received socket is - * basically useless so close it before we free the cmd data - * structure for good. - */ - if (!app) { - ret = close(ust_cmd->sock); - if (ret < 0) { - PERROR("close ust sock dispatch %d", ust_cmd->sock); - } - lttng_fd_put(LTTNG_FD_APPS, 1); - } - free(ust_cmd); - } - - if (app) { - /* - * @session_lock_list - * - * Lock the global session list so from the register up to the - * registration done message, no thread can see the application - * and change its state. - */ - session_lock_list(); - rcu_read_lock(); - - /* - * Add application to the global hash table. This needs to be - * done before the update to the UST registry can locate the - * application. - */ - ust_app_add(app); - - /* Set app version. This call will print an error if needed. */ - (void) ust_app_version(app); - - /* Send notify socket through the notify pipe. */ - ret = send_socket_to_thread(apps_cmd_notify_pipe[1], - app->notify_sock); - if (ret < 0) { - rcu_read_unlock(); - session_unlock_list(); - /* - * No notify thread, stop the UST tracing. However, this is - * not an internal error of the this thread thus setting - * the health error code to a normal exit. - */ - err = 0; - goto error; - } - - /* - * Update newly registered application with the tracing - * registry info already enabled information. - */ - update_ust_app(app->sock); - - /* - * Don't care about return value. Let the manage apps threads - * handle app unregistration upon socket close. - */ - (void) ust_app_register_done(app); - - /* - * Even if the application socket has been closed, send the app - * to the thread and unregistration will take place at that - * place. - */ - ret = send_socket_to_thread(apps_cmd_pipe[1], app->sock); - if (ret < 0) { - rcu_read_unlock(); - session_unlock_list(); - /* - * No apps. thread, stop the UST tracing. However, this is - * not an internal error of the this thread thus setting - * the health error code to a normal exit. - */ - err = 0; - goto error; - } - - rcu_read_unlock(); - session_unlock_list(); - } - } while (node != NULL); - - health_poll_entry(); - /* Futex wait on queue. Blocking call on futex() */ - futex_nto1_wait(&ust_cmd_queue.futex); - health_poll_exit(); - } - /* Normal exit, no error */ - err = 0; - -error: - /* Clean up wait queue. */ - cds_list_for_each_entry_safe(wait_node, tmp_wait_node, - &wait_queue.head, head) { - cds_list_del(&wait_node->head); - wait_queue.count--; - free(wait_node); - } - - /* Empty command queue. */ - for (;;) { - /* Dequeue command for registration */ - node = cds_wfcq_dequeue_blocking(&ust_cmd_queue.head, &ust_cmd_queue.tail); - if (node == NULL) { - break; - } - ust_cmd = caa_container_of(node, struct ust_command, node); - ret = close(ust_cmd->sock); - if (ret < 0) { - PERROR("close ust sock exit dispatch %d", ust_cmd->sock); - } - lttng_fd_put(LTTNG_FD_APPS, 1); - free(ust_cmd); - } - -error_testpoint: - DBG("Dispatch thread dying"); - if (err) { - health_error(); - ERR("Health error occurred in %s", __func__); - } - health_unregister(health_sessiond); - rcu_unregister_thread(); - return NULL; -} - /* * This thread manage application registration. */ @@ -3344,14 +2893,9 @@ int main(int argc, char **argv) goto exit_client; } - /* Create thread to dispatch registration */ - ret = pthread_create(&dispatch_thread, default_pthread_attr(), - thread_dispatch_ust_registration, (void *) NULL); - if (ret) { - errno = ret; - PERROR("pthread_create dispatch"); + if (!launch_ust_dispatch_thread(&ust_cmd_queue, apps_cmd_pipe[1], + apps_cmd_notify_pipe[1])) { retval = -1; - stop_threads(); goto exit_dispatch; } @@ -3493,17 +3037,6 @@ exit_apps: retval = -1; } exit_reg_apps: - - /* - * Join dispatch thread after joining reg_apps_thread to ensure - * we don't leak applications in the queue. - */ - ret = pthread_join(dispatch_thread, &status); - if (ret) { - errno = ret; - PERROR("pthread_join"); - retval = -1; - } exit_dispatch: exit_client: exit_rotation: