X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fmain.c;h=e9529f1bf9378dc46bb3ddeaa250b10fafaef249;hp=339b20d3ee433be2d17f33d3462b104ab6266448;hb=785d2d0dc3aec3a4e44fcf677155dd07e8e4cc1f;hpb=e7fe706f887aa4d753b102a610f802f7dd816655 diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index 339b20d3e..e9529f1bf 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -222,12 +222,6 @@ enum consumerd_state { static enum consumerd_state ust_consumerd_state; static enum consumerd_state kernel_consumerd_state; -/* Used for the health monitoring of the session daemon. See health.h */ -struct health_state health_thread_cmd; -struct health_state health_thread_app_manage; -struct health_state health_thread_app_reg; -struct health_state health_thread_kernel; - /* * Socket timeout for receiving and sending in seconds. */ @@ -670,14 +664,13 @@ error: } /* - * For each tracing session, update newly registered apps. + * For each tracing session, update newly registered apps. The session list + * lock MUST be acquired before calling this. */ static void update_ust_app(int app_sock) { struct ltt_session *sess, *stmp; - session_lock_list(); - /* For all tracing session(s) */ cds_list_for_each_entry_safe(sess, stmp, &session_list_ptr->head, list) { session_lock(sess); @@ -686,8 +679,6 @@ static void update_ust_app(int app_sock) } session_unlock(sess); } - - session_unlock_list(); } /* @@ -705,6 +696,8 @@ static void *thread_manage_kernel(void *data) DBG("[thread] Thread manage kernel started"); + health_register(HEALTH_TYPE_KERNEL); + /* * This first step of the while is to clean this structure which could free * non NULL pointers so zero it before the loop. @@ -715,14 +708,14 @@ static void *thread_manage_kernel(void *data) goto error_testpoint; } - health_code_update(&health_thread_kernel); + health_code_update(); if (testpoint(thread_manage_kernel_before_loop)) { goto error_testpoint; } while (1) { - health_code_update(&health_thread_kernel); + health_code_update(); if (update_poll_flag == 1) { /* Clean events object. We are about to populate it again. */ @@ -750,9 +743,9 @@ static void *thread_manage_kernel(void *data) /* Poll infinite value of time */ restart: - health_poll_update(&health_thread_kernel); + health_poll_entry(); ret = lttng_poll_wait(&events, -1); - health_poll_update(&health_thread_kernel); + health_poll_exit(); if (ret < 0) { /* * Restart interrupted system call. @@ -775,7 +768,7 @@ static void *thread_manage_kernel(void *data) revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - health_code_update(&health_thread_kernel); + health_code_update(); /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); @@ -823,12 +816,12 @@ error_testpoint: utils_close_pipe(kernel_poll_pipe); kernel_poll_pipe[0] = kernel_poll_pipe[1] = -1; if (err) { - health_error(&health_thread_kernel); + health_error(); ERR("Health error occurred in %s", __func__); WARN("Kernel thread died unexpectedly. " "Kernel tracing can continue but CPU hotplug is disabled."); } - health_exit(&health_thread_kernel); + health_unregister(); DBG("Kernel thread dying"); return NULL; } @@ -869,24 +862,9 @@ static void *thread_manage_consumer(void *data) DBG("[thread] Manage consumer started"); - /* - * Since the consumer thread can be spawned at any moment in time, we init - * the health to a poll status (1, which is a valid health over time). - * When the thread starts, we update here the health to a "code" path being - * an even value so this thread, when reaching a poll wait, does not - * trigger an error with an even value. - * - * Here is the use case we avoid. - * - * +1: the first poll update during initialization (main()) - * +2 * x: multiple code update once in this thread. - * +1: poll wait in this thread (being a good health state). - * == even number which after the wait period shows as a bad health. - * - * In a nutshell, the following poll update to the health state brings back - * the state to an even value meaning a code path. - */ - health_poll_update(&consumer_data->health); + health_register(HEALTH_TYPE_CONSUMER); + + health_code_update(); /* * Pass 2 as size here for the thread quit pipe and kconsumerd_err_sock. @@ -907,18 +885,18 @@ static void *thread_manage_consumer(void *data) goto error; } - health_code_update(&consumer_data->health); + health_code_update(); /* Inifinite blocking call, waiting for transmission */ restart: - health_poll_update(&consumer_data->health); + health_poll_entry(); if (testpoint(thread_manage_consumer)) { goto error; } ret = lttng_poll_wait(&events, -1); - health_poll_update(&consumer_data->health); + health_poll_exit(); if (ret < 0) { /* * Restart interrupted system call. @@ -936,7 +914,7 @@ restart: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - health_code_update(&consumer_data->health); + health_code_update(); /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); @@ -965,7 +943,7 @@ restart: */ (void) utils_set_fd_cloexec(sock); - health_code_update(&consumer_data->health); + health_code_update(); DBG2("Receiving code from consumer err_sock"); @@ -976,7 +954,7 @@ restart: goto error; } - health_code_update(&consumer_data->health); + health_code_update(); if (code == LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) { consumer_data->cmd_sock = @@ -1006,13 +984,13 @@ restart: goto error; } - health_code_update(&consumer_data->health); + health_code_update(); /* Inifinite blocking call, waiting for transmission */ restart_poll: - health_poll_update(&consumer_data->health); + health_poll_entry(); ret = lttng_poll_wait(&events, -1); - health_poll_update(&consumer_data->health); + health_poll_exit(); if (ret < 0) { /* * Restart interrupted system call. @@ -1030,7 +1008,7 @@ restart_poll: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - health_code_update(&consumer_data->health); + health_code_update(); /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); @@ -1048,7 +1026,7 @@ restart_poll: } } - health_code_update(&consumer_data->health); + health_code_update(); /* Wait for any kconsumerd error */ ret = lttcomm_recv_unix_sock(sock, &code, @@ -1099,10 +1077,10 @@ error: lttng_poll_clean(&events); error_poll: if (err) { - health_error(&consumer_data->health); + health_error(); ERR("Health error occurred in %s", __func__); } - health_exit(&consumer_data->health); + health_unregister(); DBG("consumer thread cleanup completed"); return NULL; @@ -1123,11 +1101,13 @@ static void *thread_manage_apps(void *data) rcu_register_thread(); rcu_thread_online(); + health_register(HEALTH_TYPE_APP_MANAGE); + if (testpoint(thread_manage_apps)) { goto error_testpoint; } - health_code_update(&health_thread_app_manage); + health_code_update(); ret = create_thread_poll_set(&events, 2); if (ret < 0) { @@ -1143,16 +1123,16 @@ static void *thread_manage_apps(void *data) goto error; } - health_code_update(&health_thread_app_manage); + health_code_update(); while (1) { DBG("Apps thread polling on %d fds", LTTNG_POLL_GETNB(&events)); /* Inifinite blocking call, waiting for transmission */ restart: - health_poll_update(&health_thread_app_manage); + health_poll_entry(); ret = lttng_poll_wait(&events, -1); - health_poll_update(&health_thread_app_manage); + health_poll_exit(); if (ret < 0) { /* * Restart interrupted system call. @@ -1170,7 +1150,7 @@ static void *thread_manage_apps(void *data) revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - health_code_update(&health_thread_app_manage); + health_code_update(); /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); @@ -1194,18 +1174,28 @@ static void *thread_manage_apps(void *data) goto error; } - health_code_update(&health_thread_app_manage); + health_code_update(); + + /* + * @session_lock + * Lock the global session list so from the register up to + * the registration done message, no thread can see the + * application and change its state. + */ + session_lock_list(); /* Register applicaton to the session daemon */ ret = ust_app_register(&ust_cmd.reg_msg, ust_cmd.sock); if (ret == -ENOMEM) { + session_unlock_list(); goto error; } else if (ret < 0) { + session_unlock_list(); break; } - health_code_update(&health_thread_app_manage); + health_code_update(); /* * Validate UST version compatibility. @@ -1219,7 +1209,7 @@ static void *thread_manage_apps(void *data) update_ust_app(ust_cmd.sock); } - health_code_update(&health_thread_app_manage); + health_code_update(); ret = ust_app_register_done(ust_cmd.sock); if (ret < 0) { @@ -1237,6 +1227,7 @@ static void *thread_manage_apps(void *data) ret = lttng_poll_add(&events, ust_cmd.sock, LPOLLERR & LPOLLHUP & LPOLLRDHUP); if (ret < 0) { + session_unlock_list(); goto error; } @@ -1249,8 +1240,9 @@ static void *thread_manage_apps(void *data) DBG("Apps with sock %d added to poll set", ust_cmd.sock); } + session_unlock_list(); - health_code_update(&health_thread_app_manage); + health_code_update(); break; } @@ -1272,7 +1264,7 @@ static void *thread_manage_apps(void *data) } } - health_code_update(&health_thread_app_manage); + health_code_update(); } } @@ -1291,10 +1283,10 @@ error_testpoint: */ if (err) { - health_error(&health_thread_app_manage); + health_error(); ERR("Health error occurred in %s", __func__); } - health_exit(&health_thread_app_manage); + health_unregister(); DBG("Application communication apps thread cleanup complete"); rcu_thread_offline(); rcu_unregister_thread(); @@ -1391,6 +1383,8 @@ static void *thread_registration_apps(void *data) DBG("[thread] Manage application registration started"); + health_register(HEALTH_TYPE_APP_REG); + if (testpoint(thread_registration_apps)) { goto error_testpoint; } @@ -1428,9 +1422,9 @@ static void *thread_registration_apps(void *data) /* Inifinite blocking call, waiting for transmission */ restart: - health_poll_update(&health_thread_app_reg); + health_poll_entry(); ret = lttng_poll_wait(&events, -1); - health_poll_update(&health_thread_app_reg); + health_poll_exit(); if (ret < 0) { /* * Restart interrupted system call. @@ -1444,7 +1438,7 @@ static void *thread_registration_apps(void *data) nb_fd = ret; for (i = 0; i < nb_fd; i++) { - health_code_update(&health_thread_app_reg); + health_code_update(); /* Fetch once the poll data */ revents = LTTNG_POLL_GETEV(&events, i); @@ -1496,7 +1490,7 @@ static void *thread_registration_apps(void *data) sock = -1; continue; } - health_code_update(&health_thread_app_reg); + health_code_update(); ret = lttcomm_recv_unix_sock(sock, &ust_cmd->reg_msg, sizeof(struct ust_register_msg)); if (ret < 0 || ret < sizeof(struct ust_register_msg)) { @@ -1514,7 +1508,7 @@ static void *thread_registration_apps(void *data) sock = -1; continue; } - health_code_update(&health_thread_app_reg); + health_code_update(); ust_cmd->sock = sock; sock = -1; @@ -1545,7 +1539,7 @@ static void *thread_registration_apps(void *data) exit: error: if (err) { - health_error(&health_thread_app_reg); + health_error(); ERR("Health error occurred in %s", __func__); } @@ -1573,7 +1567,7 @@ error_listen: error_create_poll: error_testpoint: DBG("UST Registration thread cleanup complete"); - health_exit(&health_thread_app_reg); + health_unregister(); return NULL; } @@ -1948,9 +1942,7 @@ static int check_consumer_health(void) { int ret; - ret = health_check_state(&kconsumer_data.health) && - health_check_state(&ustconsumer32_data.health) && - health_check_state(&ustconsumer64_data.health); + ret = health_check_state(HEALTH_TYPE_CONSUMER); DBG3("Health consumer check %d", ret); @@ -2104,7 +2096,7 @@ static int create_ust_session(struct ltt_session *session, DBG("Creating UST session"); - lus = trace_ust_create_session(session->path, session->id, domain); + lus = trace_ust_create_session(session->path, session->id); if (lus == NULL) { ret = LTTNG_ERR_UST_SESS_FAIL; goto error; @@ -2351,8 +2343,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, /* Start the kernel consumer daemon */ pthread_mutex_lock(&kconsumer_data.pid_mutex); if (kconsumer_data.pid == 0 && - cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER && - cmd_ctx->session->start_consumer) { + cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER) { pthread_mutex_unlock(&kconsumer_data.pid_mutex); ret = start_consumerd(&kconsumer_data); if (ret < 0) { @@ -2399,8 +2390,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, pthread_mutex_lock(&ustconsumer64_data.pid_mutex); if (consumerd64_bin[0] != '\0' && ustconsumer64_data.pid == 0 && - cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER && - cmd_ctx->session->start_consumer) { + cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER) { pthread_mutex_unlock(&ustconsumer64_data.pid_mutex); ret = start_consumerd(&ustconsumer64_data); if (ret < 0) { @@ -2428,8 +2418,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, /* 32-bit */ if (consumerd32_bin[0] != '\0' && ustconsumer32_data.pid == 0 && - cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER && - cmd_ctx->session->start_consumer) { + cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER) { pthread_mutex_unlock(&ustconsumer32_data.pid_mutex); ret = start_consumerd(&ustconsumer32_data); if (ret < 0) { @@ -2493,6 +2482,21 @@ skip_domain: } } + /* + * Send relayd information to consumer as soon as we have a domain and a + * session defined. + */ + if (cmd_ctx->session && need_domain) { + /* + * Setup relayd if not done yet. If the relayd information was already + * sent to the consumer, this call will gracefully return. + */ + ret = cmd_setup_relayd(cmd_ctx->session); + if (ret != LTTNG_OK) { + goto error; + } + } + /* Process by command type */ switch (cmd_ctx->lsm->cmd_type) { case LTTNG_ADD_CONTEXT: @@ -2523,41 +2527,12 @@ skip_domain: cmd_ctx->lsm->u.disable.channel_name); break; } - case LTTNG_DISABLE_CONSUMER: - { - ret = cmd_disable_consumer(cmd_ctx->lsm->domain.type, cmd_ctx->session); - break; - } case LTTNG_ENABLE_CHANNEL: { ret = cmd_enable_channel(cmd_ctx->session, cmd_ctx->lsm->domain.type, &cmd_ctx->lsm->u.channel.chan, kernel_poll_pipe[1]); break; } - case LTTNG_ENABLE_CONSUMER: - { - /* - * XXX: 0 means that this URI should be applied on the session. Should - * be a DOMAIN enuam. - */ - ret = cmd_enable_consumer(cmd_ctx->lsm->domain.type, cmd_ctx->session); - if (ret != LTTNG_OK) { - goto error; - } - - if (cmd_ctx->lsm->domain.type == 0) { - /* Add the URI for the UST session if a consumer is present. */ - if (cmd_ctx->session->ust_session && - cmd_ctx->session->ust_session->consumer) { - ret = cmd_enable_consumer(LTTNG_DOMAIN_UST, cmd_ctx->session); - } else if (cmd_ctx->session->kernel_session && - cmd_ctx->session->kernel_session->consumer) { - ret = cmd_enable_consumer(LTTNG_DOMAIN_KERNEL, - cmd_ctx->session); - } - } - break; - } case LTTNG_ENABLE_EVENT: { ret = cmd_enable_event(cmd_ctx->session, cmd_ctx->lsm->domain.type, @@ -3068,26 +3043,26 @@ restart: switch (msg.component) { case LTTNG_HEALTH_CMD: - reply.ret_code = health_check_state(&health_thread_cmd); + reply.ret_code = health_check_state(HEALTH_TYPE_CMD); break; case LTTNG_HEALTH_APP_MANAGE: - reply.ret_code = health_check_state(&health_thread_app_manage); + reply.ret_code = health_check_state(HEALTH_TYPE_APP_MANAGE); break; case LTTNG_HEALTH_APP_REG: - reply.ret_code = health_check_state(&health_thread_app_reg); + reply.ret_code = health_check_state(HEALTH_TYPE_APP_REG); break; case LTTNG_HEALTH_KERNEL: - reply.ret_code = health_check_state(&health_thread_kernel); + reply.ret_code = health_check_state(HEALTH_TYPE_KERNEL); break; case LTTNG_HEALTH_CONSUMER: reply.ret_code = check_consumer_health(); break; case LTTNG_HEALTH_ALL: reply.ret_code = - health_check_state(&health_thread_app_manage) && - health_check_state(&health_thread_app_reg) && - health_check_state(&health_thread_cmd) && - health_check_state(&health_thread_kernel) && + health_check_state(HEALTH_TYPE_APP_MANAGE) && + health_check_state(HEALTH_TYPE_APP_REG) && + health_check_state(HEALTH_TYPE_CMD) && + health_check_state(HEALTH_TYPE_KERNEL) && check_consumer_health(); break; default: @@ -3162,11 +3137,13 @@ static void *thread_manage_clients(void *data) rcu_register_thread(); + health_register(HEALTH_TYPE_CMD); + if (testpoint(thread_manage_clients)) { goto error_testpoint; } - health_code_update(&health_thread_cmd); + health_code_update(); ret = lttcomm_listen_unix_sock(client_sock); if (ret < 0) { @@ -3199,16 +3176,16 @@ static void *thread_manage_clients(void *data) goto error; } - health_code_update(&health_thread_cmd); + health_code_update(); while (1) { DBG("Accepting client command ..."); /* Inifinite blocking call, waiting for transmission */ restart: - health_poll_update(&health_thread_cmd); + health_poll_entry(); ret = lttng_poll_wait(&events, -1); - health_poll_update(&health_thread_cmd); + health_poll_exit(); if (ret < 0) { /* * Restart interrupted system call. @@ -3226,7 +3203,7 @@ static void *thread_manage_clients(void *data) revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - health_code_update(&health_thread_cmd); + health_code_update(); /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); @@ -3246,7 +3223,7 @@ static void *thread_manage_clients(void *data) DBG("Wait for client response"); - health_code_update(&health_thread_cmd); + health_code_update(); sock = lttcomm_accept_unix_sock(client_sock); if (sock < 0) { @@ -3282,7 +3259,7 @@ static void *thread_manage_clients(void *data) cmd_ctx->llm = NULL; cmd_ctx->session = NULL; - health_code_update(&health_thread_cmd); + health_code_update(); /* * Data is received from the lttng client. The struct @@ -3303,7 +3280,7 @@ static void *thread_manage_clients(void *data) continue; } - health_code_update(&health_thread_cmd); + health_code_update(); // TODO: Validate cmd_ctx including sanity check for // security purpose. @@ -3336,7 +3313,7 @@ static void *thread_manage_clients(void *data) continue; } - health_code_update(&health_thread_cmd); + health_code_update(); DBG("Sending response (size: %d, retcode: %s)", cmd_ctx->lttng_msg_size, @@ -3355,7 +3332,7 @@ static void *thread_manage_clients(void *data) clean_command_ctx(&cmd_ctx); - health_code_update(&health_thread_cmd); + health_code_update(); } exit: @@ -3382,11 +3359,11 @@ error_testpoint: } if (err) { - health_error(&health_thread_cmd); + health_error(); ERR("Health error occurred in %s", __func__); } - health_exit(&health_thread_cmd); + health_unregister(); DBG("Client thread dying"); @@ -4167,26 +4144,6 @@ int main(int argc, char **argv) cmd_init(); - /* Init all health thread counters. */ - health_init(&health_thread_cmd); - health_init(&health_thread_kernel); - health_init(&health_thread_app_manage); - health_init(&health_thread_app_reg); - - /* - * Init health counters of the consumer thread. We do a quick hack here to - * the state of the consumer health is fine even if the thread is not - * started. Once the thread starts, the health state is updated with a poll - * value to set a health code path. This is simply to ease our life and has - * no cost what so ever. - */ - health_init(&kconsumer_data.health); - health_poll_update(&kconsumer_data.health); - health_init(&ustconsumer32_data.health); - health_poll_update(&ustconsumer32_data.health); - health_init(&ustconsumer64_data.health); - health_poll_update(&ustconsumer64_data.health); - /* Check for the application socket timeout env variable. */ env_app_timeout = getenv(DEFAULT_APP_SOCKET_TIMEOUT_ENV); if (env_app_timeout) {