X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fmain.c;h=4b6a39dd5ecea5ef176139e7da2c6058f5884094;hp=bf5adc50130c6609bb9239542d3153941def70fa;hb=e8209f6b352b3aa279d8d452e396adef6f7159c7;hpb=44a5e5eb99f1d8b528f83fda5585677a3882f5f5 diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index bf5adc501..4b6a39dd5 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -44,6 +44,7 @@ #include #include #include +#include #include "lttng-sessiond.h" #include "channel.h" @@ -86,6 +87,8 @@ static struct consumer_data kconsumer_data = { .cmd_unix_sock_path = DEFAULT_KCONSUMERD_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .pid_mutex = PTHREAD_MUTEX_INITIALIZER, + .lock = PTHREAD_MUTEX_INITIALIZER, }; static struct consumer_data ustconsumer64_data = { .type = LTTNG_CONSUMER64_UST, @@ -93,6 +96,8 @@ static struct consumer_data ustconsumer64_data = { .cmd_unix_sock_path = DEFAULT_USTCONSUMERD64_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .pid_mutex = PTHREAD_MUTEX_INITIALIZER, + .lock = PTHREAD_MUTEX_INITIALIZER, }; static struct consumer_data ustconsumer32_data = { .type = LTTNG_CONSUMER32_UST, @@ -100,8 +105,11 @@ static struct consumer_data ustconsumer32_data = { .cmd_unix_sock_path = DEFAULT_USTCONSUMERD32_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .pid_mutex = PTHREAD_MUTEX_INITIALIZER, + .lock = PTHREAD_MUTEX_INITIALIZER, }; +/* Shared between threads */ static int dispatch_thread_exit; /* Global application Unix socket path */ @@ -214,6 +222,7 @@ static unsigned int relayd_net_seq_idx; /* Used for the health monitoring of the session daemon. See health.h */ struct health_state health_thread_cmd; +struct health_state health_thread_app_manage; struct health_state health_thread_app_reg; struct health_state health_thread_kernel; @@ -362,23 +371,56 @@ error: */ static void teardown_kernel_session(struct ltt_session *session) { + int ret; + struct lttng_ht_iter iter; + struct ltt_kernel_session *ksess; + struct consumer_socket *socket; + if (!session->kernel_session) { DBG3("No kernel session when tearing down session"); return; } + ksess = session->kernel_session; + DBG("Tearing down kernel session"); + /* + * Destroy relayd associated with the session consumer. This action is + * valid since in order to destroy a session we must acquire the session + * lock. This means that there CAN NOT be stream(s) being sent to a + * consumer since this action also requires the session lock at any time. + * + * At this point, we are sure that not streams data will be lost after this + * command is issued. + */ + if (ksess->consumer && ksess->consumer->type == CONSUMER_DST_NET) { + cds_lfht_for_each_entry(ksess->consumer->socks->ht, &iter.iter, socket, + node.node) { + ret = consumer_send_destroy_relayd(socket, ksess->consumer); + if (ret < 0) { + ERR("Unable to send destroy relayd command to consumer"); + /* Continue since we MUST delete everything at this point. 
*/
+			}
+		}
+	}
 
 	/*
 	 * If a custom kernel consumer was registered, close the socket before
 	 * tearing down the complete kernel session structure
 	 */
-	if (kconsumer_data.cmd_sock >= 0 &&
-			session->kernel_session->consumer_fd != kconsumer_data.cmd_sock) {
-		lttcomm_close_unix_sock(session->kernel_session->consumer_fd);
+	cds_lfht_for_each_entry(ksess->consumer->socks->ht, &iter.iter, socket,
+			node.node) {
+		if (socket->fd != kconsumer_data.cmd_sock) {
+			rcu_read_lock();
+			consumer_del_socket(socket, ksess->consumer);
+			lttcomm_close_unix_sock(socket->fd);
+			consumer_destroy_socket(socket);
+			rcu_read_unlock();
+		}
 	}
 
-	trace_kernel_destroy_session(session->kernel_session);
+	trace_kernel_destroy_session(ksess);
 }
 
 /*
@@ -388,20 +430,44 @@ static void teardown_kernel_session(struct ltt_session *session)
 static void teardown_ust_session(struct ltt_session *session)
 {
 	int ret;
+	struct lttng_ht_iter iter;
+	struct ltt_ust_session *usess;
+	struct consumer_socket *socket;
 
 	if (!session->ust_session) {
 		DBG3("No UST session when tearing down session");
 		return;
 	}
+	usess = session->ust_session;
 
 	DBG("Tearing down UST session(s)");
 
-	ret = ust_app_destroy_trace_all(session->ust_session);
+	/*
+	 * Destroy relayd associated with the session consumer. This action is
+	 * valid since in order to destroy a session we must acquire the session
+	 * lock. This means that there CAN NOT be stream(s) being sent to a
+	 * consumer since this action also requires the session lock at any time.
+	 *
+	 * At this point, we are sure that no stream data will be lost after this
+	 * command is issued.
+	 */
+	if (usess->consumer && usess->consumer->type == CONSUMER_DST_NET) {
+		cds_lfht_for_each_entry(usess->consumer->socks->ht, &iter.iter, socket,
+				node.node) {
+			ret = consumer_send_destroy_relayd(socket, usess->consumer);
+			if (ret < 0) {
+				ERR("Unable to send destroy relayd command to consumer");
+				/* Continue since we MUST delete everything at this point.
*/ + } + } + } + + ret = ust_app_destroy_trace_all(usess); if (ret) { ERR("Error in ust_app_destroy_trace_all"); } - trace_ust_destroy_session(session->ust_session); + trace_ust_destroy_session(usess); } /* @@ -419,7 +485,7 @@ static void stop_threads(void) } /* Dispatch thread */ - dispatch_thread_exit = 1; + CMM_STORE_SHARED(dispatch_thread_exit, 1); futex_nto1_wake(&ust_cmd_queue.futex); } @@ -465,8 +531,6 @@ static void cleanup(void) DBG("Closing all UST sockets"); ust_app_clean_list(); - pthread_mutex_destroy(&kconsumer_data.pid_mutex); - if (is_root && !opt_no_kernel) { DBG2("Closing kernel fd"); if (kernel_tracer_fd >= 0) { @@ -631,6 +695,7 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd) { int ret = 0; struct ltt_session *session; + struct ltt_kernel_session *ksess; struct ltt_kernel_channel *channel; DBG("Updating kernel streams for channel fd %d", fd); @@ -642,14 +707,9 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd) session_unlock(session); continue; } + ksess = session->kernel_session; - /* This is not suppose to be -1 but this is an extra security check */ - if (session->kernel_session->consumer_fd < 0) { - session->kernel_session->consumer_fd = consumer_data->cmd_sock; - } - - cds_list_for_each_entry(channel, - &session->kernel_session->channel_list.head, list) { + cds_list_for_each_entry(channel, &ksess->channel_list.head, list) { if (channel->fd == fd) { DBG("Channel found, updating kernel streams"); ret = kernel_open_channel_stream(channel); @@ -662,13 +722,23 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd) * that tracing is started so it is safe to send our updated * stream fds. */ - if (session->kernel_session->consumer_fds_sent == 1 && - session->kernel_session->consumer != NULL) { - ret = kernel_consumer_send_channel_stream( - session->kernel_session->consumer_fd, channel, - session->kernel_session); - if (ret < 0) { - goto error; + if (ksess->consumer_fds_sent == 1 && ksess->consumer != NULL) { + struct lttng_ht_iter iter; + struct consumer_socket *socket; + + + cds_lfht_for_each_entry(ksess->consumer->socks->ht, + &iter.iter, socket, node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); + ret = kernel_consumer_send_channel_stream(socket->fd, + channel, ksess); + pthread_mutex_unlock(socket->lock); + if (ret < 0) { + goto error; + } } } goto error; @@ -714,7 +784,7 @@ static void update_ust_app(int app_sock) */ static void *thread_manage_kernel(void *data) { - int ret, i, pollfd, update_poll_flag = 1; + int ret, i, pollfd, update_poll_flag = 1, err = -1; uint32_t revents, nb_fd; char tmp; struct lttng_poll_event events; @@ -787,7 +857,8 @@ static void *thread_manage_kernel(void *data) /* Thread quit pipe has been closed. Killing thread. 
*/ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { - goto error; + err = 0; + goto exit; } /* Check for data on kernel pipe */ @@ -815,10 +886,15 @@ static void *thread_manage_kernel(void *data) } } +exit: error: lttng_poll_clean(&events); error_poll_create: - health_reset(&health_thread_kernel); + if (err) { + health_error(&health_thread_kernel); + ERR("Health error occurred in %s", __func__); + } + health_exit(&health_thread_kernel); DBG("Kernel thread dying"); return NULL; } @@ -828,7 +904,7 @@ error_poll_create: */ static void *thread_manage_consumer(void *data) { - int sock = -1, i, ret, pollfd; + int sock = -1, i, ret, pollfd, err = -1; uint32_t revents, nb_fd; enum lttcomm_return_code code; struct lttng_poll_event events; @@ -886,7 +962,8 @@ restart: /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { - goto error; + err = 0; + goto exit; } /* Event on the registration socket */ @@ -974,7 +1051,8 @@ restart_poll: /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { - goto error; + err = 0; + goto exit; } /* Event on the kconsumerd socket */ @@ -998,6 +1076,7 @@ restart_poll: ERR("consumer return code : %s", lttcomm_get_readable_code(-code)); +exit: error: /* Immediately set the consumerd state to stopped */ if (consumer_data->type == LTTNG_CONSUMER_KERNEL) { @@ -1036,7 +1115,11 @@ error: lttng_poll_clean(&events); error_poll: error_listen: - health_reset(&consumer_data->health); + if (err) { + health_error(&consumer_data->health); + ERR("Health error occurred in %s", __func__); + } + health_exit(&consumer_data->health); DBG("consumer thread cleanup completed"); return NULL; @@ -1047,7 +1130,7 @@ error_listen: */ static void *thread_manage_apps(void *data) { - int i, ret, pollfd; + int i, ret, pollfd, err = -1; uint32_t revents, nb_fd; struct ust_command ust_cmd; struct lttng_poll_event events; @@ -1057,7 +1140,7 @@ static void *thread_manage_apps(void *data) rcu_register_thread(); rcu_thread_online(); - health_code_update(&health_thread_app_reg); + health_code_update(&health_thread_app_manage); ret = create_thread_poll_set(&events, 2); if (ret < 0) { @@ -1069,7 +1152,7 @@ static void *thread_manage_apps(void *data) goto error; } - health_code_update(&health_thread_app_reg); + health_code_update(&health_thread_app_manage); while (1) { /* Zeroed the events structure */ @@ -1081,9 +1164,9 @@ static void *thread_manage_apps(void *data) /* Inifinite blocking call, waiting for transmission */ restart: - health_poll_update(&health_thread_app_reg); + health_poll_update(&health_thread_app_manage); ret = lttng_poll_wait(&events, -1); - health_poll_update(&health_thread_app_reg); + health_poll_update(&health_thread_app_manage); if (ret < 0) { /* * Restart interrupted system call. @@ -1099,12 +1182,13 @@ static void *thread_manage_apps(void *data) revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - health_code_update(&health_thread_app_reg); + health_code_update(&health_thread_app_manage); /* Thread quit pipe has been closed. Killing thread. 
*/ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { - goto error; + err = 0; + goto exit; } /* Inspect the apps cmd pipe */ @@ -1120,7 +1204,7 @@ static void *thread_manage_apps(void *data) goto error; } - health_code_update(&health_thread_app_reg); + health_code_update(&health_thread_app_manage); /* Register applicaton to the session daemon */ ret = ust_app_register(&ust_cmd.reg_msg, @@ -1131,7 +1215,7 @@ static void *thread_manage_apps(void *data) break; } - health_code_update(&health_thread_app_reg); + health_code_update(&health_thread_app_manage); /* * Validate UST version compatibility. @@ -1145,7 +1229,7 @@ static void *thread_manage_apps(void *data) update_ust_app(ust_cmd.sock); } - health_code_update(&health_thread_app_reg); + health_code_update(&health_thread_app_manage); ret = ust_app_register_done(ust_cmd.sock); if (ret < 0) { @@ -1170,7 +1254,7 @@ static void *thread_manage_apps(void *data) ust_cmd.sock); } - health_code_update(&health_thread_app_reg); + health_code_update(&health_thread_app_manage); break; } @@ -1192,14 +1276,19 @@ static void *thread_manage_apps(void *data) } } - health_code_update(&health_thread_app_reg); + health_code_update(&health_thread_app_manage); } } +exit: error: lttng_poll_clean(&events); error_poll_create: - health_reset(&health_thread_app_reg); + if (err) { + health_error(&health_thread_app_manage); + ERR("Health error occurred in %s", __func__); + } + health_exit(&health_thread_app_manage); DBG("Application communication apps thread cleanup complete"); rcu_thread_offline(); rcu_unregister_thread(); @@ -1218,7 +1307,7 @@ static void *thread_dispatch_ust_registration(void *data) DBG("[thread] Dispatch UST command started"); - while (!dispatch_thread_exit) { + while (!CMM_LOAD_SHARED(dispatch_thread_exit)) { /* Atomically prepare the queue futex */ futex_nto1_prepare(&ust_cmd_queue.futex); @@ -1275,7 +1364,7 @@ error: */ static void *thread_registration_apps(void *data) { - int sock = -1, i, ret, pollfd; + int sock = -1, i, ret, pollfd, err = -1; uint32_t revents, nb_fd; struct lttng_poll_event events; /* @@ -1321,7 +1410,9 @@ static void *thread_registration_apps(void *data) /* Inifinite blocking call, waiting for transmission */ restart: + health_poll_update(&health_thread_app_reg); ret = lttng_poll_wait(&events, -1); + health_poll_update(&health_thread_app_reg); if (ret < 0) { /* * Restart interrupted system call. @@ -1333,6 +1424,8 @@ static void *thread_registration_apps(void *data) } for (i = 0; i < nb_fd; i++) { + health_code_update(&health_thread_app_reg); + /* Fetch once the poll data */ revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); @@ -1340,7 +1433,8 @@ static void *thread_registration_apps(void *data) /* Thread quit pipe has been closed. Killing thread. 
*/ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { - goto error; + err = 0; + goto exit; } /* Event on the registration socket */ @@ -1376,6 +1470,7 @@ static void *thread_registration_apps(void *data) sock = -1; continue; } + health_code_update(&health_thread_app_reg); ret = lttcomm_recv_unix_sock(sock, &ust_cmd->reg_msg, sizeof(struct ust_register_msg)); if (ret < 0 || ret < sizeof(struct ust_register_msg)) { @@ -1393,6 +1488,7 @@ static void *thread_registration_apps(void *data) sock = -1; continue; } + health_code_update(&health_thread_app_reg); ust_cmd->sock = sock; sock = -1; @@ -1420,7 +1516,14 @@ static void *thread_registration_apps(void *data) } } +exit: error: + if (err) { + health_error(&health_thread_app_reg); + ERR("Health error occurred in %s", __func__); + } + health_exit(&health_thread_app_reg); + /* Notify that the registration thread is gone */ notify_ust_apps(0); @@ -1526,7 +1629,8 @@ static int join_consumer_thread(struct consumer_data *consumer_data) void *status; int ret; - if (consumer_data->pid != 0) { + /* Consumer pid must be a real one. */ + if (consumer_data->pid > 0) { ret = kill(consumer_data->pid, SIGTERM); if (ret) { ERR("Error killing consumer daemon"); @@ -1740,15 +1844,15 @@ error: } /* - * Compute health status of each consumer. + * Compute health status of each consumer. If one of them is zero (bad + * state), we return 0. */ static int check_consumer_health(void) { int ret; - ret = - health_check_state(&kconsumer_data.health) & - health_check_state(&ustconsumer32_data.health) & + ret = health_check_state(&kconsumer_data.health) && + health_check_state(&ustconsumer32_data.health) && health_check_state(&ustconsumer64_data.health); DBG3("Health consumer check %d", ret); @@ -1833,21 +1937,24 @@ error: static int init_kernel_tracing(struct ltt_kernel_session *session) { int ret = 0; + struct lttng_ht_iter iter; + struct consumer_socket *socket; - if (session->consumer_fds_sent == 0 && session->consumer != NULL) { - /* - * Assign default kernel consumer socket if no consumer assigned to the - * kernel session. At this point, it's NOT supposed to be -1 but this is - * an extra security check. - */ - if (session->consumer_fd < 0) { - session->consumer_fd = kconsumer_data.cmd_sock; - } + assert(session); - ret = kernel_consumer_send_session(session->consumer_fd, session); - if (ret < 0) { - ret = LTTCOMM_KERN_CONSUMER_FAIL; - goto error; + if (session->consumer_fds_sent == 0 && session->consumer != NULL) { + cds_lfht_for_each_entry(session->consumer->socks->ht, &iter.iter, + socket, node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); + ret = kernel_consumer_send_session(socket->fd, session); + pthread_mutex_unlock(socket->lock); + if (ret < 0) { + ret = LTTCOMM_KERN_CONSUMER_FAIL; + goto error; + } } } @@ -1956,25 +2063,12 @@ static int send_socket_relayd_consumer(int domain, struct ltt_session *session, session->net_handle = 1; } - switch (domain) { - case LTTNG_DOMAIN_KERNEL: - /* Send relayd socket to consumer. */ - ret = kernel_consumer_send_relayd_socket(consumer_fd, sock, - consumer, relayd_uri->stype); - if (ret < 0) { - ret = LTTCOMM_ENABLE_CONSUMER_FAIL; - goto close_sock; - } - break; - case LTTNG_DOMAIN_UST: - /* Send relayd socket to consumer. */ - ret = ust_consumer_send_relayd_socket(consumer_fd, sock, - consumer, relayd_uri->stype); - if (ret < 0) { - ret = LTTCOMM_ENABLE_CONSUMER_FAIL; - goto close_sock; - } - break; + /* Send relayd socket to consumer. 
*/ + ret = consumer_send_relayd_socket(consumer_fd, sock, + consumer, relayd_uri->stype); + if (ret < 0) { + ret = LTTCOMM_ENABLE_CONSUMER_FAIL; + goto close_sock; } ret = LTTCOMM_OK; @@ -2031,6 +2125,8 @@ static int setup_relayd(struct ltt_session *session) int ret = LTTCOMM_OK; struct ltt_ust_session *usess; struct ltt_kernel_session *ksess; + struct consumer_socket *socket; + struct lttng_ht_iter iter; assert(session); @@ -2039,34 +2135,37 @@ static int setup_relayd(struct ltt_session *session) DBG2("Setting relayd for session %s", session->name); - if (usess && usess->consumer->sock == -1 && - usess->consumer->type == CONSUMER_DST_NET && + if (usess && usess->consumer->type == CONSUMER_DST_NET && usess->consumer->enabled) { - /* Setup relayd for 64 bits consumer */ - if (ust_consumerd64_fd >= 0) { + /* For each consumer socket, send relayd sockets */ + cds_lfht_for_each_entry(usess->consumer->socks->ht, &iter.iter, + socket, node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); send_sockets_relayd_consumer(LTTNG_DOMAIN_UST, session, - usess->consumer, ust_consumerd64_fd); + usess->consumer, socket->fd); + pthread_mutex_unlock(socket->lock); if (ret != LTTCOMM_OK) { goto error; } } - - /* Setup relayd for 32 bits consumer */ - if (ust_consumerd32_fd >= 0) { - send_sockets_relayd_consumer(LTTNG_DOMAIN_UST, session, - usess->consumer, ust_consumerd32_fd); + } else if (ksess && ksess->consumer->type == CONSUMER_DST_NET && + ksess->consumer->enabled) { + cds_lfht_for_each_entry(ksess->consumer->socks->ht, &iter.iter, + socket, node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); + send_sockets_relayd_consumer(LTTNG_DOMAIN_KERNEL, session, + ksess->consumer, socket->fd); + pthread_mutex_unlock(socket->lock); if (ret != LTTCOMM_OK) { goto error; } } - } else if (ksess && ksess->consumer->sock == -1 && - ksess->consumer->type == CONSUMER_DST_NET && - ksess->consumer->enabled) { - send_sockets_relayd_consumer(LTTNG_DOMAIN_KERNEL, session, - ksess->consumer, ksess->consumer_fd); - if (ret != LTTCOMM_OK) { - goto error; - } } error: @@ -2196,11 +2295,6 @@ static int create_kernel_session(struct ltt_session *session) goto error; } - /* Set kernel consumer socket fd */ - if (kconsumer_data.cmd_sock >= 0) { - session->kernel_session->consumer_fd = kconsumer_data.cmd_sock; - } - /* Copy session output to the newly created Kernel session */ ret = copy_session_consumer(LTTNG_DOMAIN_KERNEL, session); if (ret != LTTCOMM_OK) { @@ -3442,6 +3536,9 @@ static int cmd_destroy_session(struct ltt_session *session, char *name) { int ret; + /* Safety net */ + assert(session); + /* Clean kernel session teardown */ teardown_kernel_session(session); /* UST session teardown */ @@ -3511,6 +3608,7 @@ static int cmd_register_consumer(struct ltt_session *session, int domain, char *sock_path) { int ret, sock; + struct consumer_socket *socket; switch (domain) { case LTTNG_DOMAIN_KERNEL: @@ -3526,7 +3624,29 @@ static int cmd_register_consumer(struct ltt_session *session, int domain, goto error; } - session->kernel_session->consumer_fd = sock; + socket = consumer_allocate_socket(sock); + if (socket == NULL) { + ret = LTTCOMM_FATAL; + close(sock); + goto error; + } + + socket->lock = zmalloc(sizeof(pthread_mutex_t)); + if (socket->lock == NULL) { + PERROR("zmalloc pthread mutex"); + ret = LTTCOMM_FATAL; + goto error; + } + pthread_mutex_init(socket->lock, NULL); + + rcu_read_lock(); + consumer_add_socket(socket, 
session->kernel_session->consumer); + rcu_read_unlock(); + + pthread_mutex_lock(&kconsumer_data.pid_mutex); + kconsumer_data.pid = -1; + pthread_mutex_unlock(&kconsumer_data.pid_mutex); + break; default: /* TODO: Userspace tracing */ @@ -3682,6 +3802,10 @@ static int cmd_set_consumer_uri(int domain, struct ltt_session *session, switch (domain) { case LTTNG_DOMAIN_KERNEL: + { + struct lttng_ht_iter iter; + struct consumer_socket *socket; + /* Code flow error if we don't have a kernel session here. */ assert(ksess); @@ -3715,11 +3839,20 @@ static int cmd_set_consumer_uri(int domain, struct ltt_session *session, sizeof(consumer->subdir)); } - ret = send_socket_relayd_consumer(domain, session, uri, consumer, - ksess->consumer_fd); - if (ret != LTTCOMM_OK) { - goto error; + cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, + socket, node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); + ret = send_socket_relayd_consumer(domain, session, uri, consumer, + socket->fd); + pthread_mutex_unlock(socket->lock); + if (ret != LTTCOMM_OK) { + goto error; + } } + break; case LTTNG_DST_PATH: DBG2("Setting trace directory path from URI to %s", uri->dst.path); @@ -3735,6 +3868,7 @@ static int cmd_set_consumer_uri(int domain, struct ltt_session *session, /* All good! */ break; + } case LTTNG_DOMAIN_UST: /* Code flow error if we don't have a kernel session here. */ assert(usess); @@ -3755,6 +3889,8 @@ static int cmd_set_consumer_uri(int domain, struct ltt_session *session, case LTTNG_DST_IPV4: case LTTNG_DST_IPV6: { + struct consumer_socket *socket; + DBG2("Setting network URI for UST session %s", session->name); /* Set URI into consumer object */ @@ -3770,22 +3906,31 @@ static int cmd_set_consumer_uri(int domain, struct ltt_session *session, sizeof(consumer->subdir)); } - if (ust_consumerd64_fd >= 0) { + rcu_read_lock(); + socket = consumer_find_socket(uatomic_read(&ust_consumerd64_fd), + consumer); + if (socket != NULL) { + pthread_mutex_lock(socket->lock); ret = send_socket_relayd_consumer(domain, session, uri, - consumer, ust_consumerd64_fd); + consumer, socket->fd); + pthread_mutex_unlock(socket->lock); if (ret != LTTCOMM_OK) { goto error; } } - if (ust_consumerd32_fd >= 0) { + socket = consumer_find_socket(uatomic_read(&ust_consumerd32_fd), + consumer); + if (socket != NULL) { + pthread_mutex_lock(socket->lock); ret = send_socket_relayd_consumer(domain, session, uri, - consumer, ust_consumerd32_fd); + consumer, socket->fd); + pthread_mutex_unlock(socket->lock); if (ret != LTTCOMM_OK) { goto error; } } - + rcu_read_unlock(); break; } case LTTNG_DST_PATH: @@ -4171,6 +4316,8 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, /* Need a session for kernel command */ if (need_tracing_session) { + struct consumer_socket *socket; + if (cmd_ctx->session->kernel_session == NULL) { ret = create_kernel_session(cmd_ctx->session); if (ret < 0) { @@ -4190,13 +4337,29 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, goto error; } uatomic_set(&kernel_consumerd_state, CONSUMER_STARTED); - - /* Set consumer fd of the session */ - cmd_ctx->session->kernel_session->consumer_fd = - kconsumer_data.cmd_sock; } else { pthread_mutex_unlock(&kconsumer_data.pid_mutex); } + + /* Set kernel consumer socket fd */ + if (kconsumer_data.cmd_sock >= 0) { + rcu_read_lock(); + socket = consumer_find_socket(kconsumer_data.cmd_sock, + cmd_ctx->session->kernel_session->consumer); + rcu_read_unlock(); + if (socket == NULL) { + socket = 
consumer_allocate_socket(kconsumer_data.cmd_sock);
+				if (socket == NULL) {
+					goto error;
+				}
+
+				socket->lock = &kconsumer_data.lock;
+				rcu_read_lock();
+				consumer_add_socket(socket,
+						cmd_ctx->session->kernel_session->consumer);
+				rcu_read_unlock();
+			}
+		}
 	}
 
 	break;
@@ -4209,6 +4372,8 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock,
 	}
 
 	if (need_tracing_session) {
+		struct consumer_socket *socket;
+
 		if (cmd_ctx->session->ust_session == NULL) {
 			ret = create_ust_session(cmd_ctx->session,
 					&cmd_ctx->lsm->domain);
@@ -4227,15 +4392,40 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock,
 			ret = start_consumerd(&ustconsumer64_data);
 			if (ret < 0) {
 				ret = LTTCOMM_UST_CONSUMER64_FAIL;
-				ust_consumerd64_fd = -EINVAL;
+				uatomic_set(&ust_consumerd64_fd, -EINVAL);
 				goto error;
 			}
 
-			ust_consumerd64_fd = ustconsumer64_data.cmd_sock;
+			uatomic_set(&ust_consumerd64_fd, ustconsumer64_data.cmd_sock);
 			uatomic_set(&ust_consumerd_state, CONSUMER_STARTED);
 		} else {
 			pthread_mutex_unlock(&ustconsumer64_data.pid_mutex);
 		}
+
+		/*
+		 * Setup socket for consumer 64 bit. No need for atomic access
+		 * since it was set above and can ONLY be set in this thread.
+		 */
+		if (ust_consumerd64_fd >= 0) {
+			rcu_read_lock();
+			socket = consumer_find_socket(uatomic_read(&ust_consumerd64_fd),
+					cmd_ctx->session->ust_session->consumer);
+			rcu_read_unlock();
+			if (socket == NULL) {
+				socket = consumer_allocate_socket(ust_consumerd64_fd);
+				if (socket == NULL) {
+					goto error;
+				}
+				socket->lock = &ustconsumer64_data.lock;
+
+				rcu_read_lock();
+				consumer_add_socket(socket,
+						cmd_ctx->session->ust_session->consumer);
+				rcu_read_unlock();
+			}
+			DBG3("UST consumer 64 bit socket set to %d", socket->fd);
+		}
+
 		/* 32-bit */
 		if (consumerd32_bin[0] != '\0' &&
 				ustconsumer32_data.pid == 0 &&
@@ -4244,15 +4434,39 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock,
 			ret = start_consumerd(&ustconsumer32_data);
 			if (ret < 0) {
 				ret = LTTCOMM_UST_CONSUMER32_FAIL;
-				ust_consumerd32_fd = -EINVAL;
+				uatomic_set(&ust_consumerd32_fd, -EINVAL);
 				goto error;
 			}
 
-			ust_consumerd32_fd = ustconsumer32_data.cmd_sock;
+			uatomic_set(&ust_consumerd32_fd, ustconsumer32_data.cmd_sock);
 			uatomic_set(&ust_consumerd_state, CONSUMER_STARTED);
 		} else {
 			pthread_mutex_unlock(&ustconsumer32_data.pid_mutex);
 		}
+
+		/*
+		 * Setup socket for consumer 32 bit. No need for atomic access
+		 * since it was set above and can ONLY be set in this thread.
+		 */
+		if (ust_consumerd32_fd >= 0) {
+			rcu_read_lock();
+			socket = consumer_find_socket(uatomic_read(&ust_consumerd32_fd),
+					cmd_ctx->session->ust_session->consumer);
+			rcu_read_unlock();
+			if (socket == NULL) {
+				socket = consumer_allocate_socket(ust_consumerd32_fd);
+				if (socket == NULL) {
+					goto error;
+				}
+				socket->lock = &ustconsumer32_data.lock;
+
+				rcu_read_lock();
+				consumer_add_socket(socket,
+						cmd_ctx->session->ust_session->consumer);
+				rcu_read_unlock();
+			}
+			DBG3("UST consumer 32 bit socket set to %d", socket->fd);
+		}
 	}
 	break;
 }
@@ -4638,7 +4852,7 @@ init_setup_error:
  */
 static void *thread_manage_health(void *data)
 {
-	int sock = -1, new_sock, ret, i, pollfd;
+	int sock = -1, new_sock, ret, i, pollfd, err = -1;
 	uint32_t revents, nb_fd;
 	struct lttng_poll_event events;
 	struct lttcomm_health_msg msg;
@@ -4702,7 +4916,8 @@ restart:
 			/* Thread quit pipe has been closed. Killing thread.
*/ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { - goto error; + err = 0; + goto exit; } /* Event on the registration socket */ @@ -4737,6 +4952,9 @@ restart: case LTTNG_HEALTH_CMD: reply.ret_code = health_check_state(&health_thread_cmd); break; + case LTTNG_HEALTH_APP_MANAGE: + reply.ret_code = health_check_state(&health_thread_app_manage); + break; case LTTNG_HEALTH_APP_REG: reply.ret_code = health_check_state(&health_thread_app_reg); break; @@ -4747,13 +4965,12 @@ restart: reply.ret_code = check_consumer_health(); break; case LTTNG_HEALTH_ALL: - ret = check_consumer_health(); - reply.ret_code = - health_check_state(&health_thread_app_reg) & - health_check_state(&health_thread_cmd) & - health_check_state(&health_thread_kernel) & - ret; + health_check_state(&health_thread_app_manage) && + health_check_state(&health_thread_app_reg) && + health_check_state(&health_thread_cmd) && + health_check_state(&health_thread_kernel) && + check_consumer_health(); break; default: reply.ret_code = LTTCOMM_UND; @@ -4785,7 +5002,11 @@ restart: new_sock = -1; } +exit: error: + if (err) { + ERR("Health error occurred in %s", __func__); + } DBG("Health check thread dying"); unlink(health_unix_sock_path); if (sock >= 0) { @@ -4813,7 +5034,7 @@ error: */ static void *thread_manage_clients(void *data) { - int sock = -1, ret, i, pollfd; + int sock = -1, ret, i, pollfd, err = -1; int sock_error; uint32_t revents, nb_fd; struct command_ctx *cmd_ctx = NULL; @@ -4884,7 +5105,8 @@ static void *thread_manage_clients(void *data) /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { - goto error; + err = 0; + goto exit; } /* Event on the registration socket */ @@ -5004,8 +5226,13 @@ static void *thread_manage_clients(void *data) health_code_update(&health_thread_cmd); } +exit: error: - health_reset(&health_thread_cmd); + if (err) { + health_error(&health_thread_cmd); + ERR("Health error occurred in %s", __func__); + } + health_exit(&health_thread_cmd); DBG("Client thread dying"); unlink(client_unix_sock_path); @@ -5751,6 +5978,7 @@ int main(int argc, char **argv) /* Init all health thread counters. */ health_init(&health_thread_cmd); health_init(&health_thread_kernel); + health_init(&health_thread_app_manage); health_init(&health_thread_app_reg); /*
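
The core pattern this patch introduces, replacing the single consumer fd with a hash table of consumer sockets, shows up at every send site: walk the output's socket table under RCU and take the per-socket mutex around each send. Below is a condensed sketch of that loop. It reuses the patch's types (struct consumer_output, struct consumer_socket, struct lttng_ht_iter), but broadcast_command() and send_one_command() are hypothetical names, not functions from this diff.

/*
 * Sketch only: the iterate-and-send-under-lock pattern used throughout
 * this patch. Builds against liburcu plus the sessiond header that
 * defines consumer_output/consumer_socket; send_one_command() is a
 * hypothetical helper standing in for kernel_consumer_send_session(),
 * consumer_send_destroy_relayd(), etc.
 */
#include <assert.h>
#include <pthread.h>
#include <urcu.h>

#include "consumer.h"	/* struct consumer_output, struct consumer_socket */

extern int send_one_command(int fd);	/* hypothetical */

static int broadcast_command(struct consumer_output *output)
{
	int ret = 0;
	struct lttng_ht_iter iter;
	struct consumer_socket *socket;

	/* cds_lfht iteration must happen inside a read-side critical section. */
	rcu_read_lock();
	cds_lfht_for_each_entry(output->socks->ht, &iter.iter, socket,
			node.node) {
		/* Code flow error: every registered socket has a valid fd. */
		assert(socket->fd >= 0);

		/*
		 * Serialize commands on this socket. Its lock either aliases the
		 * owning consumer_data's lock or is allocated per socket, as in
		 * cmd_register_consumer().
		 */
		pthread_mutex_lock(socket->lock);
		ret = send_one_command(socket->fd);
		pthread_mutex_unlock(socket->lock);
		if (ret < 0) {
			goto end;
		}
	}
end:
	rcu_read_unlock();
	return ret;
}

init_kernel_tracing(), setup_relayd(), update_kernel_stream() and the teardown paths in this diff all follow this shape; the sketch holds the RCU read lock across the whole walk, which is the conservative reading of the cds_lfht iteration contract (some call sites above only wrap individual del/destroy operations).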
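
The dispatch_thread_exit flag is now read and written through liburcu's CMM_LOAD_SHARED()/CMM_STORE_SHARED(), which give the flag ACCESS_ONCE-style semantics so the dispatch loop reliably observes the store made by stop_threads(). A minimal, self-contained sketch of the pairing (helper names are illustrative):

#include <urcu/system.h>	/* CMM_LOAD_SHARED(), CMM_STORE_SHARED() */

static int dispatch_thread_exit;	/* Shared between threads */

/* Writer side, as in stop_threads(): publish the exit request. */
static void request_dispatch_exit(void)
{
	CMM_STORE_SHARED(dispatch_thread_exit, 1);
}

/* Reader side, as in the dispatch loop's while condition. */
static int dispatch_exit_requested(void)
{
	return CMM_LOAD_SHARED(dispatch_thread_exit);
}

In the patch, the store is immediately followed by futex_nto1_wake(&ust_cmd_queue.futex), so a dispatch thread blocked on the command-queue futex wakes up and re-reads the flag.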
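
Every thread converted in this patch follows the same shutdown discipline: err starts at -1, only a quit-pipe event clears it to 0, and the common cleanup path turns any other exit into health_error() before the final health_exit(). A schematic of that control flow; quit_requested() and do_work() are stand-in stubs, and the health_*()/ERR() calls assume the sessiond's health.h and error.h:

#include <stddef.h>

#include "health.h"	/* struct health_state, health_init(), health_error(), health_exit() */
#include "error.h"	/* ERR() */

static struct health_state my_health_state;	/* normally one global per thread */

extern int quit_requested(void);	/* stand-in for check_thread_quit_pipe() */
extern int do_work(void);		/* stand-in for the poll/dispatch body */

static void *thread_worker(void *data)
{
	int err = -1;	/* Pessimistic: any early exit counts as an error. */

	health_init(&my_health_state);

	while (1) {
		if (quit_requested()) {
			err = 0;	/* Clean shutdown: not a health error. */
			goto exit;
		}
		if (do_work() < 0) {
			goto error;	/* err stays -1. */
		}
	}

exit:
error:
	if (err) {
		health_error(&my_health_state);
		ERR("Health error occurred in %s", __func__);
	}
	health_exit(&my_health_state);
	return NULL;
}

The same truth-value semantics are why check_consumer_health() and the LTTNG_HEALTH_ALL reply now combine health_check_state() results with && instead of bitwise &: each check returns a boolean health verdict, not a bit mask.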