X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fmain.c;h=c02f6d91e2ba904819c58f3058d83efef218c6fd;hp=f6e7094bfca08947c3cddb5d1693818a350ee5dc;hb=d9904f8450af54cb6a533db612a7181e933f5aef;hpb=6d737ce48af40e77ca27cb78348e7f3042eab3ed diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index f6e7094bf..c02f6d91e 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -161,6 +161,7 @@ static pthread_t client_thread; static pthread_t kernel_thread; static pthread_t dispatch_thread; static pthread_t health_thread; +static pthread_t ht_cleanup_thread; /* * UST registration command queue. This queue is tied with a futex and uses a N @@ -645,7 +646,8 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd) pthread_mutex_lock(socket->lock); ret = kernel_consumer_send_channel_stream(socket, - channel, ksess); + channel, ksess, + session->output_traces ? 1 : 0); pthread_mutex_unlock(socket->lock); if (ret < 0) { rcu_read_unlock(); @@ -1333,25 +1335,116 @@ error: return ret; } +/* + * Sanitize the wait queue of the dispatch registration thread meaning removing + * invalid nodes from it. This is to avoid memory leaks for the case the UST + * notify socket is never received. + */ +static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue) +{ + int ret, nb_fd = 0, i; + unsigned int fd_added = 0; + struct lttng_poll_event events; + struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node; + + assert(wait_queue); + + lttng_poll_init(&events); + + /* Just skip everything for an empty queue. */ + if (!wait_queue->count) { + goto end; + } + + ret = lttng_poll_create(&events, wait_queue->count, LTTNG_CLOEXEC); + if (ret < 0) { + goto error_create; + } + + cds_list_for_each_entry_safe(wait_node, tmp_wait_node, + &wait_queue->head, head) { + assert(wait_node->app); + ret = lttng_poll_add(&events, wait_node->app->sock, + LPOLLHUP | LPOLLERR); + if (ret < 0) { + goto error; + } + + fd_added = 1; + } + + if (!fd_added) { + goto end; + } + + /* + * Poll but don't block so we can quickly identify the faulty events and + * clean them afterwards from the wait queue. + */ + ret = lttng_poll_wait(&events, 0); + if (ret < 0) { + goto error; + } + nb_fd = ret; + + for (i = 0; i < nb_fd; i++) { + /* Get faulty FD. */ + uint32_t revents = LTTNG_POLL_GETEV(&events, i); + int pollfd = LTTNG_POLL_GETFD(&events, i); + + cds_list_for_each_entry_safe(wait_node, tmp_wait_node, + &wait_queue->head, head) { + if (pollfd == wait_node->app->sock && + (revents & (LPOLLHUP | LPOLLERR))) { + cds_list_del(&wait_node->head); + wait_queue->count--; + ust_app_destroy(wait_node->app); + free(wait_node); + break; + } + } + } + + if (nb_fd > 0) { + DBG("Wait queue sanitized, %d node were cleaned up", nb_fd); + } + +end: + lttng_poll_clean(&events); + return; + +error: + lttng_poll_clean(&events); +error_create: + ERR("Unable to sanitize wait queue"); + return; +} + /* * Dispatch request from the registration threads to the application * communication thread. */ static void *thread_dispatch_ust_registration(void *data) { - int ret; + int ret, err = -1; struct cds_wfq_node *node; struct ust_command *ust_cmd = NULL; - struct { - struct ust_app *app; - struct cds_list_head head; - } *wait_node = NULL, *tmp_wait_node; + struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node; + struct ust_reg_wait_queue wait_queue = { + .count = 0, + }; - CDS_LIST_HEAD(wait_queue); + health_register(HEALTH_TYPE_APP_REG_DISPATCH); + + health_code_update(); + + CDS_INIT_LIST_HEAD(&wait_queue.head); DBG("[thread] Dispatch UST command started"); while (!CMM_LOAD_SHARED(dispatch_thread_exit)) { + health_code_update(); + /* Atomically prepare the queue futex */ futex_nto1_prepare(&ust_cmd_queue.futex); @@ -1359,6 +1452,14 @@ static void *thread_dispatch_ust_registration(void *data) struct ust_app *app = NULL; ust_cmd = NULL; + /* + * Make sure we don't have node(s) that have hung up before receiving + * the notify socket. This is to clean the list in order to avoid + * memory leaks from notify socket that are never seen. + */ + sanitize_wait_queue(&wait_queue); + + health_code_update(); /* Dequeue command for registration */ node = cds_wfq_dequeue_blocking(&ust_cmd_queue.queue); if (node == NULL) { @@ -1407,7 +1508,8 @@ static void *thread_dispatch_ust_registration(void *data) * Add application to the wait queue so we can set the notify * socket before putting this object in the global ht. */ - cds_list_add(&wait_node->head, &wait_queue); + cds_list_add(&wait_node->head, &wait_queue.head); + wait_queue.count++; free(ust_cmd); /* @@ -1422,10 +1524,12 @@ static void *thread_dispatch_ust_registration(void *data) * notify socket if found. */ cds_list_for_each_entry_safe(wait_node, tmp_wait_node, - &wait_queue, head) { + &wait_queue.head, head) { + health_code_update(); if (wait_node->app->pid == ust_cmd->reg_msg.pid) { wait_node->app->notify_sock = ust_cmd->sock; cds_list_del(&wait_node->head); + wait_queue.count--; app = wait_node->app; free(wait_node); DBG3("UST app notify socket %d is set", ust_cmd->sock); @@ -1509,19 +1613,29 @@ static void *thread_dispatch_ust_registration(void *data) } } while (node != NULL); + health_poll_entry(); /* Futex wait on queue. Blocking call on futex() */ futex_nto1_wait(&ust_cmd_queue.futex); + health_poll_exit(); } + /* Normal exit, no error */ + err = 0; error: /* Clean up wait queue. */ cds_list_for_each_entry_safe(wait_node, tmp_wait_node, - &wait_queue, head) { + &wait_queue.head, head) { cds_list_del(&wait_node->head); + wait_queue.count--; free(wait_node); } DBG("Dispatch thread dying"); + if (err) { + health_error(); + ERR("Health error occurred in %s", __func__); + } + health_unregister(); return NULL; } @@ -2264,6 +2378,7 @@ static int create_ust_session(struct ltt_session *session, lus->uid = session->uid; lus->gid = session->gid; + lus->output_traces = session->output_traces; session->ust_session = lus; /* Copy session output to the newly created UST session */ @@ -2320,6 +2435,7 @@ static int create_kernel_session(struct ltt_session *session) session->kernel_session->uid = session->uid; session->kernel_session->gid = session->gid; + session->kernel_session->output_traces = session->output_traces; return LTTNG_OK; @@ -2381,6 +2497,10 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, case LTTNG_START_TRACE: case LTTNG_STOP_TRACE: case LTTNG_DATA_PENDING: + case LTTNG_SNAPSHOT_ADD_OUTPUT: + case LTTNG_SNAPSHOT_DEL_OUTPUT: + case LTTNG_SNAPSHOT_LIST_OUTPUT: + case LTTNG_SNAPSHOT_RECORD: need_domain = 0; break; default: @@ -2449,12 +2569,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, session_lock_list(); cmd_ctx->session = session_find_by_name(cmd_ctx->lsm->session.name); if (cmd_ctx->session == NULL) { - if (cmd_ctx->lsm->session.name != NULL) { - ret = LTTNG_ERR_SESS_NOT_FOUND; - } else { - /* If no session name specified */ - ret = LTTNG_ERR_SELECT_SESS; - } + ret = LTTNG_ERR_SESS_NOT_FOUND; goto error; } else { /* Acquire lock for the session */ @@ -3068,6 +3183,67 @@ skip_domain: ret = cmd_data_pending(cmd_ctx->session); break; } + case LTTNG_SNAPSHOT_ADD_OUTPUT: + { + struct lttcomm_lttng_output_id reply; + + ret = cmd_snapshot_add_output(cmd_ctx->session, + &cmd_ctx->lsm->u.snapshot_output.output, &reply.id); + if (ret != LTTNG_OK) { + goto error; + } + + ret = setup_lttng_msg(cmd_ctx, sizeof(reply)); + if (ret < 0) { + goto setup_error; + } + + /* Copy output list into message payload */ + memcpy(cmd_ctx->llm->payload, &reply, sizeof(reply)); + ret = LTTNG_OK; + break; + } + case LTTNG_SNAPSHOT_DEL_OUTPUT: + { + ret = cmd_snapshot_del_output(cmd_ctx->session, + &cmd_ctx->lsm->u.snapshot_output.output); + break; + } + case LTTNG_SNAPSHOT_LIST_OUTPUT: + { + ssize_t nb_output; + struct lttng_snapshot_output *outputs = NULL; + + nb_output = cmd_snapshot_list_outputs(cmd_ctx->session, &outputs); + if (nb_output < 0) { + ret = -nb_output; + goto error; + } + + ret = setup_lttng_msg(cmd_ctx, + nb_output * sizeof(struct lttng_snapshot_output)); + if (ret < 0) { + free(outputs); + goto setup_error; + } + + if (outputs) { + /* Copy output list into message payload */ + memcpy(cmd_ctx->llm->payload, outputs, + nb_output * sizeof(struct lttng_snapshot_output)); + free(outputs); + } + + ret = LTTNG_OK; + break; + } + case LTTNG_SNAPSHOT_RECORD: + { + ret = cmd_snapshot_record(cmd_ctx->session, + &cmd_ctx->lsm->u.snapshot_record.output, + cmd_ctx->lsm->u.snapshot_record.wait); + break; + } default: ret = LTTNG_ERR_UND; break; @@ -3225,13 +3401,25 @@ restart: case LTTNG_HEALTH_CONSUMER: reply.ret_code = check_consumer_health(); break; + case LTTNG_HEALTH_HT_CLEANUP: + reply.ret_code = health_check_state(HEALTH_TYPE_HT_CLEANUP); + break; + case LTTNG_HEALTH_APP_MANAGE_NOTIFY: + reply.ret_code = health_check_state(HEALTH_TYPE_APP_MANAGE_NOTIFY); + break; + case LTTNG_HEALTH_APP_REG_DISPATCH: + reply.ret_code = health_check_state(HEALTH_TYPE_APP_REG_DISPATCH); + break; case LTTNG_HEALTH_ALL: reply.ret_code = health_check_state(HEALTH_TYPE_APP_MANAGE) && health_check_state(HEALTH_TYPE_APP_REG) && health_check_state(HEALTH_TYPE_CMD) && health_check_state(HEALTH_TYPE_KERNEL) && - check_consumer_health(); + check_consumer_health() && + health_check_state(HEALTH_TYPE_HT_CLEANUP) && + health_check_state(HEALTH_TYPE_APP_MANAGE_NOTIFY) && + health_check_state(HEALTH_TYPE_APP_REG_DISPATCH); break; default: reply.ret_code = LTTNG_ERR_UND; @@ -3276,12 +3464,6 @@ error: PERROR("close"); } } - if (new_sock >= 0) { - ret = close(new_sock); - if (ret) { - PERROR("close"); - } - } lttng_poll_clean(&events); @@ -4142,7 +4324,7 @@ int main(int argc, char **argv) DBG2("Kernel consumer cmd path: %s", kconsumer_data.cmd_unix_sock_path); } else { - home_path = get_home_dir(); + home_path = utils_get_home_dir(); if (home_path == NULL) { /* TODO: Add --socket PATH option */ ERR("Can't get HOME directory for sockets creation."); @@ -4299,6 +4481,11 @@ int main(int argc, char **argv) } } + /* Setup the thread ht_cleanup communication pipe. */ + if (utils_create_pipe_cloexec(ht_cleanup_pipe) < 0) { + goto exit; + } + /* Setup the thread apps communication pipe. */ if ((ret = utils_create_pipe_cloexec(apps_cmd_pipe)) < 0) { goto exit; @@ -4337,6 +4524,14 @@ int main(int argc, char **argv) write_pidfile(); + /* Create thread to manage the client socket */ + ret = pthread_create(&ht_cleanup_thread, NULL, + thread_ht_cleanup, (void *) NULL); + if (ret != 0) { + PERROR("pthread_create ht_cleanup"); + goto exit_ht_cleanup; + } + /* Create thread to manage the client socket */ ret = pthread_create(&health_thread, NULL, thread_manage_health, (void *) NULL); @@ -4456,6 +4651,12 @@ exit_client: } exit_health: + ret = pthread_join(ht_cleanup_thread, &status); + if (ret != 0) { + PERROR("pthread_join ht cleanup thread"); + goto error; /* join error, exit without cleanup */ + } +exit_ht_cleanup: exit: /* * cleanup() is called when no other thread is running.