X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fdispatch.cpp;h=811cec0d9511923f58e652f9134fe3ddd61ba61c;hb=56047f5a23df5c2c583a102b8015bbec5a7da9f1;hp=89d7c7c2e0d1fb894b02f10d7b17223ebbef9bee;hpb=64803277bbdbe0a943360d918298a48157d9da55;p=lttng-tools.git diff --git a/src/bin/lttng-sessiond/dispatch.cpp b/src/bin/lttng-sessiond/dispatch.cpp index 89d7c7c2e..811cec0d9 100644 --- a/src/bin/lttng-sessiond/dispatch.cpp +++ b/src/bin/lttng-sessiond/dispatch.cpp @@ -7,26 +7,30 @@ * */ -#include -#include -#include -#include -#include - #include "dispatch.hpp" -#include "ust-app.hpp" -#include "testpoint.hpp" #include "fd-limit.hpp" #include "health-sessiond.hpp" #include "lttng-sessiond.hpp" +#include "testpoint.hpp" #include "thread.hpp" +#include "ust-app.hpp" +#include +#include +#include + +#include +#include +#include + +namespace { struct thread_notifiers { struct ust_cmd_queue *ust_cmd_queue; int apps_cmd_pipe_write_fd; int apps_cmd_notify_pipe_write_fd; int dispatch_thread_exit; }; +} /* namespace */ /* * For each tracing session, update newly registered apps. The session list @@ -44,31 +48,29 @@ static void update_ust_app(int app_sock) return; } - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; LTTNG_ASSERT(app_sock >= 0); app = ust_app_find_by_sock(app_sock); - if (app == NULL) { + if (app == nullptr) { /* * Application can be unregistered before so * this is possible hence simply stopping the * update. */ - DBG3("UST app update failed to find app sock %d", - app_sock); - goto unlock_rcu; + DBG3("UST app update failed to find app sock %d", app_sock); + return; } /* Update all event notifiers for the app. */ ust_app_global_update_event_notifier_rules(app); /* For all tracing session(s) */ - cds_list_for_each_entry_safe(sess, stmp, &session_list->head, list) { + cds_list_for_each_entry_safe (sess, stmp, &session_list->head, list) { if (!session_get(sess)) { continue; } session_lock(sess); - if (!sess->active || !sess->ust_session || - !sess->ust_session->active) { + if (!sess->active || !sess->ust_session || !sess->ust_session->active) { goto unlock_session; } @@ -77,9 +79,6 @@ static void update_ust_app(int app_sock) session_unlock(sess); session_put(sess); } - -unlock_rcu: - rcu_read_unlock(); } /* @@ -92,7 +91,7 @@ static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue) int ret, nb_fd = 0, i; unsigned int fd_added = 0; struct lttng_poll_event events; - struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node; + struct ust_reg_wait_node *wait_node = nullptr, *tmp_wait_node; LTTNG_ASSERT(wait_queue); @@ -108,11 +107,9 @@ static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue) goto error_create; } - cds_list_for_each_entry_safe(wait_node, tmp_wait_node, - &wait_queue->head, head) { + cds_list_for_each_entry_safe (wait_node, tmp_wait_node, &wait_queue->head, head) { LTTNG_ASSERT(wait_node->app); - ret = lttng_poll_add(&events, wait_node->app->sock, - LPOLLHUP | LPOLLERR); + ret = lttng_poll_add(&events, wait_node->app->sock, LPOLLIN); if (ret < 0) { goto error; } @@ -139,10 +136,8 @@ static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue) uint32_t revents = LTTNG_POLL_GETEV(&events, i); int pollfd = LTTNG_POLL_GETFD(&events, i); - cds_list_for_each_entry_safe(wait_node, tmp_wait_node, - &wait_queue->head, head) { - if (pollfd == wait_node->app->sock && - (revents & (LPOLLHUP | LPOLLERR))) { + cds_list_for_each_entry_safe (wait_node, tmp_wait_node, &wait_queue->head, head) { + if (pollfd == wait_node->app->sock && (revents & (LPOLLHUP | LPOLLERR))) { cds_list_del(&wait_node->head); wait_queue->count--; ust_app_destroy(wait_node->app); @@ -152,7 +147,7 @@ static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue) * cds_list_for_each_entry_safe which uses * __typeof__(*wait_node). */ - wait_node = NULL; + wait_node = nullptr; break; } else { ERR("Unexpected poll events %u for sock %d", revents, pollfd); @@ -227,8 +222,8 @@ static void *thread_dispatch_ust_registration(void *data) { int ret, err = -1; struct cds_wfcq_node *node; - struct ust_command *ust_cmd = NULL; - struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node; + struct ust_command *ust_cmd = nullptr; + struct ust_reg_wait_node *wait_node = nullptr, *tmp_wait_node; struct ust_reg_wait_queue wait_queue = { .count = 0, .head = {}, @@ -237,8 +232,7 @@ static void *thread_dispatch_ust_registration(void *data) rcu_register_thread(); - health_register(the_health_sessiond, - HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH); + health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH); if (testpoint(sessiond_thread_app_reg_dispatch)) { goto error_testpoint; @@ -261,8 +255,8 @@ static void *thread_dispatch_ust_registration(void *data) } do { - struct ust_app *app = NULL; - ust_cmd = NULL; + struct ust_app *app = nullptr; + ust_cmd = nullptr; /* * Make sure we don't have node(s) that have hung up before receiving @@ -273,23 +267,26 @@ static void *thread_dispatch_ust_registration(void *data) health_code_update(); /* Dequeue command for registration */ - node = cds_wfcq_dequeue_blocking( - ¬ifiers->ust_cmd_queue->head, - ¬ifiers->ust_cmd_queue->tail); - if (node == NULL) { + node = cds_wfcq_dequeue_blocking(¬ifiers->ust_cmd_queue->head, + ¬ifiers->ust_cmd_queue->tail); + if (node == nullptr) { DBG("Woken up but nothing in the UST command queue"); /* Continue thread execution */ break; } - ust_cmd = caa_container_of(node, struct ust_command, node); + ust_cmd = lttng::utils::container_of(node, &ust_command::node); DBG("Dispatching UST registration pid:%d ppid:%d uid:%d" - " gid:%d sock:%d name:%s (version %d.%d)", - ust_cmd->reg_msg.pid, ust_cmd->reg_msg.ppid, - ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid, - ust_cmd->sock, ust_cmd->reg_msg.name, - ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor); + " gid:%d sock:%d name:%s (version %d.%d)", + ust_cmd->reg_msg.pid, + ust_cmd->reg_msg.ppid, + ust_cmd->reg_msg.uid, + ust_cmd->reg_msg.gid, + ust_cmd->sock, + ust_cmd->reg_msg.name, + ust_cmd->reg_msg.major, + ust_cmd->reg_msg.minor); if (ust_cmd->reg_msg.type == LTTNG_UST_CTL_SOCKET_CMD) { wait_node = zmalloc(); @@ -301,14 +298,13 @@ static void *thread_dispatch_ust_registration(void *data) } lttng_fd_put(LTTNG_FD_APPS, 1); free(ust_cmd); - ust_cmd = NULL; + ust_cmd = nullptr; goto error; } CDS_INIT_LIST_HEAD(&wait_node->head); /* Create application object if socket is CMD. */ - wait_node->app = ust_app_create(&ust_cmd->reg_msg, - ust_cmd->sock); + wait_node->app = ust_app_create(&ust_cmd->reg_msg, ust_cmd->sock); if (!wait_node->app) { ret = close(ust_cmd->sock); if (ret < 0) { @@ -316,9 +312,9 @@ static void *thread_dispatch_ust_registration(void *data) } lttng_fd_put(LTTNG_FD_APPS, 1); free(wait_node); - wait_node = NULL; + wait_node = nullptr; free(ust_cmd); - ust_cmd = NULL; + ust_cmd = nullptr; continue; } /* @@ -329,7 +325,7 @@ static void *thread_dispatch_ust_registration(void *data) wait_queue.count++; free(ust_cmd); - ust_cmd = NULL; + ust_cmd = nullptr; /* * We have to continue here since we don't have the notify * socket and the application MUST be added to the hash table @@ -341,8 +337,8 @@ static void *thread_dispatch_ust_registration(void *data) * Look for the application in the local wait queue and set the * notify socket if found. */ - cds_list_for_each_entry_safe(wait_node, tmp_wait_node, - &wait_queue.head, head) { + cds_list_for_each_entry_safe ( + wait_node, tmp_wait_node, &wait_queue.head, head) { health_code_update(); if (wait_node->app->pid == ust_cmd->reg_msg.pid) { wait_node->app->notify_sock = ust_cmd->sock; @@ -350,8 +346,9 @@ static void *thread_dispatch_ust_registration(void *data) wait_queue.count--; app = wait_node->app; free(wait_node); - wait_node = NULL; - DBG3("UST app notify socket %d is set", ust_cmd->sock); + wait_node = nullptr; + DBG3("UST app notify socket %d is set", + ust_cmd->sock); break; } } @@ -369,7 +366,7 @@ static void *thread_dispatch_ust_registration(void *data) lttng_fd_put(LTTNG_FD_APPS, 1); } free(ust_cmd); - ust_cmd = NULL; + ust_cmd = nullptr; } if (app) { @@ -381,7 +378,7 @@ static void *thread_dispatch_ust_registration(void *data) * and change its state. */ session_lock_list(); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; /* * Add application to the global hash table. This needs to be @@ -397,10 +394,8 @@ static void *thread_dispatch_ust_registration(void *data) /* Send notify socket through the notify pipe. */ ret = send_socket_to_thread( - notifiers->apps_cmd_notify_pipe_write_fd, - app->notify_sock); + notifiers->apps_cmd_notify_pipe_write_fd, app->notify_sock); if (ret < 0) { - rcu_read_unlock(); session_unlock_list(); /* * No notify thread, stop the UST tracing. However, this is @@ -428,11 +423,9 @@ static void *thread_dispatch_ust_registration(void *data) * to the thread and unregistration will take place at that * place. */ - ret = send_socket_to_thread( - notifiers->apps_cmd_pipe_write_fd, - app->sock); + ret = send_socket_to_thread(notifiers->apps_cmd_pipe_write_fd, + app->sock); if (ret < 0) { - rcu_read_unlock(); session_unlock_list(); /* * No apps. thread, stop the UST tracing. However, this is @@ -443,10 +436,9 @@ static void *thread_dispatch_ust_registration(void *data) goto error; } - rcu_read_unlock(); session_unlock_list(); } - } while (node != NULL); + } while (node != nullptr); health_poll_entry(); /* Futex wait on queue. Blocking call on futex() */ @@ -458,8 +450,7 @@ static void *thread_dispatch_ust_registration(void *data) error: /* Clean up wait queue. */ - cds_list_for_each_entry_safe(wait_node, tmp_wait_node, - &wait_queue.head, head) { + cds_list_for_each_entry_safe (wait_node, tmp_wait_node, &wait_queue.head, head) { cds_list_del(&wait_node->head); wait_queue.count--; free(wait_node); @@ -468,13 +459,12 @@ error: /* Empty command queue. */ for (;;) { /* Dequeue command for registration */ - node = cds_wfcq_dequeue_blocking( - ¬ifiers->ust_cmd_queue->head, - ¬ifiers->ust_cmd_queue->tail); - if (node == NULL) { + node = cds_wfcq_dequeue_blocking(¬ifiers->ust_cmd_queue->head, + ¬ifiers->ust_cmd_queue->tail); + if (node == nullptr) { break; } - ust_cmd = caa_container_of(node, struct ust_command, node); + ust_cmd = lttng::utils::container_of(node, &ust_command::node); ret = close(ust_cmd->sock); if (ret < 0) { PERROR("close ust sock exit dispatch %d", ust_cmd->sock); @@ -491,7 +481,7 @@ error_testpoint: } health_unregister(the_health_sessiond); rcu_unregister_thread(); - return NULL; + return nullptr; } static bool shutdown_ust_dispatch_thread(void *data) @@ -504,8 +494,8 @@ static bool shutdown_ust_dispatch_thread(void *data) } bool launch_ust_dispatch_thread(struct ust_cmd_queue *cmd_queue, - int apps_cmd_pipe_write_fd, - int apps_cmd_notify_pipe_write_fd) + int apps_cmd_pipe_write_fd, + int apps_cmd_notify_pipe_write_fd) { struct lttng_thread *thread; struct thread_notifiers *notifiers; @@ -519,10 +509,10 @@ bool launch_ust_dispatch_thread(struct ust_cmd_queue *cmd_queue, notifiers->apps_cmd_notify_pipe_write_fd = apps_cmd_notify_pipe_write_fd; thread = lttng_thread_create("UST registration dispatch", - thread_dispatch_ust_registration, - shutdown_ust_dispatch_thread, - cleanup_ust_dispatch_thread, - notifiers); + thread_dispatch_ust_registration, + shutdown_ust_dispatch_thread, + cleanup_ust_dispatch_thread, + notifiers); if (!thread) { goto error; }