X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fmain.c;h=f7bb53ef7ead038e479964b6e11bb2259eeb1cd7;hp=d999928feb8a0f06bfe24857fc3182f56c4f903f;hb=5d2e1e66a968d9e555f9b8b00d0589ebfaf3de32;hpb=927ca06aed61ff6dd3f64ae71854f2d7f9acebe5 diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index d999928fe..f7bb53ef7 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -37,7 +37,6 @@ #include #include -#include #include #include #include @@ -61,14 +60,12 @@ #include "fd-limit.h" #include "health.h" #include "testpoint.h" +#include "ust-thread.h" #define CONSUMERD_FILE "lttng-consumerd" /* Const values */ -const char default_home_dir[] = DEFAULT_HOME_DIR; const char default_tracing_group[] = DEFAULT_TRACING_GROUP; -const char default_ust_sock_dir[] = DEFAULT_UST_SOCK_DIR; -const char default_global_apps_pipe[] = DEFAULT_GLOBAL_APPS_PIPE; const char *progname; const char *opt_tracing_group; @@ -149,8 +146,11 @@ static int thread_quit_pipe[2] = { -1, -1 }; */ static int apps_cmd_pipe[2] = { -1, -1 }; +int apps_cmd_notify_pipe[2] = { -1, -1 }; + /* Pthread, Mutexes and Semaphores */ static pthread_t apps_thread; +static pthread_t apps_notify_thread; static pthread_t reg_apps_thread; static pthread_t client_thread; static pthread_t kernel_thread; @@ -222,12 +222,6 @@ enum consumerd_state { static enum consumerd_state ust_consumerd_state; static enum consumerd_state kernel_consumerd_state; -/* Used for the health monitoring of the session daemon. See health.h */ -struct health_state health_thread_cmd; -struct health_state health_thread_app_manage; -struct health_state health_thread_app_reg; -struct health_state health_thread_kernel; - /* * Socket timeout for receiving and sending in seconds. */ @@ -285,15 +279,11 @@ void setup_consumerd_path(void) /* * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set. */ -static int create_thread_poll_set(struct lttng_poll_event *events, - unsigned int size) +int sessiond_set_thread_pollset(struct lttng_poll_event *events, size_t size) { int ret; - if (events == NULL || size == 0) { - ret = -1; - goto error; - } + assert(events); ret = lttng_poll_create(events, size, LTTNG_CLOEXEC); if (ret < 0) { @@ -301,7 +291,7 @@ static int create_thread_poll_set(struct lttng_poll_event *events, } /* Add quit pipe */ - ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN); + ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR); if (ret < 0) { goto error; } @@ -317,7 +307,7 @@ error: * * Return 1 if it was triggered else 0; */ -static int check_thread_quit_pipe(int fd, uint32_t events) +int sessiond_check_thread_quit_pipe(int fd, uint32_t events) { if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) { return 1; @@ -670,14 +660,13 @@ error: } /* - * For each tracing session, update newly registered apps. + * For each tracing session, update newly registered apps. The session list + * lock MUST be acquired before calling this. */ static void update_ust_app(int app_sock) { struct ltt_session *sess, *stmp; - session_lock_list(); - /* For all tracing session(s) */ cds_list_for_each_entry_safe(sess, stmp, &session_list_ptr->head, list) { session_lock(sess); @@ -686,8 +675,6 @@ static void update_ust_app(int app_sock) } session_unlock(sess); } - - session_unlock_list(); } /* @@ -717,20 +704,20 @@ static void *thread_manage_kernel(void *data) goto error_testpoint; } - health_code_update(&health_thread_kernel); + health_code_update(); if (testpoint(thread_manage_kernel_before_loop)) { goto error_testpoint; } while (1) { - health_code_update(&health_thread_kernel); + health_code_update(); if (update_poll_flag == 1) { /* Clean events object. We are about to populate it again. */ lttng_poll_clean(&events); - ret = create_thread_poll_set(&events, 2); + ret = sessiond_set_thread_pollset(&events, 2); if (ret < 0) { goto error_poll_create; } @@ -752,9 +739,9 @@ static void *thread_manage_kernel(void *data) /* Poll infinite value of time */ restart: - health_poll_update(&health_thread_kernel); + health_poll_entry(); ret = lttng_poll_wait(&events, -1); - health_poll_update(&health_thread_kernel); + health_poll_exit(); if (ret < 0) { /* * Restart interrupted system call. @@ -777,10 +764,10 @@ static void *thread_manage_kernel(void *data) revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - health_code_update(&health_thread_kernel); + health_code_update(); /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); + ret = sessiond_check_thread_quit_pipe(pollfd, revents); if (ret) { err = 0; goto exit; @@ -825,7 +812,7 @@ error_testpoint: utils_close_pipe(kernel_poll_pipe); kernel_poll_pipe[0] = kernel_poll_pipe[1] = -1; if (err) { - health_error(&health_thread_kernel); + health_error(); ERR("Health error occurred in %s", __func__); WARN("Kernel thread died unexpectedly. " "Kernel tracing can continue but CPU hotplug is disabled."); @@ -873,30 +860,13 @@ static void *thread_manage_consumer(void *data) health_register(HEALTH_TYPE_CONSUMER); - /* - * Since the consumer thread can be spawned at any moment in time, we init - * the health to a poll status (1, which is a valid health over time). - * When the thread starts, we update here the health to a "code" path being - * an even value so this thread, when reaching a poll wait, does not - * trigger an error with an even value. - * - * Here is the use case we avoid. - * - * +1: the first poll update during initialization (main()) - * +2 * x: multiple code update once in this thread. - * +1: poll wait in this thread (being a good health state). - * == even number which after the wait period shows as a bad health. - * - * In a nutshell, the following poll update to the health state brings back - * the state to an even value meaning a code path. - */ - health_poll_update(&consumer_data->health); + health_code_update(); /* * Pass 2 as size here for the thread quit pipe and kconsumerd_err_sock. * Nothing more will be added to this poll set. */ - ret = create_thread_poll_set(&events, 2); + ret = sessiond_set_thread_pollset(&events, 2); if (ret < 0) { goto error_poll; } @@ -911,18 +881,18 @@ static void *thread_manage_consumer(void *data) goto error; } - health_code_update(&consumer_data->health); + health_code_update(); /* Inifinite blocking call, waiting for transmission */ restart: - health_poll_update(&consumer_data->health); + health_poll_entry(); if (testpoint(thread_manage_consumer)) { goto error; } ret = lttng_poll_wait(&events, -1); - health_poll_update(&consumer_data->health); + health_poll_exit(); if (ret < 0) { /* * Restart interrupted system call. @@ -940,10 +910,10 @@ restart: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - health_code_update(&consumer_data->health); + health_code_update(); /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); + ret = sessiond_check_thread_quit_pipe(pollfd, revents); if (ret) { err = 0; goto exit; @@ -969,7 +939,7 @@ restart: */ (void) utils_set_fd_cloexec(sock); - health_code_update(&consumer_data->health); + health_code_update(); DBG2("Receiving code from consumer err_sock"); @@ -980,7 +950,7 @@ restart: goto error; } - health_code_update(&consumer_data->health); + health_code_update(); if (code == LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) { consumer_data->cmd_sock = @@ -1010,13 +980,13 @@ restart: goto error; } - health_code_update(&consumer_data->health); + health_code_update(); /* Inifinite blocking call, waiting for transmission */ restart_poll: - health_poll_update(&consumer_data->health); + health_poll_entry(); ret = lttng_poll_wait(&events, -1); - health_poll_update(&consumer_data->health); + health_poll_exit(); if (ret < 0) { /* * Restart interrupted system call. @@ -1034,10 +1004,10 @@ restart_poll: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - health_code_update(&consumer_data->health); + health_code_update(); /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); + ret = sessiond_check_thread_quit_pipe(pollfd, revents); if (ret) { err = 0; goto exit; @@ -1052,7 +1022,7 @@ restart_poll: } } - health_code_update(&consumer_data->health); + health_code_update(); /* Wait for any kconsumerd error */ ret = lttcomm_recv_unix_sock(sock, &code, @@ -1103,7 +1073,7 @@ error: lttng_poll_clean(&events); error_poll: if (err) { - health_error(&consumer_data->health); + health_error(); ERR("Health error occurred in %s", __func__); } health_unregister(); @@ -1119,7 +1089,6 @@ static void *thread_manage_apps(void *data) { int i, ret, pollfd, err = -1; uint32_t revents, nb_fd; - struct ust_command ust_cmd; struct lttng_poll_event events; DBG("[thread] Manage application started"); @@ -1133,9 +1102,9 @@ static void *thread_manage_apps(void *data) goto error_testpoint; } - health_code_update(&health_thread_app_manage); + health_code_update(); - ret = create_thread_poll_set(&events, 2); + ret = sessiond_set_thread_pollset(&events, 2); if (ret < 0) { goto error_poll_create; } @@ -1149,16 +1118,16 @@ static void *thread_manage_apps(void *data) goto error; } - health_code_update(&health_thread_app_manage); + health_code_update(); while (1) { DBG("Apps thread polling on %d fds", LTTNG_POLL_GETNB(&events)); /* Inifinite blocking call, waiting for transmission */ restart: - health_poll_update(&health_thread_app_manage); + health_poll_entry(); ret = lttng_poll_wait(&events, -1); - health_poll_update(&health_thread_app_manage); + health_poll_exit(); if (ret < 0) { /* * Restart interrupted system call. @@ -1176,10 +1145,10 @@ static void *thread_manage_apps(void *data) revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - health_code_update(&health_thread_app_manage); + health_code_update(); /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); + ret = sessiond_check_thread_quit_pipe(pollfd, revents); if (ret) { err = 0; goto exit; @@ -1191,72 +1160,39 @@ static void *thread_manage_apps(void *data) ERR("Apps command pipe error"); goto error; } else if (revents & LPOLLIN) { + int sock; + /* Empty pipe */ do { - ret = read(apps_cmd_pipe[0], &ust_cmd, sizeof(ust_cmd)); + ret = read(apps_cmd_pipe[0], &sock, sizeof(sock)); } while (ret < 0 && errno == EINTR); - if (ret < 0 || ret < sizeof(ust_cmd)) { + if (ret < 0 || ret < sizeof(sock)) { PERROR("read apps cmd pipe"); goto error; } - health_code_update(&health_thread_app_manage); - - /* Register applicaton to the session daemon */ - ret = ust_app_register(&ust_cmd.reg_msg, - ust_cmd.sock); - if (ret == -ENOMEM) { - goto error; - } else if (ret < 0) { - break; - } - - health_code_update(&health_thread_app_manage); + health_code_update(); /* - * Validate UST version compatibility. + * We only monitor the error events of the socket. This + * thread does not handle any incoming data from UST + * (POLLIN). */ - ret = ust_app_validate_version(ust_cmd.sock); - if (ret >= 0) { - /* - * Add channel(s) and event(s) to newly registered apps - * from lttng global UST domain. - */ - update_ust_app(ust_cmd.sock); - } - - health_code_update(&health_thread_app_manage); - - ret = ust_app_register_done(ust_cmd.sock); + ret = lttng_poll_add(&events, sock, + LPOLLERR | LPOLLHUP | LPOLLRDHUP); if (ret < 0) { - /* - * If the registration is not possible, we simply - * unregister the apps and continue - */ - ust_app_unregister(ust_cmd.sock); - } else { - /* - * We only monitor the error events of the socket. This - * thread does not handle any incoming data from UST - * (POLLIN). - */ - ret = lttng_poll_add(&events, ust_cmd.sock, - LPOLLERR & LPOLLHUP & LPOLLRDHUP); - if (ret < 0) { - goto error; - } + goto error; + } - /* Set socket timeout for both receiving and ending */ - (void) lttcomm_setsockopt_rcv_timeout(ust_cmd.sock, - app_socket_timeout); - (void) lttcomm_setsockopt_snd_timeout(ust_cmd.sock, - app_socket_timeout); + /* Set socket timeout for both receiving and ending */ + (void) lttcomm_setsockopt_rcv_timeout(sock, + app_socket_timeout); + (void) lttcomm_setsockopt_snd_timeout(sock, + app_socket_timeout); - DBG("Apps with sock %d added to poll set", - ust_cmd.sock); - } + DBG("Apps with sock %d added to poll set", sock); - health_code_update(&health_thread_app_manage); + health_code_update(); break; } @@ -1278,7 +1214,7 @@ static void *thread_manage_apps(void *data) } } - health_code_update(&health_thread_app_manage); + health_code_update(); } } @@ -1297,7 +1233,7 @@ error_testpoint: */ if (err) { - health_error(&health_thread_app_manage); + health_error(); ERR("Health error occurred in %s", __func__); } health_unregister(); @@ -1307,6 +1243,38 @@ error_testpoint: return NULL; } +/* + * Send a socket to a thread This is called from the dispatch UST registration + * thread once all sockets are set for the application. + * + * On success, return 0 else a negative value being the errno message of the + * write(). + */ +static int send_socket_to_thread(int fd, int sock) +{ + int ret; + + /* Sockets MUST be set or else this should not have been called. */ + assert(fd >= 0); + assert(sock >= 0); + + do { + ret = write(fd, &sock, sizeof(sock)); + } while (ret < 0 && errno == EINTR); + if (ret < 0 || ret != sizeof(sock)) { + PERROR("write apps pipe %d", fd); + if (ret < 0) { + ret = -errno; + } + goto error; + } + + /* All good. Don't send back the write positive ret value. */ + ret = 0; +error: + return ret; +} + /* * Dispatch request from the registration threads to the application * communication thread. @@ -1316,6 +1284,12 @@ static void *thread_dispatch_ust_registration(void *data) int ret; struct cds_wfq_node *node; struct ust_command *ust_cmd = NULL; + struct { + struct ust_app *app; + struct cds_list_head head; + } *wait_node = NULL, *tmp_wait_node; + + CDS_LIST_HEAD(wait_queue); DBG("[thread] Dispatch UST command started"); @@ -1324,6 +1298,8 @@ static void *thread_dispatch_ust_registration(void *data) futex_nto1_prepare(&ust_cmd_queue.futex); do { + struct ust_app *app = NULL; + /* Dequeue command for registration */ node = cds_wfq_dequeue_blocking(&ust_cmd_queue.queue); if (node == NULL) { @@ -1340,34 +1316,122 @@ static void *thread_dispatch_ust_registration(void *data) ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid, ust_cmd->sock, ust_cmd->reg_msg.name, ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor); - /* - * Inform apps thread of the new application registration. This - * call is blocking so we can be assured that the data will be read - * at some point in time or wait to the end of the world :) - */ - if (apps_cmd_pipe[1] >= 0) { - do { - ret = write(apps_cmd_pipe[1], ust_cmd, - sizeof(struct ust_command)); - } while (ret < 0 && errno == EINTR); - if (ret < 0 || ret != sizeof(struct ust_command)) { - PERROR("write apps cmd pipe"); - if (errno == EBADF) { - /* - * We can't inform the application thread to process - * registration. We will exit or else application - * registration will not occur and tracing will never - * start. - */ - goto error; + + if (ust_cmd->reg_msg.type == USTCTL_SOCKET_CMD) { + wait_node = zmalloc(sizeof(*wait_node)); + if (!wait_node) { + PERROR("zmalloc wait_node dispatch"); + goto error; + } + CDS_INIT_LIST_HEAD(&wait_node->head); + + /* Create application object if socket is CMD. */ + wait_node->app = ust_app_create(&ust_cmd->reg_msg, + ust_cmd->sock); + if (!wait_node->app) { + ret = close(ust_cmd->sock); + if (ret < 0) { + PERROR("close ust sock dispatch %d", ust_cmd->sock); } + lttng_fd_put(1, LTTNG_FD_APPS); + free(wait_node); + continue; + } + /* + * Add application to the wait queue so we can set the notify + * socket before putting this object in the global ht. + */ + cds_list_add(&wait_node->head, &wait_queue); + + /* + * We have to continue here since we don't have the notify + * socket and the application MUST be added to the hash table + * only at that moment. + */ + continue; + } else { + /* + * Look for the application in the local wait queue and set the + * notify socket if found. + */ + cds_list_for_each_entry_safe(wait_node, tmp_wait_node, + &wait_queue, head) { + if (wait_node->app->pid == ust_cmd->reg_msg.pid) { + wait_node->app->notify_sock = ust_cmd->sock; + cds_list_del(&wait_node->head); + app = wait_node->app; + free(wait_node); + DBG3("UST app notify socket %d is set", ust_cmd->sock); + break; + } + } + } + + if (app) { + /* + * @session_lock_list + * + * Lock the global session list so from the register up to the + * registration done message, no thread can see the application + * and change its state. + */ + session_lock_list(); + rcu_read_lock(); + + /* + * Add application to the global hash table. This needs to be + * done before the update to the UST registry can locate the + * application. + */ + ust_app_add(app); + + /* Set app version. This call will print an error if needed. */ + (void) ust_app_version(app); + + /* Send notify socket through the notify pipe. */ + ret = send_socket_to_thread(apps_cmd_notify_pipe[1], + app->notify_sock); + if (ret < 0) { + rcu_read_unlock(); + session_unlock_list(); + /* No notify thread, stop the UST tracing. */ + goto error; + } + + /* + * Update newly registered application with the tracing + * registry info already enabled information. + */ + update_ust_app(app->sock); + + /* + * Don't care about return value. Let the manage apps threads + * handle app unregistration upon socket close. + */ + (void) ust_app_register_done(app->sock); + + /* + * Even if the application socket has been closed, send the app + * to the thread and unregistration will take place at that + * place. + */ + ret = send_socket_to_thread(apps_cmd_pipe[1], app->sock); + if (ret < 0) { + rcu_read_unlock(); + session_unlock_list(); + /* No apps. thread, stop the UST tracing. */ + goto error; } + + rcu_read_unlock(); + session_unlock_list(); } else { - /* Application manager thread is not available. */ + /* Application manager threads are not available. */ ret = close(ust_cmd->sock); if (ret < 0) { PERROR("close ust_cmd sock"); } + lttng_fd_put(1, LTTNG_FD_APPS); } free(ust_cmd); } while (node != NULL); @@ -1377,6 +1441,13 @@ static void *thread_dispatch_ust_registration(void *data) } error: + /* Clean up wait queue. */ + cds_list_for_each_entry_safe(wait_node, tmp_wait_node, + &wait_queue, head) { + cds_list_del(&wait_node->head); + free(wait_node); + } + DBG("Dispatch thread dying"); return NULL; } @@ -1412,7 +1483,7 @@ static void *thread_registration_apps(void *data) * Pass 2 as size here for the thread quit pipe and apps socket. Nothing * more will be added to this poll set. */ - ret = create_thread_poll_set(&events, 2); + ret = sessiond_set_thread_pollset(&events, 2); if (ret < 0) { goto error_create_poll; } @@ -1436,9 +1507,9 @@ static void *thread_registration_apps(void *data) /* Inifinite blocking call, waiting for transmission */ restart: - health_poll_update(&health_thread_app_reg); + health_poll_entry(); ret = lttng_poll_wait(&events, -1); - health_poll_update(&health_thread_app_reg); + health_poll_exit(); if (ret < 0) { /* * Restart interrupted system call. @@ -1452,14 +1523,14 @@ static void *thread_registration_apps(void *data) nb_fd = ret; for (i = 0; i < nb_fd; i++) { - health_code_update(&health_thread_app_reg); + health_code_update(); /* Fetch once the poll data */ revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); + ret = sessiond_check_thread_quit_pipe(pollfd, revents); if (ret) { err = 0; goto exit; @@ -1504,16 +1575,12 @@ static void *thread_registration_apps(void *data) sock = -1; continue; } - health_code_update(&health_thread_app_reg); - ret = lttcomm_recv_unix_sock(sock, &ust_cmd->reg_msg, - sizeof(struct ust_register_msg)); - if (ret < 0 || ret < sizeof(struct ust_register_msg)) { - if (ret < 0) { - PERROR("lttcomm_recv_unix_sock register apps"); - } else { - ERR("Wrong size received on apps register"); - } + + health_code_update(); + ret = ust_app_recv_registration(sock, &ust_cmd->reg_msg); + if (ret < 0) { free(ust_cmd); + /* Close socket of the application. */ ret = close(sock); if (ret) { PERROR("close"); @@ -1522,7 +1589,7 @@ static void *thread_registration_apps(void *data) sock = -1; continue; } - health_code_update(&health_thread_app_reg); + health_code_update(); ust_cmd->sock = sock; sock = -1; @@ -1553,7 +1620,7 @@ static void *thread_registration_apps(void *data) exit: error: if (err) { - health_error(&health_thread_app_reg); + health_error(); ERR("Health error occurred in %s", __func__); } @@ -2357,8 +2424,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, /* Start the kernel consumer daemon */ pthread_mutex_lock(&kconsumer_data.pid_mutex); if (kconsumer_data.pid == 0 && - cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER && - cmd_ctx->session->start_consumer) { + cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER) { pthread_mutex_unlock(&kconsumer_data.pid_mutex); ret = start_consumerd(&kconsumer_data); if (ret < 0) { @@ -2405,8 +2471,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, pthread_mutex_lock(&ustconsumer64_data.pid_mutex); if (consumerd64_bin[0] != '\0' && ustconsumer64_data.pid == 0 && - cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER && - cmd_ctx->session->start_consumer) { + cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER) { pthread_mutex_unlock(&ustconsumer64_data.pid_mutex); ret = start_consumerd(&ustconsumer64_data); if (ret < 0) { @@ -2434,8 +2499,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, /* 32-bit */ if (consumerd32_bin[0] != '\0' && ustconsumer32_data.pid == 0 && - cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER && - cmd_ctx->session->start_consumer) { + cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER) { pthread_mutex_unlock(&ustconsumer32_data.pid_mutex); ret = start_consumerd(&ustconsumer32_data); if (ret < 0) { @@ -2499,6 +2563,21 @@ skip_domain: } } + /* + * Send relayd information to consumer as soon as we have a domain and a + * session defined. + */ + if (cmd_ctx->session && need_domain) { + /* + * Setup relayd if not done yet. If the relayd information was already + * sent to the consumer, this call will gracefully return. + */ + ret = cmd_setup_relayd(cmd_ctx->session); + if (ret != LTTNG_OK) { + goto error; + } + } + /* Process by command type */ switch (cmd_ctx->lsm->cmd_type) { case LTTNG_ADD_CONTEXT: @@ -2529,41 +2608,12 @@ skip_domain: cmd_ctx->lsm->u.disable.channel_name); break; } - case LTTNG_DISABLE_CONSUMER: - { - ret = cmd_disable_consumer(cmd_ctx->lsm->domain.type, cmd_ctx->session); - break; - } case LTTNG_ENABLE_CHANNEL: { ret = cmd_enable_channel(cmd_ctx->session, cmd_ctx->lsm->domain.type, &cmd_ctx->lsm->u.channel.chan, kernel_poll_pipe[1]); break; } - case LTTNG_ENABLE_CONSUMER: - { - /* - * XXX: 0 means that this URI should be applied on the session. Should - * be a DOMAIN enuam. - */ - ret = cmd_enable_consumer(cmd_ctx->lsm->domain.type, cmd_ctx->session); - if (ret != LTTNG_OK) { - goto error; - } - - if (cmd_ctx->lsm->domain.type == 0) { - /* Add the URI for the UST session if a consumer is present. */ - if (cmd_ctx->session->ust_session && - cmd_ctx->session->ust_session->consumer) { - ret = cmd_enable_consumer(LTTNG_DOMAIN_UST, cmd_ctx->session); - } else if (cmd_ctx->session->kernel_session && - cmd_ctx->session->kernel_session->consumer) { - ret = cmd_enable_consumer(LTTNG_DOMAIN_KERNEL, - cmd_ctx->session); - } - } - break; - } case LTTNG_ENABLE_EVENT: { ret = cmd_enable_event(cmd_ctx->session, cmd_ctx->lsm->domain.type, @@ -2997,7 +3047,7 @@ static void *thread_manage_health(void *data) * Pass 2 as size here for the thread quit pipe and client_sock. Nothing * more will be added to this poll set. */ - ret = create_thread_poll_set(&events, 2); + ret = sessiond_set_thread_pollset(&events, 2); if (ret < 0) { goto error; } @@ -3032,7 +3082,7 @@ restart: pollfd = LTTNG_POLL_GETFD(&events, i); /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); + ret = sessiond_check_thread_quit_pipe(pollfd, revents); if (ret) { err = 0; goto exit; @@ -3174,7 +3224,7 @@ static void *thread_manage_clients(void *data) goto error_testpoint; } - health_code_update(&health_thread_cmd); + health_code_update(); ret = lttcomm_listen_unix_sock(client_sock); if (ret < 0) { @@ -3185,7 +3235,7 @@ static void *thread_manage_clients(void *data) * Pass 2 as size here for the thread quit pipe and client_sock. Nothing * more will be added to this poll set. */ - ret = create_thread_poll_set(&events, 2); + ret = sessiond_set_thread_pollset(&events, 2); if (ret < 0) { goto error_create_poll; } @@ -3207,16 +3257,16 @@ static void *thread_manage_clients(void *data) goto error; } - health_code_update(&health_thread_cmd); + health_code_update(); while (1) { DBG("Accepting client command ..."); /* Inifinite blocking call, waiting for transmission */ restart: - health_poll_update(&health_thread_cmd); + health_poll_entry(); ret = lttng_poll_wait(&events, -1); - health_poll_update(&health_thread_cmd); + health_poll_exit(); if (ret < 0) { /* * Restart interrupted system call. @@ -3234,10 +3284,10 @@ static void *thread_manage_clients(void *data) revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - health_code_update(&health_thread_cmd); + health_code_update(); /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); + ret = sessiond_check_thread_quit_pipe(pollfd, revents); if (ret) { err = 0; goto exit; @@ -3254,7 +3304,7 @@ static void *thread_manage_clients(void *data) DBG("Wait for client response"); - health_code_update(&health_thread_cmd); + health_code_update(); sock = lttcomm_accept_unix_sock(client_sock); if (sock < 0) { @@ -3290,7 +3340,7 @@ static void *thread_manage_clients(void *data) cmd_ctx->llm = NULL; cmd_ctx->session = NULL; - health_code_update(&health_thread_cmd); + health_code_update(); /* * Data is received from the lttng client. The struct @@ -3311,7 +3361,7 @@ static void *thread_manage_clients(void *data) continue; } - health_code_update(&health_thread_cmd); + health_code_update(); // TODO: Validate cmd_ctx including sanity check for // security purpose. @@ -3344,7 +3394,7 @@ static void *thread_manage_clients(void *data) continue; } - health_code_update(&health_thread_cmd); + health_code_update(); DBG("Sending response (size: %d, retcode: %s)", cmd_ctx->lttng_msg_size, @@ -3363,7 +3413,7 @@ static void *thread_manage_clients(void *data) clean_command_ctx(&cmd_ctx); - health_code_update(&health_thread_cmd); + health_code_update(); } exit: @@ -3390,7 +3440,7 @@ error_testpoint: } if (err) { - health_error(&health_thread_cmd); + health_error(); ERR("Health error occurred in %s", __func__); } @@ -4037,7 +4087,7 @@ int main(int argc, char **argv) /* Set global SHM for ust */ if (strlen(wait_shm_path) == 0) { snprintf(wait_shm_path, PATH_MAX, - DEFAULT_HOME_APPS_WAIT_SHM_PATH, geteuid()); + DEFAULT_HOME_APPS_WAIT_SHM_PATH, getuid()); } /* Set health check Unix path */ @@ -4053,6 +4103,7 @@ int main(int argc, char **argv) DBG("Client socket path %s", client_unix_sock_path); DBG("Application socket path %s", apps_unix_sock_path); + DBG("Application wait path %s", wait_shm_path); DBG("LTTng run directory path: %s", rundir); /* 32 bits consumerd path setup */ @@ -4161,6 +4212,11 @@ int main(int argc, char **argv) goto exit; } + /* Setup the thread apps notify communication pipe. */ + if (utils_create_pipe_cloexec(apps_cmd_notify_pipe) < 0) { + goto exit; + } + /* Init UST command queue. */ cds_wfq_init(&ust_cmd_queue.queue); @@ -4225,6 +4281,14 @@ int main(int argc, char **argv) goto exit_apps; } + /* Create thread to manage application notify socket */ + ret = pthread_create(&apps_notify_thread, NULL, + ust_thread_manage_notify, (void *) NULL); + if (ret != 0) { + PERROR("pthread_create apps"); + goto exit_apps; + } + /* Don't start this thread if kernel tracing is not requested nor root */ if (is_root && !opt_no_kernel) { /* Create kernel thread to manage kernel event */