X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fmain.c;h=4b6a39dd5ecea5ef176139e7da2c6058f5884094;hp=2abf9d0e79b5015b015f03c23bf5d5ee73092137;hb=e8209f6b352b3aa279d8d452e396adef6f7159c7;hpb=fceb65dfca20fb1e2071a44ada9fe61384d2b890

diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c
index 2abf9d0e7..4b6a39dd5 100644
--- a/src/bin/lttng-sessiond/main.c
+++ b/src/bin/lttng-sessiond/main.c
@@ -44,6 +44,7 @@
 #include 
 #include 
 #include 
+#include 
 
 #include "lttng-sessiond.h"
 #include "channel.h"
@@ -59,6 +60,7 @@
 #include "utils.h"
 #include "fd-limit.h"
 #include "filter.h"
+#include "health.h"
 
 #define CONSUMERD_FILE	"lttng-consumerd"
 
@@ -85,6 +87,8 @@ static struct consumer_data kconsumer_data = {
 	.cmd_unix_sock_path = DEFAULT_KCONSUMERD_CMD_SOCK_PATH,
 	.err_sock = -1,
 	.cmd_sock = -1,
+	.pid_mutex = PTHREAD_MUTEX_INITIALIZER,
+	.lock = PTHREAD_MUTEX_INITIALIZER,
 };
 static struct consumer_data ustconsumer64_data = {
 	.type = LTTNG_CONSUMER64_UST,
@@ -92,6 +96,8 @@ static struct consumer_data ustconsumer64_data = {
 	.cmd_unix_sock_path = DEFAULT_USTCONSUMERD64_CMD_SOCK_PATH,
 	.err_sock = -1,
 	.cmd_sock = -1,
+	.pid_mutex = PTHREAD_MUTEX_INITIALIZER,
+	.lock = PTHREAD_MUTEX_INITIALIZER,
 };
 static struct consumer_data ustconsumer32_data = {
 	.type = LTTNG_CONSUMER32_UST,
@@ -99,8 +105,11 @@ static struct consumer_data ustconsumer32_data = {
 	.cmd_unix_sock_path = DEFAULT_USTCONSUMERD32_CMD_SOCK_PATH,
 	.err_sock = -1,
 	.cmd_sock = -1,
+	.pid_mutex = PTHREAD_MUTEX_INITIALIZER,
+	.lock = PTHREAD_MUTEX_INITIALIZER,
 };
 
+/* Shared between threads */
 static int dispatch_thread_exit;
 
 /* Global application Unix socket path */
@@ -109,6 +118,8 @@ static char apps_unix_sock_path[PATH_MAX];
 static char client_unix_sock_path[PATH_MAX];
 /* global wait shm path for UST */
 static char wait_shm_path[PATH_MAX];
+/* Global health check unix path */
+static char health_unix_sock_path[PATH_MAX];
 
 /* Sockets and FDs */
 static int client_sock = -1;
@@ -134,6 +145,7 @@ static pthread_t reg_apps_thread;
 static pthread_t client_thread;
 static pthread_t kernel_thread;
 static pthread_t dispatch_thread;
+static pthread_t health_thread;
 
 /*
  * UST registration command queue. This queue is tied with a futex and uses a N
@@ -208,6 +220,12 @@ static enum consumerd_state kernel_consumerd_state;
  */
 static unsigned int relayd_net_seq_idx;
 
+/* Used for the health monitoring of the session daemon. See health.h */
+struct health_state health_thread_cmd;
+struct health_state health_thread_app_manage;
+struct health_state health_thread_app_reg;
+struct health_state health_thread_kernel;
+
 static void setup_consumerd_path(void)
 {
@@ -353,23 +371,56 @@ error:
  */
 static void teardown_kernel_session(struct ltt_session *session)
 {
+	int ret;
+	struct lttng_ht_iter iter;
+	struct ltt_kernel_session *ksess;
+	struct consumer_socket *socket;
+
 	if (!session->kernel_session) {
 		DBG3("No kernel session when tearing down session");
 		return;
 	}
 
+	ksess = session->kernel_session;
+
 	DBG("Tearing down kernel session");
 
+	/*
+	 * Destroy relayd associated with the session consumer. This action is
+	 * valid since, in order to destroy a session, we must acquire the
+	 * session lock. This means that there CANNOT be streams being sent to
+	 * a consumer, since that action also requires the session lock at all
+	 * times.
+	 *
+	 * At this point, we are sure that no stream data will be lost after
+	 * this command is issued.
+	 */
+	if (ksess->consumer && ksess->consumer->type == CONSUMER_DST_NET) {
+		cds_lfht_for_each_entry(ksess->consumer->socks->ht, &iter.iter, socket,
+				node.node) {
+			ret = consumer_send_destroy_relayd(socket, ksess->consumer);
+			if (ret < 0) {
+				ERR("Unable to send destroy relayd command to consumer");
+				/* Continue since we MUST delete everything at this point. */
+			}
+		}
+	}
+
 	/*
 	 * If a custom kernel consumer was registered, close the socket before
 	 * tearing down the complete kernel session structure
 	 */
-	if (kconsumer_data.cmd_sock >= 0 &&
-			session->kernel_session->consumer_fd != kconsumer_data.cmd_sock) {
-		lttcomm_close_unix_sock(session->kernel_session->consumer_fd);
+	cds_lfht_for_each_entry(ksess->consumer->socks->ht, &iter.iter, socket,
+			node.node) {
+		if (socket->fd != kconsumer_data.cmd_sock) {
+			rcu_read_lock();
+			consumer_del_socket(socket, ksess->consumer);
+			lttcomm_close_unix_sock(socket->fd);
+			consumer_destroy_socket(socket);
+			rcu_read_unlock();
+		}
 	}
 
-	trace_kernel_destroy_session(session->kernel_session);
+	trace_kernel_destroy_session(ksess);
 }
 
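Every consumer reference in this patch now goes through a socket registry kept in a URCU lock-free hash table and traversed with cds_lfht_for_each_entry(), as in the teardown loop above. Below is a minimal sketch of that liburcu pattern; the ex_* names are illustrative stand-ins rather than the real lttng-tools types, and a real implementation must defer memory reclamation of deleted nodes until after an RCU grace period (for example via call_rcu()).

#include <stdio.h>
#include <urcu.h>		/* rcu_read_lock()/rcu_read_unlock() */
#include <urcu/rculfhash.h>	/* struct cds_lfht, cds_lfht_for_each_entry() */

/* Illustrative stand-in for struct consumer_socket. */
struct ex_consumer_socket {
	int fd;
	struct cds_lfht_node node;
};

/* Close every registered consumer socket except keep_fd, mirroring the
 * close loop in teardown_kernel_session() above. */
static void ex_close_all_except(struct cds_lfht *ht, int keep_fd)
{
	struct cds_lfht_iter iter;
	struct ex_consumer_socket *socket;

	/* The whole traversal must run inside a read-side critical section. */
	rcu_read_lock();
	cds_lfht_for_each_entry(ht, &iter, socket, node) {
		if (socket->fd == keep_fd) {
			continue;
		}
		if (!cds_lfht_del(ht, &socket->node)) {
			/* Unlinked by us; close now, free only after a grace period. */
			printf("closing consumer socket fd %d\n", socket->fd);
		}
	}
	rcu_read_unlock();
}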
 /*
@@ -379,20 +430,44 @@ static void teardown_kernel_session(struct ltt_session *session)
 static void teardown_ust_session(struct ltt_session *session)
 {
 	int ret;
+	struct lttng_ht_iter iter;
+	struct ltt_ust_session *usess;
+	struct consumer_socket *socket;
 
 	if (!session->ust_session) {
 		DBG3("No UST session when tearing down session");
 		return;
 	}
+	usess = session->ust_session;
 
 	DBG("Tearing down UST session(s)");
 
-	ret = ust_app_destroy_trace_all(session->ust_session);
+	/*
+	 * Destroy relayd associated with the session consumer. This action is
+	 * valid since, in order to destroy a session, we must acquire the
+	 * session lock. This means that there CANNOT be streams being sent to
+	 * a consumer, since that action also requires the session lock at all
+	 * times.
+	 *
+	 * At this point, we are sure that no stream data will be lost after
+	 * this command is issued.
+	 */
+	if (usess->consumer && usess->consumer->type == CONSUMER_DST_NET) {
+		cds_lfht_for_each_entry(usess->consumer->socks->ht, &iter.iter, socket,
+				node.node) {
+			ret = consumer_send_destroy_relayd(socket, usess->consumer);
+			if (ret < 0) {
+				ERR("Unable to send destroy relayd command to consumer");
+				/* Continue since we MUST delete everything at this point.
*/ + } + } + } + + ret = ust_app_destroy_trace_all(usess); if (ret) { ERR("Error in ust_app_destroy_trace_all"); } - trace_ust_destroy_session(session->ust_session); + trace_ust_destroy_session(usess); } /* @@ -410,7 +485,7 @@ static void stop_threads(void) } /* Dispatch thread */ - dispatch_thread_exit = 1; + CMM_STORE_SHARED(dispatch_thread_exit, 1); futex_nto1_wake(&ust_cmd_queue.futex); } @@ -456,8 +531,6 @@ static void cleanup(void) DBG("Closing all UST sockets"); ust_app_clean_list(); - pthread_mutex_destroy(&kconsumer_data.pid_mutex); - if (is_root && !opt_no_kernel) { DBG2("Closing kernel fd"); if (kernel_tracer_fd >= 0) { @@ -622,6 +695,7 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd) { int ret = 0; struct ltt_session *session; + struct ltt_kernel_session *ksess; struct ltt_kernel_channel *channel; DBG("Updating kernel streams for channel fd %d", fd); @@ -633,14 +707,9 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd) session_unlock(session); continue; } + ksess = session->kernel_session; - /* This is not suppose to be -1 but this is an extra security check */ - if (session->kernel_session->consumer_fd < 0) { - session->kernel_session->consumer_fd = consumer_data->cmd_sock; - } - - cds_list_for_each_entry(channel, - &session->kernel_session->channel_list.head, list) { + cds_list_for_each_entry(channel, &ksess->channel_list.head, list) { if (channel->fd == fd) { DBG("Channel found, updating kernel streams"); ret = kernel_open_channel_stream(channel); @@ -653,13 +722,23 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd) * that tracing is started so it is safe to send our updated * stream fds. */ - if (session->kernel_session->consumer_fds_sent == 1 && - session->kernel_session->consumer != NULL) { - ret = kernel_consumer_send_channel_stream( - session->kernel_session->consumer_fd, channel, - session->kernel_session); - if (ret < 0) { - goto error; + if (ksess->consumer_fds_sent == 1 && ksess->consumer != NULL) { + struct lttng_ht_iter iter; + struct consumer_socket *socket; + + + cds_lfht_for_each_entry(ksess->consumer->socks->ht, + &iter.iter, socket, node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); + ret = kernel_consumer_send_channel_stream(socket->fd, + channel, ksess); + pthread_mutex_unlock(socket->lock); + if (ret < 0) { + goto error; + } } } goto error; @@ -705,13 +784,15 @@ static void update_ust_app(int app_sock) */ static void *thread_manage_kernel(void *data) { - int ret, i, pollfd, update_poll_flag = 1; + int ret, i, pollfd, update_poll_flag = 1, err = -1; uint32_t revents, nb_fd; char tmp; struct lttng_poll_event events; DBG("Thread manage kernel started"); + health_code_update(&health_thread_kernel); + ret = create_thread_poll_set(&events, 2); if (ret < 0) { goto error_poll_create; @@ -723,6 +804,8 @@ static void *thread_manage_kernel(void *data) } while (1) { + health_code_update(&health_thread_kernel); + if (update_poll_flag == 1) { /* * Reset number of fd in the poll set. Always 2 since there is the thread @@ -746,7 +829,9 @@ static void *thread_manage_kernel(void *data) /* Poll infinite value of time */ restart: + health_poll_update(&health_thread_kernel); ret = lttng_poll_wait(&events, -1); + health_poll_update(&health_thread_kernel); if (ret < 0) { /* * Restart interrupted system call. 
@@ -767,10 +852,13 @@ static void *thread_manage_kernel(void *data) revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); + health_code_update(&health_thread_kernel); + /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { - goto error; + err = 0; + goto exit; } /* Check for data on kernel pipe */ @@ -798,9 +886,15 @@ static void *thread_manage_kernel(void *data) } } +exit: error: lttng_poll_clean(&events); error_poll_create: + if (err) { + health_error(&health_thread_kernel); + ERR("Health error occurred in %s", __func__); + } + health_exit(&health_thread_kernel); DBG("Kernel thread dying"); return NULL; } @@ -810,7 +904,7 @@ error_poll_create: */ static void *thread_manage_consumer(void *data) { - int sock = -1, i, ret, pollfd; + int sock = -1, i, ret, pollfd, err = -1; uint32_t revents, nb_fd; enum lttcomm_return_code code; struct lttng_poll_event events; @@ -818,6 +912,8 @@ static void *thread_manage_consumer(void *data) DBG("[thread] Manage consumer started"); + health_code_update(&consumer_data->health); + ret = lttcomm_listen_unix_sock(consumer_data->err_sock); if (ret < 0) { goto error_listen; @@ -839,9 +935,13 @@ static void *thread_manage_consumer(void *data) nb_fd = LTTNG_POLL_GETNB(&events); + health_code_update(&consumer_data->health); + /* Inifinite blocking call, waiting for transmission */ restart: + health_poll_update(&consumer_data->health); ret = lttng_poll_wait(&events, -1); + health_poll_update(&consumer_data->health); if (ret < 0) { /* * Restart interrupted system call. @@ -857,10 +957,13 @@ restart: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); + health_code_update(&consumer_data->health); + /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { - goto error; + err = 0; + goto exit; } /* Event on the registration socket */ @@ -877,6 +980,8 @@ restart: goto error; } + health_code_update(&consumer_data->health); + DBG2("Receiving code from consumer err_sock"); /* Getting status code from kconsumerd */ @@ -886,6 +991,8 @@ restart: goto error; } + health_code_update(&consumer_data->health); + if (code == CONSUMERD_COMMAND_SOCK_READY) { consumer_data->cmd_sock = lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path); @@ -914,12 +1021,16 @@ restart: goto error; } + health_code_update(&consumer_data->health); + /* Update number of fd */ nb_fd = LTTNG_POLL_GETNB(&events); /* Inifinite blocking call, waiting for transmission */ restart_poll: + health_poll_update(&consumer_data->health); ret = lttng_poll_wait(&events, -1); + health_poll_update(&consumer_data->health); if (ret < 0) { /* * Restart interrupted system call. @@ -935,10 +1046,13 @@ restart_poll: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); + health_code_update(&consumer_data->health); + /* Thread quit pipe has been closed. Killing thread. 
*/ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { - goto error; + err = 0; + goto exit; } /* Event on the kconsumerd socket */ @@ -950,6 +1064,8 @@ restart_poll: } } + health_code_update(&consumer_data->health); + /* Wait for any kconsumerd error */ ret = lttcomm_recv_unix_sock(sock, &code, sizeof(enum lttcomm_return_code)); @@ -960,6 +1076,7 @@ restart_poll: ERR("consumer return code : %s", lttcomm_get_readable_code(-code)); +exit: error: /* Immediately set the consumerd state to stopped */ if (consumer_data->type == LTTNG_CONSUMER_KERNEL) { @@ -998,6 +1115,11 @@ error: lttng_poll_clean(&events); error_poll: error_listen: + if (err) { + health_error(&consumer_data->health); + ERR("Health error occurred in %s", __func__); + } + health_exit(&consumer_data->health); DBG("consumer thread cleanup completed"); return NULL; @@ -1008,7 +1130,7 @@ error_listen: */ static void *thread_manage_apps(void *data) { - int i, ret, pollfd; + int i, ret, pollfd, err = -1; uint32_t revents, nb_fd; struct ust_command ust_cmd; struct lttng_poll_event events; @@ -1018,6 +1140,8 @@ static void *thread_manage_apps(void *data) rcu_register_thread(); rcu_thread_online(); + health_code_update(&health_thread_app_manage); + ret = create_thread_poll_set(&events, 2); if (ret < 0) { goto error_poll_create; @@ -1028,6 +1152,8 @@ static void *thread_manage_apps(void *data) goto error; } + health_code_update(&health_thread_app_manage); + while (1) { /* Zeroed the events structure */ lttng_poll_reset(&events); @@ -1038,7 +1164,9 @@ static void *thread_manage_apps(void *data) /* Inifinite blocking call, waiting for transmission */ restart: + health_poll_update(&health_thread_app_manage); ret = lttng_poll_wait(&events, -1); + health_poll_update(&health_thread_app_manage); if (ret < 0) { /* * Restart interrupted system call. @@ -1054,10 +1182,13 @@ static void *thread_manage_apps(void *data) revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); + health_code_update(&health_thread_app_manage); + /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { - goto error; + err = 0; + goto exit; } /* Inspect the apps cmd pipe */ @@ -1073,6 +1204,8 @@ static void *thread_manage_apps(void *data) goto error; } + health_code_update(&health_thread_app_manage); + /* Register applicaton to the session daemon */ ret = ust_app_register(&ust_cmd.reg_msg, ust_cmd.sock); @@ -1082,6 +1215,8 @@ static void *thread_manage_apps(void *data) break; } + health_code_update(&health_thread_app_manage); + /* * Validate UST version compatibility. 
*/ @@ -1094,6 +1229,8 @@ static void *thread_manage_apps(void *data) update_ust_app(ust_cmd.sock); } + health_code_update(&health_thread_app_manage); + ret = ust_app_register_done(ust_cmd.sock); if (ret < 0) { /* @@ -1117,6 +1254,8 @@ static void *thread_manage_apps(void *data) ust_cmd.sock); } + health_code_update(&health_thread_app_manage); + break; } } else { @@ -1136,12 +1275,20 @@ static void *thread_manage_apps(void *data) break; } } + + health_code_update(&health_thread_app_manage); } } +exit: error: lttng_poll_clean(&events); error_poll_create: + if (err) { + health_error(&health_thread_app_manage); + ERR("Health error occurred in %s", __func__); + } + health_exit(&health_thread_app_manage); DBG("Application communication apps thread cleanup complete"); rcu_thread_offline(); rcu_unregister_thread(); @@ -1160,7 +1307,7 @@ static void *thread_dispatch_ust_registration(void *data) DBG("[thread] Dispatch UST command started"); - while (!dispatch_thread_exit) { + while (!CMM_LOAD_SHARED(dispatch_thread_exit)) { /* Atomically prepare the queue futex */ futex_nto1_prepare(&ust_cmd_queue.futex); @@ -1217,7 +1364,7 @@ error: */ static void *thread_registration_apps(void *data) { - int sock = -1, i, ret, pollfd; + int sock = -1, i, ret, pollfd, err = -1; uint32_t revents, nb_fd; struct lttng_poll_event events; /* @@ -1263,7 +1410,9 @@ static void *thread_registration_apps(void *data) /* Inifinite blocking call, waiting for transmission */ restart: + health_poll_update(&health_thread_app_reg); ret = lttng_poll_wait(&events, -1); + health_poll_update(&health_thread_app_reg); if (ret < 0) { /* * Restart interrupted system call. @@ -1275,6 +1424,8 @@ static void *thread_registration_apps(void *data) } for (i = 0; i < nb_fd; i++) { + health_code_update(&health_thread_app_reg); + /* Fetch once the poll data */ revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); @@ -1282,7 +1433,8 @@ static void *thread_registration_apps(void *data) /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { - goto error; + err = 0; + goto exit; } /* Event on the registration socket */ @@ -1318,6 +1470,7 @@ static void *thread_registration_apps(void *data) sock = -1; continue; } + health_code_update(&health_thread_app_reg); ret = lttcomm_recv_unix_sock(sock, &ust_cmd->reg_msg, sizeof(struct ust_register_msg)); if (ret < 0 || ret < sizeof(struct ust_register_msg)) { @@ -1335,6 +1488,7 @@ static void *thread_registration_apps(void *data) sock = -1; continue; } + health_code_update(&health_thread_app_reg); ust_cmd->sock = sock; sock = -1; @@ -1362,7 +1516,14 @@ static void *thread_registration_apps(void *data) } } +exit: error: + if (err) { + health_error(&health_thread_app_reg); + ERR("Health error occurred in %s", __func__); + } + health_exit(&health_thread_app_reg); + /* Notify that the registration thread is gone */ notify_ust_apps(0); @@ -1468,7 +1629,8 @@ static int join_consumer_thread(struct consumer_data *consumer_data) void *status; int ret; - if (consumer_data->pid != 0) { + /* Consumer pid must be a real one. */ + if (consumer_data->pid > 0) { ret = kill(consumer_data->pid, SIGTERM); if (ret) { ERR("Error killing consumer daemon"); @@ -1681,6 +1843,23 @@ error: return ret; } +/* + * Compute health status of each consumer. If one of them is zero (bad + * state), we return 0. 
+ */ +static int check_consumer_health(void) +{ + int ret; + + ret = health_check_state(&kconsumer_data.health) && + health_check_state(&ustconsumer32_data.health) && + health_check_state(&ustconsumer64_data.health); + + DBG3("Health consumer check %d", ret); + + return ret; +} + /* * Check version of the lttng-modules. */ @@ -1758,21 +1937,24 @@ error: static int init_kernel_tracing(struct ltt_kernel_session *session) { int ret = 0; + struct lttng_ht_iter iter; + struct consumer_socket *socket; - if (session->consumer_fds_sent == 0 && session->consumer != NULL) { - /* - * Assign default kernel consumer socket if no consumer assigned to the - * kernel session. At this point, it's NOT supposed to be -1 but this is - * an extra security check. - */ - if (session->consumer_fd < 0) { - session->consumer_fd = kconsumer_data.cmd_sock; - } + assert(session); - ret = kernel_consumer_send_session(session->consumer_fd, session); - if (ret < 0) { - ret = LTTCOMM_KERN_CONSUMER_FAIL; - goto error; + if (session->consumer_fds_sent == 0 && session->consumer != NULL) { + cds_lfht_for_each_entry(session->consumer->socks->ht, &iter.iter, + socket, node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); + ret = kernel_consumer_send_session(socket->fd, session); + pthread_mutex_unlock(socket->lock); + if (ret < 0) { + ret = LTTCOMM_KERN_CONSUMER_FAIL; + goto error; + } } } @@ -1881,25 +2063,12 @@ static int send_socket_relayd_consumer(int domain, struct ltt_session *session, session->net_handle = 1; } - switch (domain) { - case LTTNG_DOMAIN_KERNEL: - /* Send relayd socket to consumer. */ - ret = kernel_consumer_send_relayd_socket(consumer_fd, sock, - consumer, relayd_uri->stype); - if (ret < 0) { - ret = LTTCOMM_ENABLE_CONSUMER_FAIL; - goto close_sock; - } - break; - case LTTNG_DOMAIN_UST: - /* Send relayd socket to consumer. */ - ret = ust_consumer_send_relayd_socket(consumer_fd, sock, - consumer, relayd_uri->stype); - if (ret < 0) { - ret = LTTCOMM_ENABLE_CONSUMER_FAIL; - goto close_sock; - } - break; + /* Send relayd socket to consumer. 
*/ + ret = consumer_send_relayd_socket(consumer_fd, sock, + consumer, relayd_uri->stype); + if (ret < 0) { + ret = LTTCOMM_ENABLE_CONSUMER_FAIL; + goto close_sock; } ret = LTTCOMM_OK; @@ -1956,6 +2125,8 @@ static int setup_relayd(struct ltt_session *session) int ret = LTTCOMM_OK; struct ltt_ust_session *usess; struct ltt_kernel_session *ksess; + struct consumer_socket *socket; + struct lttng_ht_iter iter; assert(session); @@ -1964,34 +2135,37 @@ static int setup_relayd(struct ltt_session *session) DBG2("Setting relayd for session %s", session->name); - if (usess && usess->consumer->sock == -1 && - usess->consumer->type == CONSUMER_DST_NET && + if (usess && usess->consumer->type == CONSUMER_DST_NET && usess->consumer->enabled) { - /* Setup relayd for 64 bits consumer */ - if (ust_consumerd64_fd >= 0) { + /* For each consumer socket, send relayd sockets */ + cds_lfht_for_each_entry(usess->consumer->socks->ht, &iter.iter, + socket, node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); send_sockets_relayd_consumer(LTTNG_DOMAIN_UST, session, - usess->consumer, ust_consumerd64_fd); + usess->consumer, socket->fd); + pthread_mutex_unlock(socket->lock); if (ret != LTTCOMM_OK) { goto error; } } - - /* Setup relayd for 32 bits consumer */ - if (ust_consumerd32_fd >= 0) { - send_sockets_relayd_consumer(LTTNG_DOMAIN_UST, session, - usess->consumer, ust_consumerd32_fd); + } else if (ksess && ksess->consumer->type == CONSUMER_DST_NET && + ksess->consumer->enabled) { + cds_lfht_for_each_entry(ksess->consumer->socks->ht, &iter.iter, + socket, node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); + send_sockets_relayd_consumer(LTTNG_DOMAIN_KERNEL, session, + ksess->consumer, socket->fd); + pthread_mutex_unlock(socket->lock); if (ret != LTTCOMM_OK) { goto error; } } - } else if (ksess && ksess->consumer->sock == -1 && - ksess->consumer->type == CONSUMER_DST_NET && - ksess->consumer->enabled) { - send_sockets_relayd_consumer(LTTNG_DOMAIN_KERNEL, session, - ksess->consumer, ksess->consumer_fd); - if (ret != LTTCOMM_OK) { - goto error; - } } error: @@ -2121,11 +2295,6 @@ static int create_kernel_session(struct ltt_session *session) goto error; } - /* Set kernel consumer socket fd */ - if (kconsumer_data.cmd_sock >= 0) { - session->kernel_session->consumer_fd = kconsumer_data.cmd_sock; - } - /* Copy session output to the newly created Kernel session */ ret = copy_session_consumer(LTTNG_DOMAIN_KERNEL, session); if (ret != LTTCOMM_OK) { @@ -3367,6 +3536,9 @@ static int cmd_destroy_session(struct ltt_session *session, char *name) { int ret; + /* Safety net */ + assert(session); + /* Clean kernel session teardown */ teardown_kernel_session(session); /* UST session teardown */ @@ -3436,6 +3608,7 @@ static int cmd_register_consumer(struct ltt_session *session, int domain, char *sock_path) { int ret, sock; + struct consumer_socket *socket; switch (domain) { case LTTNG_DOMAIN_KERNEL: @@ -3451,7 +3624,29 @@ static int cmd_register_consumer(struct ltt_session *session, int domain, goto error; } - session->kernel_session->consumer_fd = sock; + socket = consumer_allocate_socket(sock); + if (socket == NULL) { + ret = LTTCOMM_FATAL; + close(sock); + goto error; + } + + socket->lock = zmalloc(sizeof(pthread_mutex_t)); + if (socket->lock == NULL) { + PERROR("zmalloc pthread mutex"); + ret = LTTCOMM_FATAL; + goto error; + } + pthread_mutex_init(socket->lock, NULL); + + rcu_read_lock(); + consumer_add_socket(socket, 
session->kernel_session->consumer); + rcu_read_unlock(); + + pthread_mutex_lock(&kconsumer_data.pid_mutex); + kconsumer_data.pid = -1; + pthread_mutex_unlock(&kconsumer_data.pid_mutex); + break; default: /* TODO: Userspace tracing */ @@ -3607,6 +3802,10 @@ static int cmd_set_consumer_uri(int domain, struct ltt_session *session, switch (domain) { case LTTNG_DOMAIN_KERNEL: + { + struct lttng_ht_iter iter; + struct consumer_socket *socket; + /* Code flow error if we don't have a kernel session here. */ assert(ksess); @@ -3640,11 +3839,20 @@ static int cmd_set_consumer_uri(int domain, struct ltt_session *session, sizeof(consumer->subdir)); } - ret = send_socket_relayd_consumer(domain, session, uri, consumer, - ksess->consumer_fd); - if (ret != LTTCOMM_OK) { - goto error; + cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, + socket, node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); + ret = send_socket_relayd_consumer(domain, session, uri, consumer, + socket->fd); + pthread_mutex_unlock(socket->lock); + if (ret != LTTCOMM_OK) { + goto error; + } } + break; case LTTNG_DST_PATH: DBG2("Setting trace directory path from URI to %s", uri->dst.path); @@ -3660,6 +3868,7 @@ static int cmd_set_consumer_uri(int domain, struct ltt_session *session, /* All good! */ break; + } case LTTNG_DOMAIN_UST: /* Code flow error if we don't have a kernel session here. */ assert(usess); @@ -3680,6 +3889,8 @@ static int cmd_set_consumer_uri(int domain, struct ltt_session *session, case LTTNG_DST_IPV4: case LTTNG_DST_IPV6: { + struct consumer_socket *socket; + DBG2("Setting network URI for UST session %s", session->name); /* Set URI into consumer object */ @@ -3695,22 +3906,31 @@ static int cmd_set_consumer_uri(int domain, struct ltt_session *session, sizeof(consumer->subdir)); } - if (ust_consumerd64_fd >= 0) { + rcu_read_lock(); + socket = consumer_find_socket(uatomic_read(&ust_consumerd64_fd), + consumer); + if (socket != NULL) { + pthread_mutex_lock(socket->lock); ret = send_socket_relayd_consumer(domain, session, uri, - consumer, ust_consumerd64_fd); + consumer, socket->fd); + pthread_mutex_unlock(socket->lock); if (ret != LTTCOMM_OK) { goto error; } } - if (ust_consumerd32_fd >= 0) { + socket = consumer_find_socket(uatomic_read(&ust_consumerd32_fd), + consumer); + if (socket != NULL) { + pthread_mutex_lock(socket->lock); ret = send_socket_relayd_consumer(domain, session, uri, - consumer, ust_consumerd32_fd); + consumer, socket->fd); + pthread_mutex_unlock(socket->lock); if (ret != LTTCOMM_OK) { goto error; } } - + rcu_read_unlock(); break; } case LTTNG_DST_PATH: @@ -4096,6 +4316,8 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, /* Need a session for kernel command */ if (need_tracing_session) { + struct consumer_socket *socket; + if (cmd_ctx->session->kernel_session == NULL) { ret = create_kernel_session(cmd_ctx->session); if (ret < 0) { @@ -4115,13 +4337,29 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, goto error; } uatomic_set(&kernel_consumerd_state, CONSUMER_STARTED); - - /* Set consumer fd of the session */ - cmd_ctx->session->kernel_session->consumer_fd = - kconsumer_data.cmd_sock; } else { pthread_mutex_unlock(&kconsumer_data.pid_mutex); } + + /* Set kernel consumer socket fd */ + if (kconsumer_data.cmd_sock >= 0) { + rcu_read_lock(); + socket = consumer_find_socket(kconsumer_data.cmd_sock, + cmd_ctx->session->kernel_session->consumer); + rcu_read_unlock(); + if (socket == NULL) { + socket = 
consumer_allocate_socket(kconsumer_data.cmd_sock);
+				if (socket == NULL) {
+					goto error;
+				}
+
+				socket->lock = &kconsumer_data.lock;
+				rcu_read_lock();
+				consumer_add_socket(socket,
+						cmd_ctx->session->kernel_session->consumer);
+				rcu_read_unlock();
+			}
+		}
 		}
 		break;
@@ -4134,6 +4372,8 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock,
 		}
 
 		if (need_tracing_session) {
+			struct consumer_socket *socket;
+
 			if (cmd_ctx->session->ust_session == NULL) {
 				ret = create_ust_session(cmd_ctx->session,
 						&cmd_ctx->lsm->domain);
@@ -4152,15 +4392,40 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock,
 				ret = start_consumerd(&ustconsumer64_data);
 				if (ret < 0) {
 					ret = LTTCOMM_UST_CONSUMER64_FAIL;
-					ust_consumerd64_fd = -EINVAL;
+					uatomic_set(&ust_consumerd64_fd, -EINVAL);
 					goto error;
 				}
 
-				ust_consumerd64_fd = ustconsumer64_data.cmd_sock;
+				uatomic_set(&ust_consumerd64_fd, ustconsumer64_data.cmd_sock);
 				uatomic_set(&ust_consumerd_state, CONSUMER_STARTED);
 			} else {
 				pthread_mutex_unlock(&ustconsumer64_data.pid_mutex);
 			}
+
+			/*
+			 * Setup socket for consumer 64 bit. No need for atomic access
+			 * since it was set above and can ONLY be set in this thread.
+			 */
+			if (ust_consumerd64_fd >= 0) {
+				rcu_read_lock();
+				socket = consumer_find_socket(uatomic_read(&ust_consumerd64_fd),
+						cmd_ctx->session->ust_session->consumer);
+				rcu_read_unlock();
+				if (socket == NULL) {
+					socket = consumer_allocate_socket(ust_consumerd64_fd);
+					if (socket == NULL) {
+						goto error;
+					}
+					socket->lock = &ustconsumer64_data.lock;
+
+					rcu_read_lock();
+					consumer_add_socket(socket,
+							cmd_ctx->session->ust_session->consumer);
+					rcu_read_unlock();
+				}
+				DBG3("UST consumer 64 bit socket set to %d", socket->fd);
+			}
+
 			/* 32-bit */
 			if (consumerd32_bin[0] != '\0' &&
 					ustconsumer32_data.pid == 0 &&
@@ -4169,15 +4434,39 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock,
 				ret = start_consumerd(&ustconsumer32_data);
 				if (ret < 0) {
 					ret = LTTCOMM_UST_CONSUMER32_FAIL;
-					ust_consumerd32_fd = -EINVAL;
+					uatomic_set(&ust_consumerd32_fd, -EINVAL);
 					goto error;
 				}
 
-				ust_consumerd32_fd = ustconsumer32_data.cmd_sock;
+				uatomic_set(&ust_consumerd32_fd, ustconsumer32_data.cmd_sock);
 				uatomic_set(&ust_consumerd_state, CONSUMER_STARTED);
 			} else {
 				pthread_mutex_unlock(&ustconsumer32_data.pid_mutex);
 			}
+
+			/*
+			 * Setup socket for consumer 32 bit. No need for atomic access
+			 * since it was set above and can ONLY be set in this thread.
+			 */
+			if (ust_consumerd32_fd >= 0) {
+				rcu_read_lock();
+				socket = consumer_find_socket(uatomic_read(&ust_consumerd32_fd),
+						cmd_ctx->session->ust_session->consumer);
+				rcu_read_unlock();
+				if (socket == NULL) {
+					socket = consumer_allocate_socket(ust_consumerd32_fd);
+					if (socket == NULL) {
+						goto error;
+					}
+					socket->lock = &ustconsumer32_data.lock;
+
+					rcu_read_lock();
+					consumer_add_socket(socket,
+							cmd_ctx->session->ust_session->consumer);
+					rcu_read_unlock();
+				}
+				DBG3("UST consumer 32 bit socket set to %d", socket->fd);
+			}
 		}
 		break;
 	}
@@ -4558,13 +4847,194 @@ init_setup_error:
 	return ret;
 }
 
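The health_state counters manipulated by the health_code_update() and health_poll_update() calls sprinkled through the threads above are defined in health.h, which is not part of this diff. As a rough mental model only (illustrative ex_* names, assumed semantics), the check is progress based: the monitored thread bumps a counter at each progress point, and the checker reports good health when the counter has moved since the previous check. The paired health_poll_update() calls around lttng_poll_wait() exist so a thread legitimately blocked in poll() is not misread as stuck; this sketch omits that refinement.

#include <urcu/uatomic.h>	/* uatomic_inc(), uatomic_read() */

/* Illustrative model of one thread's health counter. */
struct ex_health_state {
	unsigned long last;	/* snapshot taken at the previous check */
	unsigned long current;	/* bumped by the monitored thread */
};

/* Monitored thread side, like health_code_update() above. */
static inline void ex_health_tick(struct ex_health_state *state)
{
	uatomic_inc(&state->current);
}

/* Checker side, like health_check_state(): returns 1 for good health and
 * 0 for bad, i.e. the pre-flip convention used by thread_manage_health(). */
static int ex_health_check(struct ex_health_state *state)
{
	unsigned long current = uatomic_read(&state->current);
	int alive = (current != state->last);

	state->last = current;
	return alive;
}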
+ */ +static void *thread_manage_health(void *data) +{ + int sock = -1, new_sock, ret, i, pollfd, err = -1; + uint32_t revents, nb_fd; + struct lttng_poll_event events; + struct lttcomm_health_msg msg; + struct lttcomm_health_data reply; + + DBG("[thread] Manage health check started"); + + rcu_register_thread(); + + /* Create unix socket */ + sock = lttcomm_create_unix_sock(health_unix_sock_path); + if (sock < 0) { + ERR("Unable to create health check Unix socket"); + ret = -1; + goto error; + } + + ret = lttcomm_listen_unix_sock(sock); + if (ret < 0) { + goto error; + } + + /* + * Pass 2 as size here for the thread quit pipe and client_sock. Nothing + * more will be added to this poll set. + */ + ret = create_thread_poll_set(&events, 2); + if (ret < 0) { + goto error; + } + + /* Add the application registration socket */ + ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLPRI); + if (ret < 0) { + goto error; + } + + while (1) { + DBG("Health check ready"); + + nb_fd = LTTNG_POLL_GETNB(&events); + + /* Inifinite blocking call, waiting for transmission */ +restart: + ret = lttng_poll_wait(&events, -1); + if (ret < 0) { + /* + * Restart interrupted system call. + */ + if (errno == EINTR) { + goto restart; + } + goto error; + } + + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); + + /* Thread quit pipe has been closed. Killing thread. */ + ret = check_thread_quit_pipe(pollfd, revents); + if (ret) { + err = 0; + goto exit; + } + + /* Event on the registration socket */ + if (pollfd == sock) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Health socket poll error"); + goto error; + } + } + } + + new_sock = lttcomm_accept_unix_sock(sock); + if (new_sock < 0) { + goto error; + } + + DBG("Receiving data from client for health..."); + ret = lttcomm_recv_unix_sock(new_sock, (void *)&msg, sizeof(msg)); + if (ret <= 0) { + DBG("Nothing recv() from client... continuing"); + ret = close(new_sock); + if (ret) { + PERROR("close"); + } + new_sock = -1; + continue; + } + + rcu_thread_online(); + + switch (msg.component) { + case LTTNG_HEALTH_CMD: + reply.ret_code = health_check_state(&health_thread_cmd); + break; + case LTTNG_HEALTH_APP_MANAGE: + reply.ret_code = health_check_state(&health_thread_app_manage); + break; + case LTTNG_HEALTH_APP_REG: + reply.ret_code = health_check_state(&health_thread_app_reg); + break; + case LTTNG_HEALTH_KERNEL: + reply.ret_code = health_check_state(&health_thread_kernel); + break; + case LTTNG_HEALTH_CONSUMER: + reply.ret_code = check_consumer_health(); + break; + case LTTNG_HEALTH_ALL: + reply.ret_code = + health_check_state(&health_thread_app_manage) && + health_check_state(&health_thread_app_reg) && + health_check_state(&health_thread_cmd) && + health_check_state(&health_thread_kernel) && + check_consumer_health(); + break; + default: + reply.ret_code = LTTCOMM_UND; + break; + } + + /* + * Flip ret value since 0 is a success and 1 indicates a bad health for + * the client where in the sessiond it is the opposite. Again, this is + * just to make things easier for us poor developer which enjoy a lot + * lazyness. 
+ */ + if (reply.ret_code == 0 || reply.ret_code == 1) { + reply.ret_code = !reply.ret_code; + } + + DBG2("Health check return value %d", reply.ret_code); + + ret = send_unix_sock(new_sock, (void *) &reply, sizeof(reply)); + if (ret < 0) { + ERR("Failed to send health data back to client"); + } + + /* End of transmission */ + ret = close(new_sock); + if (ret) { + PERROR("close"); + } + new_sock = -1; + } + +exit: +error: + if (err) { + ERR("Health error occurred in %s", __func__); + } + DBG("Health check thread dying"); + unlink(health_unix_sock_path); + if (sock >= 0) { + ret = close(sock); + if (ret) { + PERROR("close"); + } + } + if (new_sock >= 0) { + ret = close(new_sock); + if (ret) { + PERROR("close"); + } + } + + lttng_poll_clean(&events); + + rcu_unregister_thread(); + return NULL; +} + /* * This thread manage all clients request using the unix client socket for * communication. */ static void *thread_manage_clients(void *data) { - int sock = -1, ret, i, pollfd; + int sock = -1, ret, i, pollfd, err = -1; int sock_error; uint32_t revents, nb_fd; struct command_ctx *cmd_ctx = NULL; @@ -4574,6 +5044,8 @@ static void *thread_manage_clients(void *data) rcu_register_thread(); + health_code_update(&health_thread_cmd); + ret = lttcomm_listen_unix_sock(client_sock); if (ret < 0) { goto error; @@ -4601,6 +5073,8 @@ static void *thread_manage_clients(void *data) kill(ppid, SIGUSR1); } + health_code_update(&health_thread_cmd); + while (1) { DBG("Accepting client command ..."); @@ -4608,7 +5082,9 @@ static void *thread_manage_clients(void *data) /* Inifinite blocking call, waiting for transmission */ restart: + health_poll_update(&health_thread_cmd); ret = lttng_poll_wait(&events, -1); + health_poll_update(&health_thread_cmd); if (ret < 0) { /* * Restart interrupted system call. @@ -4624,10 +5100,13 @@ static void *thread_manage_clients(void *data) revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); + health_code_update(&health_thread_cmd); + /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { - goto error; + err = 0; + goto exit; } /* Event on the registration socket */ @@ -4641,6 +5120,8 @@ static void *thread_manage_clients(void *data) DBG("Wait for client response"); + health_code_update(&health_thread_cmd); + sock = lttcomm_accept_unix_sock(client_sock); if (sock < 0) { goto error; @@ -4669,6 +5150,8 @@ static void *thread_manage_clients(void *data) cmd_ctx->llm = NULL; cmd_ctx->session = NULL; + health_code_update(&health_thread_cmd); + /* * Data is received from the lttng client. The struct * lttcomm_session_msg (lsm) contains the command and data request of @@ -4688,6 +5171,8 @@ static void *thread_manage_clients(void *data) continue; } + health_code_update(&health_thread_cmd); + // TODO: Validate cmd_ctx including sanity check for // security purpose. 
@@ -4719,6 +5204,8 @@ static void *thread_manage_clients(void *data)
 				continue;
 			}
 
+			health_code_update(&health_thread_cmd);
+
 			DBG("Sending response (size: %d, retcode: %s)",
 					cmd_ctx->lttng_msg_size,
 					lttng_strerror(-cmd_ctx->llm->ret_code));
@@ -4735,9 +5222,18 @@ static void *thread_manage_clients(void *data)
 		sock = -1;
 
 		clean_command_ctx(&cmd_ctx);
+
+		health_code_update(&health_thread_cmd);
 	}
 
+exit:
 error:
+	if (err) {
+		health_error(&health_thread_cmd);
+		ERR("Health error occurred in %s", __func__);
+	}
+	health_exit(&health_thread_cmd);
+
 	DBG("Client thread dying");
 	unlink(client_unix_sock_path);
 	if (client_sock >= 0) {
@@ -5286,6 +5782,11 @@ int main(int argc, char **argv)
 				DEFAULT_GLOBAL_APPS_WAIT_SHM_PATH);
 		}
 
+		if (strlen(health_unix_sock_path) == 0) {
+			snprintf(health_unix_sock_path, sizeof(health_unix_sock_path),
+					DEFAULT_GLOBAL_HEALTH_UNIX_SOCK);
+		}
+
 		/* Setup kernel consumerd path */
 		snprintf(kconsumer_data.err_unix_sock_path, PATH_MAX,
 				DEFAULT_KCONSUMERD_ERR_SOCK_PATH, rundir);
@@ -5336,6 +5837,12 @@ int main(int argc, char **argv)
 			snprintf(wait_shm_path, PATH_MAX,
 					DEFAULT_HOME_APPS_WAIT_SHM_PATH, geteuid());
 		}
+
+		/* Set health check Unix path */
+		if (strlen(health_unix_sock_path) == 0) {
+			snprintf(health_unix_sock_path, sizeof(health_unix_sock_path),
+					DEFAULT_HOME_HEALTH_UNIX_SOCK, home_path);
+		}
 	}
 
 	/* Set consumer initial state */
@@ -5468,6 +5975,32 @@ int main(int argc, char **argv)
 	 */
 	uatomic_set(&relayd_net_seq_idx, 1);
 
+	/* Init all health thread counters. */
+	health_init(&health_thread_cmd);
+	health_init(&health_thread_kernel);
+	health_init(&health_thread_app_manage);
+	health_init(&health_thread_app_reg);
+
+	/*
+	 * Init health counters of the consumer threads. We do a quick hack here
+	 * so that the consumer health state reads as fine even if the thread is
+	 * not started. This simply makes our life easier and has no cost
+	 * whatsoever.
+	 */
+	health_init(&kconsumer_data.health);
+	health_poll_update(&kconsumer_data.health);
+	health_init(&ustconsumer32_data.health);
+	health_poll_update(&ustconsumer32_data.health);
+	health_init(&ustconsumer64_data.health);
+	health_poll_update(&ustconsumer64_data.health);
+
+	/* Create thread to manage the health check socket */
+	ret = pthread_create(&health_thread, NULL,
+			thread_manage_health, (void *) NULL);
+	if (ret != 0) {
+		PERROR("pthread_create health");
+		goto exit_health;
+	}
+
 	/* Create thread to manage the client socket */
 	ret = pthread_create(&client_thread, NULL,
 			thread_manage_clients, (void *) NULL);
@@ -5549,6 +6082,7 @@ exit_dispatch:
 	}
 
 exit_client:
+exit_health:
 exit:
 	/*
 	 * cleanup() is called when no other thread is running.
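For completeness, a client-side counterpart to thread_manage_health() would mirror the connect/send/recv sequence above. The sketch below is hypothetical: the struct layouts are reconstructed from the only fields this patch uses (component and ret_code), and the authoritative definitions plus the lttcomm_* prototypes live in lttng-tools' sessiond-comm headers, which are not part of this diff.

#include <stdint.h>
#include <sys/types.h>
#include <unistd.h>

/* Reconstructed layouts; see sessiond-comm for the real definitions. */
struct lttcomm_health_msg {
	uint32_t component;	/* e.g. LTTNG_HEALTH_ALL */
};

struct lttcomm_health_data {
	uint32_t ret_code;
};

/* Assumed to be provided by lttng-tools' common library. */
extern int lttcomm_connect_unix_sock(const char *pathname);
extern ssize_t lttcomm_send_unix_sock(int sock, void *buf, size_t len);
extern ssize_t lttcomm_recv_unix_sock(int sock, void *buf, size_t len);

/* Returns 0 on good health, 1 on bad health (the value is already flipped
 * by the session daemon), or negative on communication error. */
static int ex_check_sessiond_health(const char *path, uint32_t component)
{
	struct lttcomm_health_msg msg;
	struct lttcomm_health_data reply;
	ssize_t ret;
	int sock;

	sock = lttcomm_connect_unix_sock(path);
	if (sock < 0) {
		return -1;
	}

	msg.component = component;

	ret = lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
	if (ret < 0) {
		goto error;
	}

	ret = lttcomm_recv_unix_sock(sock, &reply, sizeof(reply));
	if (ret <= 0) {
		goto error;
	}

	close(sock);
	return (int) reply.ret_code;

error:
	close(sock);
	return -1;
}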