X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fmain.c;h=13bd649180452ad8346f8d7a5729446a9c58b157;hp=d4dae16fd43656db074bb064b2c48720710ebc7c;hb=03e431550191df8609f921c7b4054c57ee4644d8;hpb=48b40bcf9da8c227901d15e90f784d7cd5b5935c diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index d4dae16fd..13bd64918 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -586,6 +586,34 @@ static int generate_lock_file_path(char *path, size_t len) return ret; } +/* + * Wait on consumer process termination. + * + * Need to be called with the consumer data lock held or from a context + * ensuring no concurrent access to data (e.g: cleanup). + */ +static void wait_consumer(struct consumer_data *consumer_data) +{ + pid_t ret; + int status; + + if (consumer_data->pid <= 0) { + return; + } + + DBG("Waiting for complete teardown of consumerd (PID: %d)", + consumer_data->pid); + ret = waitpid(consumer_data->pid, &status, 0); + if (ret == -1) { + PERROR("consumerd waitpid pid: %d", consumer_data->pid) + } + if (!WIFEXITED(status)) { + ERR("consumerd termination with error: %d", + WEXITSTATUS(ret)); + } + consumer_data->pid = 0; +} + /* * Cleanup the session daemon's data structures. */ @@ -680,6 +708,10 @@ static void sessiond_cleanup(void) } } + wait_consumer(&kconsumer_data); + wait_consumer(&ustconsumer64_data); + wait_consumer(&ustconsumer32_data); + DBG("Cleaning up all agent apps"); agent_app_ht_clean(); @@ -1133,31 +1165,33 @@ static void *thread_manage_kernel(void *data) } /* Check for data on kernel pipe */ - if (pollfd == kernel_poll_pipe[0] && (revents & LPOLLIN)) { - (void) lttng_read(kernel_poll_pipe[0], - &tmp, 1); - /* - * Ret value is useless here, if this pipe gets any actions an - * update is required anyway. - */ - update_poll_flag = 1; - continue; - } else { - /* - * New CPU detected by the kernel. Adding kernel stream to - * kernel session and updating the kernel consumer - */ - if (revents & LPOLLIN) { + if (revents & LPOLLIN) { + if (pollfd == kernel_poll_pipe[0]) { + (void) lttng_read(kernel_poll_pipe[0], + &tmp, 1); + /* + * Ret value is useless here, if this pipe gets any actions an + * update is required anyway. + */ + update_poll_flag = 1; + continue; + } else { + /* + * New CPU detected by the kernel. Adding kernel stream to + * kernel session and updating the kernel consumer + */ ret = update_kernel_stream(&kconsumer_data, pollfd); if (ret < 0) { continue; } break; - /* - * TODO: We might want to handle the LPOLLERR | LPOLLHUP - * and unregister kernel stream at this point. - */ } + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + update_poll_flag = 1; + continue; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } } @@ -1287,9 +1321,14 @@ restart: /* Event on the registration socket */ if (pollfd == consumer_data->err_sock) { - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + if (revents & LPOLLIN) { + continue; + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { ERR("consumer err socket poll error"); goto error; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } } @@ -1419,7 +1458,8 @@ restart_poll: if (pollfd == sock) { /* Event on the consumerd socket */ - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) + && !(revents & LPOLLIN)) { ERR("consumer err socket second poll error"); goto error; } @@ -1437,6 +1477,11 @@ restart_poll: goto exit; } else if (pollfd == consumer_data->metadata_fd) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) + && !(revents & LPOLLIN)) { + ERR("consumer err metadata socket second poll error"); + goto error; + } /* UST metadata requests */ ret = ust_consumer_metadata_request( &consumer_data->metadata_sock); @@ -1500,7 +1545,6 @@ error: unlink(consumer_data->err_unix_sock_path); unlink(consumer_data->cmd_unix_sock_path); - consumer_data->pid = 0; pthread_mutex_unlock(&consumer_data->lock); /* Cleanup metadata socket mutex. */ @@ -1605,10 +1649,7 @@ static void *thread_manage_apps(void *data) /* Inspect the apps cmd pipe */ if (pollfd == apps_cmd_pipe[0]) { - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("Apps command pipe error"); - goto error; - } else if (revents & LPOLLIN) { + if (revents & LPOLLIN) { int sock; /* Empty pipe */ @@ -1621,9 +1662,8 @@ static void *thread_manage_apps(void *data) health_code_update(); /* - * We only monitor the error events of the socket. This - * thread does not handle any incoming data from UST - * (POLLIN). + * Since this is a command socket (write then read), + * we only monitor the error events of the socket. */ ret = lttng_poll_add(&events, sock, LPOLLERR | LPOLLHUP | LPOLLRDHUP); @@ -1632,6 +1672,12 @@ static void *thread_manage_apps(void *data) } DBG("Apps with sock %d added to poll set", sock); + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Apps command pipe error"); + goto error; + } else { + ERR("Unknown poll events %u for sock %d", revents, pollfd); + goto error; } } else { /* @@ -1647,6 +1693,9 @@ static void *thread_manage_apps(void *data) /* Socket closed on remote end. */ ust_app_unregister(pollfd); + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } @@ -1794,6 +1843,9 @@ static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue) */ wait_node = NULL; break; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } } @@ -2154,10 +2206,7 @@ static void *thread_registration_apps(void *data) /* Event on the registration socket */ if (pollfd == apps_sock) { - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("Register apps socket poll error"); - goto error; - } else if (revents & LPOLLIN) { + if (revents & LPOLLIN) { sock = lttcomm_accept_unix_sock(apps_sock); if (sock < 0) { goto error; @@ -2244,6 +2293,12 @@ static void *thread_registration_apps(void *data) * barrier with the exchange in cds_wfcq_enqueue. */ futex_nto1_wake(&ust_cmd_queue.futex); + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Register apps socket poll error"); + goto error; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } } @@ -2941,6 +2996,8 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, DBG("Processing client command %d", cmd_ctx->lsm->cmd_type); + assert(!rcu_read_ongoing()); + *sock_error = 0; switch (cmd_ctx->lsm->cmd_type) { @@ -4028,6 +4085,7 @@ setup_error: session_unlock_list(); } init_setup_error: + assert(!rcu_read_ongoing()); return ret; } @@ -4143,9 +4201,14 @@ restart: /* Event on the registration socket */ if (pollfd == sock) { - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + if (revents & LPOLLIN) { + continue; + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { ERR("Health socket poll error"); goto error; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } } @@ -4320,9 +4383,14 @@ static void *thread_manage_clients(void *data) /* Event on the registration socket */ if (pollfd == client_sock) { - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + if (revents & LPOLLIN) { + continue; + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { ERR("Client socket poll error"); goto error; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } }