X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fmain.c;h=c02f6d91e2ba904819c58f3058d83efef218c6fd;hp=007c722921700e351cfedd6441fc31a16a9683b3;hb=d9904f8450af54cb6a533db612a7181e933f5aef;hpb=12e2b88170b3bf7a55beb692c717470752ad51eb diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index 007c72292..c02f6d91e 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -646,7 +646,8 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd) pthread_mutex_lock(socket->lock); ret = kernel_consumer_send_channel_stream(socket, - channel, ksess); + channel, ksess, + session->output_traces ? 1 : 0); pthread_mutex_unlock(socket->lock); if (ret < 0) { rcu_read_unlock(); @@ -1334,6 +1335,91 @@ error: return ret; } +/* + * Sanitize the wait queue of the dispatch registration thread meaning removing + * invalid nodes from it. This is to avoid memory leaks for the case the UST + * notify socket is never received. + */ +static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue) +{ + int ret, nb_fd = 0, i; + unsigned int fd_added = 0; + struct lttng_poll_event events; + struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node; + + assert(wait_queue); + + lttng_poll_init(&events); + + /* Just skip everything for an empty queue. */ + if (!wait_queue->count) { + goto end; + } + + ret = lttng_poll_create(&events, wait_queue->count, LTTNG_CLOEXEC); + if (ret < 0) { + goto error_create; + } + + cds_list_for_each_entry_safe(wait_node, tmp_wait_node, + &wait_queue->head, head) { + assert(wait_node->app); + ret = lttng_poll_add(&events, wait_node->app->sock, + LPOLLHUP | LPOLLERR); + if (ret < 0) { + goto error; + } + + fd_added = 1; + } + + if (!fd_added) { + goto end; + } + + /* + * Poll but don't block so we can quickly identify the faulty events and + * clean them afterwards from the wait queue. + */ + ret = lttng_poll_wait(&events, 0); + if (ret < 0) { + goto error; + } + nb_fd = ret; + + for (i = 0; i < nb_fd; i++) { + /* Get faulty FD. */ + uint32_t revents = LTTNG_POLL_GETEV(&events, i); + int pollfd = LTTNG_POLL_GETFD(&events, i); + + cds_list_for_each_entry_safe(wait_node, tmp_wait_node, + &wait_queue->head, head) { + if (pollfd == wait_node->app->sock && + (revents & (LPOLLHUP | LPOLLERR))) { + cds_list_del(&wait_node->head); + wait_queue->count--; + ust_app_destroy(wait_node->app); + free(wait_node); + break; + } + } + } + + if (nb_fd > 0) { + DBG("Wait queue sanitized, %d node were cleaned up", nb_fd); + } + +end: + lttng_poll_clean(&events); + return; + +error: + lttng_poll_clean(&events); +error_create: + ERR("Unable to sanitize wait queue"); + return; +} + /* * Dispatch request from the registration threads to the application * communication thread. @@ -1343,16 +1429,16 @@ static void *thread_dispatch_ust_registration(void *data) int ret, err = -1; struct cds_wfq_node *node; struct ust_command *ust_cmd = NULL; - struct { - struct ust_app *app; - struct cds_list_head head; - } *wait_node = NULL, *tmp_wait_node; + struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node; + struct ust_reg_wait_queue wait_queue = { + .count = 0, + }; health_register(HEALTH_TYPE_APP_REG_DISPATCH); health_code_update(); - CDS_LIST_HEAD(wait_queue); + CDS_INIT_LIST_HEAD(&wait_queue.head); DBG("[thread] Dispatch UST command started"); @@ -1366,6 +1452,13 @@ static void *thread_dispatch_ust_registration(void *data) struct ust_app *app = NULL; ust_cmd = NULL; + /* + * Make sure we don't have node(s) that have hung up before receiving + * the notify socket. This is to clean the list in order to avoid + * memory leaks from notify socket that are never seen. + */ + sanitize_wait_queue(&wait_queue); + health_code_update(); /* Dequeue command for registration */ node = cds_wfq_dequeue_blocking(&ust_cmd_queue.queue); @@ -1415,7 +1508,8 @@ static void *thread_dispatch_ust_registration(void *data) * Add application to the wait queue so we can set the notify * socket before putting this object in the global ht. */ - cds_list_add(&wait_node->head, &wait_queue); + cds_list_add(&wait_node->head, &wait_queue.head); + wait_queue.count++; free(ust_cmd); /* @@ -1430,11 +1524,12 @@ static void *thread_dispatch_ust_registration(void *data) * notify socket if found. */ cds_list_for_each_entry_safe(wait_node, tmp_wait_node, - &wait_queue, head) { + &wait_queue.head, head) { health_code_update(); if (wait_node->app->pid == ust_cmd->reg_msg.pid) { wait_node->app->notify_sock = ust_cmd->sock; cds_list_del(&wait_node->head); + wait_queue.count--; app = wait_node->app; free(wait_node); DBG3("UST app notify socket %d is set", ust_cmd->sock); @@ -1529,8 +1624,9 @@ static void *thread_dispatch_ust_registration(void *data) error: /* Clean up wait queue. */ cds_list_for_each_entry_safe(wait_node, tmp_wait_node, - &wait_queue, head) { + &wait_queue.head, head) { cds_list_del(&wait_node->head); + wait_queue.count--; free(wait_node); } @@ -2282,6 +2378,7 @@ static int create_ust_session(struct ltt_session *session, lus->uid = session->uid; lus->gid = session->gid; + lus->output_traces = session->output_traces; session->ust_session = lus; /* Copy session output to the newly created UST session */ @@ -2338,6 +2435,7 @@ static int create_kernel_session(struct ltt_session *session) session->kernel_session->uid = session->uid; session->kernel_session->gid = session->gid; + session->kernel_session->output_traces = session->output_traces; return LTTNG_OK; @@ -2399,6 +2497,10 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, case LTTNG_START_TRACE: case LTTNG_STOP_TRACE: case LTTNG_DATA_PENDING: + case LTTNG_SNAPSHOT_ADD_OUTPUT: + case LTTNG_SNAPSHOT_DEL_OUTPUT: + case LTTNG_SNAPSHOT_LIST_OUTPUT: + case LTTNG_SNAPSHOT_RECORD: need_domain = 0; break; default: @@ -2467,12 +2569,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, session_lock_list(); cmd_ctx->session = session_find_by_name(cmd_ctx->lsm->session.name); if (cmd_ctx->session == NULL) { - if (cmd_ctx->lsm->session.name != NULL) { - ret = LTTNG_ERR_SESS_NOT_FOUND; - } else { - /* If no session name specified */ - ret = LTTNG_ERR_SELECT_SESS; - } + ret = LTTNG_ERR_SESS_NOT_FOUND; goto error; } else { /* Acquire lock for the session */ @@ -3086,6 +3183,67 @@ skip_domain: ret = cmd_data_pending(cmd_ctx->session); break; } + case LTTNG_SNAPSHOT_ADD_OUTPUT: + { + struct lttcomm_lttng_output_id reply; + + ret = cmd_snapshot_add_output(cmd_ctx->session, + &cmd_ctx->lsm->u.snapshot_output.output, &reply.id); + if (ret != LTTNG_OK) { + goto error; + } + + ret = setup_lttng_msg(cmd_ctx, sizeof(reply)); + if (ret < 0) { + goto setup_error; + } + + /* Copy output list into message payload */ + memcpy(cmd_ctx->llm->payload, &reply, sizeof(reply)); + ret = LTTNG_OK; + break; + } + case LTTNG_SNAPSHOT_DEL_OUTPUT: + { + ret = cmd_snapshot_del_output(cmd_ctx->session, + &cmd_ctx->lsm->u.snapshot_output.output); + break; + } + case LTTNG_SNAPSHOT_LIST_OUTPUT: + { + ssize_t nb_output; + struct lttng_snapshot_output *outputs = NULL; + + nb_output = cmd_snapshot_list_outputs(cmd_ctx->session, &outputs); + if (nb_output < 0) { + ret = -nb_output; + goto error; + } + + ret = setup_lttng_msg(cmd_ctx, + nb_output * sizeof(struct lttng_snapshot_output)); + if (ret < 0) { + free(outputs); + goto setup_error; + } + + if (outputs) { + /* Copy output list into message payload */ + memcpy(cmd_ctx->llm->payload, outputs, + nb_output * sizeof(struct lttng_snapshot_output)); + free(outputs); + } + + ret = LTTNG_OK; + break; + } + case LTTNG_SNAPSHOT_RECORD: + { + ret = cmd_snapshot_record(cmd_ctx->session, + &cmd_ctx->lsm->u.snapshot_record.output, + cmd_ctx->lsm->u.snapshot_record.wait); + break; + } default: ret = LTTNG_ERR_UND; break; @@ -4166,7 +4324,7 @@ int main(int argc, char **argv) DBG2("Kernel consumer cmd path: %s", kconsumer_data.cmd_unix_sock_path); } else { - home_path = get_home_dir(); + home_path = utils_get_home_dir(); if (home_path == NULL) { /* TODO: Add --socket PATH option */ ERR("Can't get HOME directory for sockets creation.");