X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fmain.c;h=d0b1e17e544ef21b110381ea266f5037172e80f0;hp=a15721d76ae529b0fa45bfcfebec8b481931bdee;hb=8d3113b21fe5ed84af58fbfdff9ad7742db507d5;hpb=00e2e675d54dc726a7c8f8887c889cc8ef022003 diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index a15721d76..d0b1e17e5 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -44,6 +44,7 @@ #include #include #include +#include #include "lttng-sessiond.h" #include "channel.h" @@ -58,6 +59,8 @@ #include "ust-consumer.h" #include "utils.h" #include "fd-limit.h" +#include "filter.h" +#include "health.h" #define CONSUMERD_FILE "lttng-consumerd" @@ -84,6 +87,8 @@ static struct consumer_data kconsumer_data = { .cmd_unix_sock_path = DEFAULT_KCONSUMERD_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .pid_mutex = PTHREAD_MUTEX_INITIALIZER, + .lock = PTHREAD_MUTEX_INITIALIZER, }; static struct consumer_data ustconsumer64_data = { .type = LTTNG_CONSUMER64_UST, @@ -91,6 +96,8 @@ static struct consumer_data ustconsumer64_data = { .cmd_unix_sock_path = DEFAULT_USTCONSUMERD64_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .pid_mutex = PTHREAD_MUTEX_INITIALIZER, + .lock = PTHREAD_MUTEX_INITIALIZER, }; static struct consumer_data ustconsumer32_data = { .type = LTTNG_CONSUMER32_UST, @@ -98,8 +105,11 @@ static struct consumer_data ustconsumer32_data = { .cmd_unix_sock_path = DEFAULT_USTCONSUMERD32_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .pid_mutex = PTHREAD_MUTEX_INITIALIZER, + .lock = PTHREAD_MUTEX_INITIALIZER, }; +/* Shared between threads */ static int dispatch_thread_exit; /* Global application Unix socket path */ @@ -108,6 +118,8 @@ static char apps_unix_sock_path[PATH_MAX]; static char client_unix_sock_path[PATH_MAX]; /* global wait shm path for UST */ static char wait_shm_path[PATH_MAX]; +/* Global health check unix path */ +static char health_unix_sock_path[PATH_MAX]; /* Sockets and FDs */ static int client_sock = -1; @@ -133,6 +145,7 @@ static pthread_t reg_apps_thread; static pthread_t client_thread; static pthread_t kernel_thread; static pthread_t dispatch_thread; +static pthread_t health_thread; /* * UST registration command queue. This queue is tied with a futex and uses a N @@ -207,6 +220,12 @@ static enum consumerd_state kernel_consumerd_state; */ static unsigned int relayd_net_seq_idx; +/* Used for the health monitoring of the session daemon. See health.h */ +struct health_state health_thread_cmd; +struct health_state health_thread_app_manage; +struct health_state health_thread_app_reg; +struct health_state health_thread_kernel; + static void setup_consumerd_path(void) { @@ -352,23 +371,56 @@ error: */ static void teardown_kernel_session(struct ltt_session *session) { + int ret; + struct lttng_ht_iter iter; + struct ltt_kernel_session *ksess; + struct consumer_socket *socket; + if (!session->kernel_session) { DBG3("No kernel session when tearing down session"); return; } + ksess = session->kernel_session; + DBG("Tearing down kernel session"); + /* + * Destroy relayd associated with the session consumer. This action is + * valid since in order to destroy a session we must acquire the session + * lock. This means that there CAN NOT be stream(s) being sent to a + * consumer since this action also requires the session lock at any time. + * + * At this point, we are sure that not streams data will be lost after this + * command is issued. 
+ */ + if (ksess->consumer && ksess->consumer->type == CONSUMER_DST_NET) { + cds_lfht_for_each_entry(ksess->consumer->socks->ht, &iter.iter, socket, + node.node) { + ret = consumer_send_destroy_relayd(socket, ksess->consumer); + if (ret < 0) { + ERR("Unable to send destroy relayd command to consumer"); + /* Continue since we MUST delete everything at this point. */ + } + } + } + /* * If a custom kernel consumer was registered, close the socket before * tearing down the complete kernel session structure */ - if (kconsumer_data.cmd_sock >= 0 && - session->kernel_session->consumer_fd != kconsumer_data.cmd_sock) { - lttcomm_close_unix_sock(session->kernel_session->consumer_fd); + cds_lfht_for_each_entry(ksess->consumer->socks->ht, &iter.iter, socket, + node.node) { + if (socket->fd != kconsumer_data.cmd_sock) { + rcu_read_lock(); + consumer_del_socket(socket, ksess->consumer); + lttcomm_close_unix_sock(socket->fd); + consumer_destroy_socket(socket); + rcu_read_unlock(); + } } - trace_kernel_destroy_session(session->kernel_session); + trace_kernel_destroy_session(ksess); } /* @@ -378,20 +430,44 @@ static void teardown_kernel_session(struct ltt_session *session) static void teardown_ust_session(struct ltt_session *session) { int ret; + struct lttng_ht_iter iter; + struct ltt_ust_session *usess; + struct consumer_socket *socket; if (!session->ust_session) { DBG3("No UST session when tearing down session"); return; } + usess = session->ust_session; DBG("Tearing down UST session(s)"); - ret = ust_app_destroy_trace_all(session->ust_session); + /* + * Destroy relayd associated with the session consumer. This action is + * valid since in order to destroy a session we must acquire the session + * lock. This means that there CAN NOT be stream(s) being sent to a + * consumer since this action also requires the session lock at any time. + * + * At this point, we are sure that no data will be lost after this command + * is issued. + */ + if (usess->consumer && usess->consumer->type == CONSUMER_DST_NET) { + cds_lfht_for_each_entry(usess->consumer->socks->ht, &iter.iter, socket, + node.node) { + ret = consumer_send_destroy_relayd(socket, usess->consumer); + if (ret < 0) { + ERR("Unable to send destroy relayd command to consumer"); + /* Continue since we MUST delete everything at this point. 
*/ + } + } + } + + ret = ust_app_destroy_trace_all(usess); if (ret) { ERR("Error in ust_app_destroy_trace_all"); } - trace_ust_destroy_session(session->ust_session); + trace_ust_destroy_session(usess); } /* @@ -409,7 +485,7 @@ static void stop_threads(void) } /* Dispatch thread */ - dispatch_thread_exit = 1; + CMM_STORE_SHARED(dispatch_thread_exit, 1); futex_nto1_wake(&ust_cmd_queue.futex); } @@ -455,8 +531,6 @@ static void cleanup(void) DBG("Closing all UST sockets"); ust_app_clean_list(); - pthread_mutex_destroy(&kconsumer_data.pid_mutex); - if (is_root && !opt_no_kernel) { DBG2("Closing kernel fd"); if (kernel_tracer_fd >= 0) { @@ -621,6 +695,7 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd) { int ret = 0; struct ltt_session *session; + struct ltt_kernel_session *ksess; struct ltt_kernel_channel *channel; DBG("Updating kernel streams for channel fd %d", fd); @@ -632,14 +707,9 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd) session_unlock(session); continue; } + ksess = session->kernel_session; - /* This is not suppose to be -1 but this is an extra security check */ - if (session->kernel_session->consumer_fd < 0) { - session->kernel_session->consumer_fd = consumer_data->cmd_sock; - } - - cds_list_for_each_entry(channel, - &session->kernel_session->channel_list.head, list) { + cds_list_for_each_entry(channel, &ksess->channel_list.head, list) { if (channel->fd == fd) { DBG("Channel found, updating kernel streams"); ret = kernel_open_channel_stream(channel); @@ -652,13 +722,23 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd) * that tracing is started so it is safe to send our updated * stream fds. */ - if (session->kernel_session->consumer_fds_sent == 1 && - session->kernel_session->consumer != NULL) { - ret = kernel_consumer_send_channel_stream( - session->kernel_session->consumer_fd, channel, - session->kernel_session); - if (ret < 0) { - goto error; + if (ksess->consumer_fds_sent == 1 && ksess->consumer != NULL) { + struct lttng_ht_iter iter; + struct consumer_socket *socket; + + + cds_lfht_for_each_entry(ksess->consumer->socks->ht, + &iter.iter, socket, node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); + ret = kernel_consumer_send_channel_stream(socket->fd, + channel, ksess); + pthread_mutex_unlock(socket->lock); + if (ret < 0) { + goto error; + } } } goto error; @@ -704,13 +784,15 @@ static void update_ust_app(int app_sock) */ static void *thread_manage_kernel(void *data) { - int ret, i, pollfd, update_poll_flag = 1; + int ret, i, pollfd, update_poll_flag = 1, err = -1; uint32_t revents, nb_fd; char tmp; struct lttng_poll_event events; DBG("Thread manage kernel started"); + health_code_update(&health_thread_kernel); + ret = create_thread_poll_set(&events, 2); if (ret < 0) { goto error_poll_create; @@ -722,6 +804,8 @@ static void *thread_manage_kernel(void *data) } while (1) { + health_code_update(&health_thread_kernel); + if (update_poll_flag == 1) { /* * Reset number of fd in the poll set. Always 2 since there is the thread @@ -745,7 +829,9 @@ static void *thread_manage_kernel(void *data) /* Poll infinite value of time */ restart: + health_poll_update(&health_thread_kernel); ret = lttng_poll_wait(&events, -1); + health_poll_update(&health_thread_kernel); if (ret < 0) { /* * Restart interrupted system call. 
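
The health_code_update() and health_poll_update() calls added throughout the threads above implement a simple liveness counter: each thread bumps its health_state counter whenever it makes progress and marks itself while parked in a blocking poll, and a checker (health_check_state() from health.h, used further down by check_consumer_health()) verifies that the counter keeps moving. The snippet below is only a minimal sketch of that pattern under assumed names; the real struct health_state lives in health.h and is not shown in this diff, and the sketch_* helpers and field layout are illustrative assumptions, not the patch's actual implementation.

    #include <urcu/uatomic.h>

    /*
     * Hypothetical sketch of the liveness-counter pattern behind the health
     * checks added in this patch.  The real struct health_state is defined
     * in health.h; this layout and the sketch_* names are assumptions.
     */
    struct health_state_sketch {
    	unsigned long last;	/* counter value seen at the previous check */
    	unsigned long current;	/* bumped each time the thread makes progress */
    	int in_poll;		/* non-zero while blocked in lttng_poll_wait() */
    };

    /* Thread side: record forward progress (cf. health_code_update()). */
    static inline void sketch_code_update(struct health_state_sketch *s)
    {
    	uatomic_inc(&s->current);
    }

    /* Thread side: toggle the "blocked in poll" marker, called right before
     * and right after the blocking wait (cf. health_poll_update()). */
    static inline void sketch_poll_update(struct health_state_sketch *s)
    {
    	uatomic_set(&s->in_poll, !uatomic_read(&s->in_poll));
    }

    /*
     * Checker side: a thread is considered healthy if its counter moved
     * since the last check or if it is legitimately parked in a blocking
     * poll (cf. health_check_state()).
     */
    static inline int sketch_check_state(struct health_state_sketch *s)
    {
    	unsigned long cur = uatomic_read(&s->current);
    	int alive = (cur != s->last) || uatomic_read(&s->in_poll);

    	s->last = cur;
    	return alive;
    }
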
@@ -766,10 +852,13 @@ static void *thread_manage_kernel(void *data) revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); + health_code_update(&health_thread_kernel); + /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { - goto error; + err = 0; + goto exit; } /* Check for data on kernel pipe */ @@ -797,9 +886,15 @@ static void *thread_manage_kernel(void *data) } } +exit: error: lttng_poll_clean(&events); error_poll_create: + if (err) { + health_error(&health_thread_kernel); + ERR("Health error occurred in %s", __func__); + } + health_exit(&health_thread_kernel); DBG("Kernel thread dying"); return NULL; } @@ -809,7 +904,7 @@ error_poll_create: */ static void *thread_manage_consumer(void *data) { - int sock = -1, i, ret, pollfd; + int sock = -1, i, ret, pollfd, err = -1; uint32_t revents, nb_fd; enum lttcomm_return_code code; struct lttng_poll_event events; @@ -817,6 +912,8 @@ static void *thread_manage_consumer(void *data) DBG("[thread] Manage consumer started"); + health_code_update(&consumer_data->health); + ret = lttcomm_listen_unix_sock(consumer_data->err_sock); if (ret < 0) { goto error_listen; @@ -838,9 +935,13 @@ static void *thread_manage_consumer(void *data) nb_fd = LTTNG_POLL_GETNB(&events); + health_code_update(&consumer_data->health); + /* Inifinite blocking call, waiting for transmission */ restart: + health_poll_update(&consumer_data->health); ret = lttng_poll_wait(&events, -1); + health_poll_update(&consumer_data->health); if (ret < 0) { /* * Restart interrupted system call. @@ -856,10 +957,13 @@ restart: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); + health_code_update(&consumer_data->health); + /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { - goto error; + err = 0; + goto exit; } /* Event on the registration socket */ @@ -876,6 +980,8 @@ restart: goto error; } + health_code_update(&consumer_data->health); + DBG2("Receiving code from consumer err_sock"); /* Getting status code from kconsumerd */ @@ -885,6 +991,8 @@ restart: goto error; } + health_code_update(&consumer_data->health); + if (code == CONSUMERD_COMMAND_SOCK_READY) { consumer_data->cmd_sock = lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path); @@ -913,12 +1021,16 @@ restart: goto error; } + health_code_update(&consumer_data->health); + /* Update number of fd */ nb_fd = LTTNG_POLL_GETNB(&events); /* Inifinite blocking call, waiting for transmission */ restart_poll: + health_poll_update(&consumer_data->health); ret = lttng_poll_wait(&events, -1); + health_poll_update(&consumer_data->health); if (ret < 0) { /* * Restart interrupted system call. @@ -934,10 +1046,13 @@ restart_poll: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); + health_code_update(&consumer_data->health); + /* Thread quit pipe has been closed. Killing thread. 
*/ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { - goto error; + err = 0; + goto exit; } /* Event on the kconsumerd socket */ @@ -949,6 +1064,8 @@ restart_poll: } } + health_code_update(&consumer_data->health); + /* Wait for any kconsumerd error */ ret = lttcomm_recv_unix_sock(sock, &code, sizeof(enum lttcomm_return_code)); @@ -959,6 +1076,7 @@ restart_poll: ERR("consumer return code : %s", lttcomm_get_readable_code(-code)); +exit: error: /* Immediately set the consumerd state to stopped */ if (consumer_data->type == LTTNG_CONSUMER_KERNEL) { @@ -997,6 +1115,11 @@ error: lttng_poll_clean(&events); error_poll: error_listen: + if (err) { + health_error(&consumer_data->health); + ERR("Health error occurred in %s", __func__); + } + health_exit(&consumer_data->health); DBG("consumer thread cleanup completed"); return NULL; @@ -1007,7 +1130,7 @@ error_listen: */ static void *thread_manage_apps(void *data) { - int i, ret, pollfd; + int i, ret, pollfd, err = -1; uint32_t revents, nb_fd; struct ust_command ust_cmd; struct lttng_poll_event events; @@ -1017,6 +1140,8 @@ static void *thread_manage_apps(void *data) rcu_register_thread(); rcu_thread_online(); + health_code_update(&health_thread_app_manage); + ret = create_thread_poll_set(&events, 2); if (ret < 0) { goto error_poll_create; @@ -1027,6 +1152,8 @@ static void *thread_manage_apps(void *data) goto error; } + health_code_update(&health_thread_app_manage); + while (1) { /* Zeroed the events structure */ lttng_poll_reset(&events); @@ -1037,7 +1164,9 @@ static void *thread_manage_apps(void *data) /* Inifinite blocking call, waiting for transmission */ restart: + health_poll_update(&health_thread_app_manage); ret = lttng_poll_wait(&events, -1); + health_poll_update(&health_thread_app_manage); if (ret < 0) { /* * Restart interrupted system call. @@ -1053,10 +1182,13 @@ static void *thread_manage_apps(void *data) revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); + health_code_update(&health_thread_app_manage); + /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { - goto error; + err = 0; + goto exit; } /* Inspect the apps cmd pipe */ @@ -1072,6 +1204,8 @@ static void *thread_manage_apps(void *data) goto error; } + health_code_update(&health_thread_app_manage); + /* Register applicaton to the session daemon */ ret = ust_app_register(&ust_cmd.reg_msg, ust_cmd.sock); @@ -1081,6 +1215,8 @@ static void *thread_manage_apps(void *data) break; } + health_code_update(&health_thread_app_manage); + /* * Validate UST version compatibility. 
*/ @@ -1093,6 +1229,8 @@ static void *thread_manage_apps(void *data) update_ust_app(ust_cmd.sock); } + health_code_update(&health_thread_app_manage); + ret = ust_app_register_done(ust_cmd.sock); if (ret < 0) { /* @@ -1116,6 +1254,8 @@ static void *thread_manage_apps(void *data) ust_cmd.sock); } + health_code_update(&health_thread_app_manage); + break; } } else { @@ -1135,12 +1275,20 @@ static void *thread_manage_apps(void *data) break; } } + + health_code_update(&health_thread_app_manage); } } +exit: error: lttng_poll_clean(&events); error_poll_create: + if (err) { + health_error(&health_thread_app_manage); + ERR("Health error occurred in %s", __func__); + } + health_exit(&health_thread_app_manage); DBG("Application communication apps thread cleanup complete"); rcu_thread_offline(); rcu_unregister_thread(); @@ -1159,7 +1307,7 @@ static void *thread_dispatch_ust_registration(void *data) DBG("[thread] Dispatch UST command started"); - while (!dispatch_thread_exit) { + while (!CMM_LOAD_SHARED(dispatch_thread_exit)) { /* Atomically prepare the queue futex */ futex_nto1_prepare(&ust_cmd_queue.futex); @@ -1216,7 +1364,7 @@ error: */ static void *thread_registration_apps(void *data) { - int sock = -1, i, ret, pollfd; + int sock = -1, i, ret, pollfd, err = -1; uint32_t revents, nb_fd; struct lttng_poll_event events; /* @@ -1262,7 +1410,9 @@ static void *thread_registration_apps(void *data) /* Inifinite blocking call, waiting for transmission */ restart: + health_poll_update(&health_thread_app_reg); ret = lttng_poll_wait(&events, -1); + health_poll_update(&health_thread_app_reg); if (ret < 0) { /* * Restart interrupted system call. @@ -1274,6 +1424,8 @@ static void *thread_registration_apps(void *data) } for (i = 0; i < nb_fd; i++) { + health_code_update(&health_thread_app_reg); + /* Fetch once the poll data */ revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); @@ -1281,7 +1433,8 @@ static void *thread_registration_apps(void *data) /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { - goto error; + err = 0; + goto exit; } /* Event on the registration socket */ @@ -1317,6 +1470,7 @@ static void *thread_registration_apps(void *data) sock = -1; continue; } + health_code_update(&health_thread_app_reg); ret = lttcomm_recv_unix_sock(sock, &ust_cmd->reg_msg, sizeof(struct ust_register_msg)); if (ret < 0 || ret < sizeof(struct ust_register_msg)) { @@ -1334,6 +1488,7 @@ static void *thread_registration_apps(void *data) sock = -1; continue; } + health_code_update(&health_thread_app_reg); ust_cmd->sock = sock; sock = -1; @@ -1361,7 +1516,14 @@ static void *thread_registration_apps(void *data) } } +exit: error: + if (err) { + health_error(&health_thread_app_reg); + ERR("Health error occurred in %s", __func__); + } + health_exit(&health_thread_app_reg); + /* Notify that the registration thread is gone */ notify_ust_apps(0); @@ -1467,7 +1629,8 @@ static int join_consumer_thread(struct consumer_data *consumer_data) void *status; int ret; - if (consumer_data->pid != 0) { + /* Consumer pid must be a real one. */ + if (consumer_data->pid > 0) { ret = kill(consumer_data->pid, SIGTERM); if (ret) { ERR("Error killing consumer daemon"); @@ -1680,6 +1843,23 @@ error: return ret; } +/* + * Compute health status of each consumer. If one of them is zero (bad + * state), we return 0. 
+ */ +static int check_consumer_health(void) +{ + int ret; + + ret = health_check_state(&kconsumer_data.health) && + health_check_state(&ustconsumer32_data.health) && + health_check_state(&ustconsumer64_data.health); + + DBG3("Health consumer check %d", ret); + + return ret; +} + /* * Check version of the lttng-modules. */ @@ -1757,21 +1937,24 @@ error: static int init_kernel_tracing(struct ltt_kernel_session *session) { int ret = 0; + struct lttng_ht_iter iter; + struct consumer_socket *socket; - if (session->consumer_fds_sent == 0 && session->consumer != NULL) { - /* - * Assign default kernel consumer socket if no consumer assigned to the - * kernel session. At this point, it's NOT supposed to be -1 but this is - * an extra security check. - */ - if (session->consumer_fd < 0) { - session->consumer_fd = kconsumer_data.cmd_sock; - } + assert(session); - ret = kernel_consumer_send_session(session->consumer_fd, session); - if (ret < 0) { - ret = LTTCOMM_KERN_CONSUMER_FAIL; - goto error; + if (session->consumer_fds_sent == 0 && session->consumer != NULL) { + cds_lfht_for_each_entry(session->consumer->socks->ht, &iter.iter, + socket, node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); + ret = kernel_consumer_send_session(socket->fd, session); + pthread_mutex_unlock(socket->lock); + if (ret < 0) { + ret = LTTCOMM_KERN_CONSUMER_FAIL; + goto error; + } } } @@ -1880,25 +2063,12 @@ static int send_socket_relayd_consumer(int domain, struct ltt_session *session, session->net_handle = 1; } - switch (domain) { - case LTTNG_DOMAIN_KERNEL: - /* Send relayd socket to consumer. */ - ret = kernel_consumer_send_relayd_socket(consumer_fd, sock, - consumer, relayd_uri->stype); - if (ret < 0) { - ret = LTTCOMM_ENABLE_CONSUMER_FAIL; - goto close_sock; - } - break; - case LTTNG_DOMAIN_UST: - /* Send relayd socket to consumer. */ - ret = ust_consumer_send_relayd_socket(consumer_fd, sock, - consumer, relayd_uri->stype); - if (ret < 0) { - ret = LTTCOMM_ENABLE_CONSUMER_FAIL; - goto close_sock; - } - break; + /* Send relayd socket to consumer. */ + ret = consumer_send_relayd_socket(consumer_fd, sock, + consumer, relayd_uri->stype); + if (ret < 0) { + ret = LTTCOMM_ENABLE_CONSUMER_FAIL; + goto close_sock; } ret = LTTCOMM_OK; @@ -1927,6 +2097,15 @@ static int send_sockets_relayd_consumer(int domain, { int ret; + assert(session); + assert(consumer); + + /* Don't resend the sockets to the consumer. */ + if (consumer->dst.net.relayd_socks_sent) { + ret = LTTCOMM_OK; + goto error; + } + /* Sending control relayd socket. */ ret = send_socket_relayd_consumer(domain, session, &consumer->dst.net.control, consumer, fd); @@ -1941,6 +2120,9 @@ static int send_sockets_relayd_consumer(int domain, goto error; } + /* Flag that all relayd sockets were sent to the consumer. 
*/ + consumer->dst.net.relayd_socks_sent = 1; + error: return ret; } @@ -1955,6 +2137,8 @@ static int setup_relayd(struct ltt_session *session) int ret = LTTCOMM_OK; struct ltt_ust_session *usess; struct ltt_kernel_session *ksess; + struct consumer_socket *socket; + struct lttng_ht_iter iter; assert(session); @@ -1963,40 +2147,93 @@ static int setup_relayd(struct ltt_session *session) DBG2("Setting relayd for session %s", session->name); - if (usess && usess->consumer->sock == -1 && - usess->consumer->type == CONSUMER_DST_NET && - usess->consumer->enabled) { - /* Setup relayd for 64 bits consumer */ - if (ust_consumerd64_fd >= 0) { + if (usess && usess->consumer && usess->consumer->type == CONSUMER_DST_NET + && usess->consumer->enabled) { + /* For each consumer socket, send relayd sockets */ + cds_lfht_for_each_entry(usess->consumer->socks->ht, &iter.iter, + socket, node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); send_sockets_relayd_consumer(LTTNG_DOMAIN_UST, session, - usess->consumer, ust_consumerd64_fd); + usess->consumer, socket->fd); + pthread_mutex_unlock(socket->lock); if (ret != LTTCOMM_OK) { goto error; } } + } - /* Setup relayd for 32 bits consumer */ - if (ust_consumerd32_fd >= 0) { - send_sockets_relayd_consumer(LTTNG_DOMAIN_UST, session, - usess->consumer, ust_consumerd32_fd); + if (ksess && ksess->consumer && ksess->consumer->type == CONSUMER_DST_NET + && ksess->consumer->enabled) { + cds_lfht_for_each_entry(ksess->consumer->socks->ht, &iter.iter, + socket, node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); + send_sockets_relayd_consumer(LTTNG_DOMAIN_KERNEL, session, + ksess->consumer, socket->fd); + pthread_mutex_unlock(socket->lock); if (ret != LTTCOMM_OK) { goto error; } } - } else if (ksess && ksess->consumer->sock == -1 && - ksess->consumer->type == CONSUMER_DST_NET && - ksess->consumer->enabled) { - send_sockets_relayd_consumer(LTTNG_DOMAIN_KERNEL, session, - ksess->consumer, ksess->consumer_fd); - if (ret != LTTCOMM_OK) { - goto error; - } } error: return ret; } +/* + * Set consumer subdirectory using the session name and a generated datetime if + * needed. This is appended to the current subdirectory. + */ +static int set_consumer_subdir(struct consumer_output *consumer, + const char *session_name) +{ + int ret = 0; + unsigned int have_default_name = 0; + char datetime[16], tmp_path[PATH_MAX]; + time_t rawtime; + struct tm *timeinfo; + + assert(consumer); + assert(session_name); + + memset(tmp_path, 0, sizeof(tmp_path)); + + /* Flag if we have a default session. */ + if (strncmp(session_name, DEFAULT_SESSION_NAME "-", + strlen(DEFAULT_SESSION_NAME) + 1) == 0) { + have_default_name = 1; + } else { + /* Get date and time for session path */ + time(&rawtime); + timeinfo = localtime(&rawtime); + strftime(datetime, sizeof(datetime), "%Y%m%d-%H%M%S", timeinfo); + } + + if (have_default_name) { + ret = snprintf(tmp_path, sizeof(tmp_path), + "%s/%s", consumer->subdir, session_name); + } else { + ret = snprintf(tmp_path, sizeof(tmp_path), + "%s/%s-%s/", consumer->subdir, session_name, datetime); + } + if (ret < 0) { + PERROR("snprintf session name date"); + goto error; + } + + strncpy(consumer->subdir, tmp_path, sizeof(consumer->subdir)); + DBG2("Consumer subdir set to %s", consumer->subdir); + +error: + return ret; +} + /* * Copy consumer output from the tracing session to the domain session. 
The * function also applies the right modification on a per domain basis for the @@ -2008,6 +2245,9 @@ static int copy_session_consumer(int domain, struct ltt_session *session) const char *dir_name; struct consumer_output *consumer; + assert(session); + assert(session->consumer); + switch (domain) { case LTTNG_DOMAIN_KERNEL: DBG3("Copying tracing session consumer output in kernel session"); @@ -2030,16 +2270,16 @@ static int copy_session_consumer(int domain, struct ltt_session *session) goto error; } + ret = set_consumer_subdir(session->consumer, session->name); + if (ret < 0) { + ret = LTTCOMM_FATAL; + goto error; + } + /* Append correct directory to subdir */ strncat(consumer->subdir, dir_name, sizeof(consumer->subdir)); DBG3("Copy session consumer subdir %s", consumer->subdir); - /* Add default trace directory name */ - if (consumer->type == CONSUMER_DST_LOCAL) { - strncat(consumer->dst.trace_path, dir_name, - sizeof(consumer->dst.trace_path)); - } - ret = LTTCOMM_OK; error: @@ -2056,6 +2296,7 @@ static int create_ust_session(struct ltt_session *session, struct ltt_ust_session *lus = NULL; assert(session); + assert(domain); assert(session->consumer); switch (domain->type) { @@ -2075,18 +2316,6 @@ static int create_ust_session(struct ltt_session *session, goto error; } - if (session->consumer->type == CONSUMER_DST_LOCAL) { - ret = run_as_mkdir_recursive(lus->pathname, S_IRWXU | S_IRWXG, - session->uid, session->gid); - if (ret < 0) { - if (ret != -EEXIST) { - ERR("Trace directory creation error"); - ret = LTTCOMM_UST_SESS_FAIL; - goto error; - } - } - } - lus->uid = session->uid; lus->gid = session->gid; session->ust_session = lus; @@ -2120,10 +2349,8 @@ static int create_kernel_session(struct ltt_session *session) goto error; } - /* Set kernel consumer socket fd */ - if (kconsumer_data.cmd_sock >= 0) { - session->kernel_session->consumer_fd = kconsumer_data.cmd_sock; - } + /* Code flow safety */ + assert(session->kernel_session); /* Copy session output to the newly created Kernel session */ ret = copy_session_consumer(LTTNG_DOMAIN_KERNEL, session); @@ -2132,7 +2359,8 @@ static int create_kernel_session(struct ltt_session *session) } /* Create directory(ies) on local filesystem. */ - if (session->consumer->type == CONSUMER_DST_LOCAL) { + if (session->kernel_session->consumer->type == CONSUMER_DST_LOCAL && + strlen(session->kernel_session->consumer->dst.trace_path) > 0) { ret = run_as_mkdir_recursive( session->kernel_session->consumer->dst.trace_path, S_IRWXU | S_IRWXG, session->uid, session->gid); @@ -2349,6 +2577,9 @@ static int list_lttng_ust_global_events(char *channel_name, tmp[i].loglevel_type = LTTNG_EVENT_LOGLEVEL_SINGLE; break; } + if (uevent->filter) { + tmp[i].filter = 1; + } i++; } @@ -2431,6 +2662,85 @@ error: return ret; } + +/* + * Add URI so the consumer output object. Set the correct path depending on the + * domain adding the default trace directory. + */ +static int add_uri_to_consumer(struct consumer_output *consumer, + struct lttng_uri *uri, int domain, const char *session_name) +{ + int ret = LTTCOMM_OK; + const char *default_trace_dir; + + assert(uri); + + if (consumer == NULL) { + DBG("No consumer detected. Don't add URI. 
Stopping."); + ret = LTTCOMM_NO_CONSUMER; + goto error; + } + + switch (domain) { + case LTTNG_DOMAIN_KERNEL: + default_trace_dir = DEFAULT_KERNEL_TRACE_DIR; + break; + case LTTNG_DOMAIN_UST: + default_trace_dir = DEFAULT_UST_TRACE_DIR; + break; + default: + /* + * This case is possible is we try to add the URI to the global tracing + * session consumer object which in this case there is no subdir. + */ + default_trace_dir = ""; + } + + switch (uri->dtype) { + case LTTNG_DST_IPV4: + case LTTNG_DST_IPV6: + DBG2("Setting network URI to consumer"); + + /* Set URI into consumer output object */ + ret = consumer_set_network_uri(consumer, uri); + if (ret < 0) { + ret = LTTCOMM_FATAL; + goto error; + } + + if (uri->stype == LTTNG_STREAM_CONTROL && strlen(uri->subdir) == 0) { + ret = set_consumer_subdir(consumer, session_name); + if (ret < 0) { + ret = LTTCOMM_FATAL; + goto error; + } + } + + if (uri->stype == LTTNG_STREAM_CONTROL) { + /* On a new subdir, reappend the default trace dir. */ + strncat(consumer->subdir, default_trace_dir, sizeof(consumer->subdir)); + DBG3("Append domain trace name to subdir %s", consumer->subdir); + } + + break; + case LTTNG_DST_PATH: + DBG2("Setting trace directory path from URI to %s", uri->dst.path); + memset(consumer->dst.trace_path, 0, + sizeof(consumer->dst.trace_path)); + strncpy(consumer->dst.trace_path, uri->dst.path, + sizeof(consumer->dst.trace_path)); + /* Append default trace dir */ + strncat(consumer->dst.trace_path, default_trace_dir, + sizeof(consumer->dst.trace_path)); + /* Flag consumer as local. */ + consumer->type = CONSUMER_DST_LOCAL; + break; + } + +error: + return ret; +} + /* * Command LTTNG_DISABLE_CHANNEL processed by the client thread. */ @@ -2732,38 +3042,78 @@ error: } /* - * Command LTTNG_ENABLE_EVENT processed by the client thread. + * Command LTTNG_SET_FILTER processed by the client thread. */ -static int cmd_enable_event(struct ltt_session *session, int domain, - char *channel_name, struct lttng_event *event) +static int cmd_set_filter(struct ltt_session *session, int domain, + char *channel_name, char *event_name, + struct lttng_filter_bytecode *bytecode) { int ret; - struct lttng_channel *attr; - struct ltt_ust_session *usess = session->ust_session; switch (domain) { case LTTNG_DOMAIN_KERNEL: + ret = LTTCOMM_FATAL; + break; + case LTTNG_DOMAIN_UST: { - struct ltt_kernel_channel *kchan; - - kchan = trace_kernel_get_channel_by_name(channel_name, - session->kernel_session); - if (kchan == NULL) { - attr = channel_new_default_attr(domain); - if (attr == NULL) { - ret = LTTCOMM_FATAL; - goto error; - } - snprintf(attr->name, NAME_MAX, "%s", channel_name); + struct ltt_ust_session *usess = session->ust_session; - /* This call will notify the kernel thread */ - ret = channel_kernel_create(session->kernel_session, - attr, kernel_poll_pipe[1]); - if (ret != LTTCOMM_OK) { - free(attr); - goto error; - } - free(attr); + ret = filter_ust_set(usess, domain, bytecode, event_name, channel_name); + if (ret != LTTCOMM_OK) { + goto error; + } + break; + } +#if 0 + case LTTNG_DOMAIN_UST_EXEC_NAME: + case LTTNG_DOMAIN_UST_PID: + case LTTNG_DOMAIN_UST_PID_FOLLOW_CHILDREN: +#endif + default: + ret = LTTCOMM_UND; + goto error; + } + + ret = LTTCOMM_OK; + +error: + return ret; + +} + +/* + * Command LTTNG_ENABLE_EVENT processed by the client thread. 
+ */ +static int cmd_enable_event(struct ltt_session *session, int domain, + char *channel_name, struct lttng_event *event) +{ + int ret; + struct lttng_channel *attr; + struct ltt_ust_session *usess = session->ust_session; + + switch (domain) { + case LTTNG_DOMAIN_KERNEL: + { + struct ltt_kernel_channel *kchan; + + kchan = trace_kernel_get_channel_by_name(channel_name, + session->kernel_session); + if (kchan == NULL) { + attr = channel_new_default_attr(domain); + if (attr == NULL) { + ret = LTTCOMM_FATAL; + goto error; + } + snprintf(attr->name, NAME_MAX, "%s", channel_name); + + /* This call will notify the kernel thread */ + ret = channel_kernel_create(session->kernel_session, + attr, kernel_poll_pipe[1]); + if (ret != LTTCOMM_OK) { + free(attr); + goto error; + } + free(attr); } /* Get the newly created kernel channel pointer */ @@ -3076,8 +3426,7 @@ static int cmd_start_trace(struct ltt_session *session) if (ksession != NULL) { /* Open kernel metadata */ if (ksession->metadata == NULL) { - ret = kernel_open_metadata(ksession, - ksession->consumer->dst.trace_path); + ret = kernel_open_metadata(ksession); if (ret < 0) { ret = LTTCOMM_KERN_META_FAIL; goto error; @@ -3209,110 +3558,207 @@ error: } /* - * Command LTTNG_CREATE_SESSION_URI processed by the client thread. + * Command LTTNG_SET_CONSUMER_URI processed by the client thread. */ -static int cmd_create_session_uri(char *name, struct lttng_uri *ctrl_uri, - struct lttng_uri *data_uri, unsigned int enable_consumer, - lttng_sock_cred *creds) +static int cmd_set_consumer_uri(int domain, struct ltt_session *session, + size_t nb_uri, struct lttng_uri *uris) { - int ret; - char *path = NULL; - struct ltt_session *session; - struct consumer_output *consumer; + int ret, i; + struct ltt_kernel_session *ksess = session->kernel_session; + struct ltt_ust_session *usess = session->ust_session; + struct consumer_output *consumer = NULL; - /* Verify if the session already exist */ - session = session_find_by_name(name); - if (session != NULL) { - ret = LTTCOMM_EXIST_SESS; + assert(session); + assert(uris); + assert(nb_uri > 0); + + /* Can't enable consumer after session started. */ + if (session->enabled) { + ret = LTTCOMM_TRACE_ALREADY_STARTED; goto error; } - /* TODO: validate URIs */ - - /* Create default consumer output */ - consumer = consumer_create_output(CONSUMER_DST_LOCAL); - if (consumer == NULL) { - ret = LTTCOMM_FATAL; + if (!session->start_consumer) { + ret = LTTCOMM_NO_CONSUMER; goto error; } - strncpy(consumer->subdir, ctrl_uri->subdir, sizeof(consumer->subdir)); - DBG2("Consumer subdir set to %s", consumer->subdir); - switch (ctrl_uri->dtype) { - case LTTNG_DST_IPV4: - case LTTNG_DST_IPV6: - /* Set control URI into consumer output object */ - ret = consumer_set_network_uri(consumer, ctrl_uri); - if (ret < 0) { - ret = LTTCOMM_FATAL; - goto error; - } + /* + * This case switch makes sure the domain session has a temporary consumer + * so the URL can be set. + */ + switch (domain) { + case 0: + /* Code flow error. A session MUST always have a consumer object */ + assert(session->consumer); + /* + * The URL will be added to the tracing session consumer instead of a + * specific domain consumer. + */ + consumer = session->consumer; + break; + case LTTNG_DOMAIN_KERNEL: + /* Code flow error if we don't have a kernel session here. 
*/ + assert(ksess); - /* Set data URI into consumer output object */ - ret = consumer_set_network_uri(consumer, data_uri); - if (ret < 0) { - ret = LTTCOMM_FATAL; - goto error; + /* Create consumer output if none exists */ + consumer = ksess->tmp_consumer; + if (consumer == NULL) { + consumer = consumer_copy_output(ksess->consumer); + if (consumer == NULL) { + ret = LTTCOMM_FATAL; + goto error; + } + /* Trash the consumer subdir, we are about to set a new one. */ + memset(consumer->subdir, 0, sizeof(consumer->subdir)); + ksess->tmp_consumer = consumer; } - /* Empty path since the session is network */ - path = ""; break; - case LTTNG_DST_PATH: - /* Very volatile pointer. Only used for the create session. */ - path = ctrl_uri->dst.path; - strncpy(consumer->dst.trace_path, path, - sizeof(consumer->dst.trace_path)); + case LTTNG_DOMAIN_UST: + /* Code flow error if we don't have a kernel session here. */ + assert(usess); + + /* Create consumer output if none exists */ + consumer = usess->tmp_consumer; + if (consumer == NULL) { + consumer = consumer_copy_output(usess->consumer); + if (consumer == NULL) { + ret = LTTCOMM_FATAL; + goto error; + } + /* Trash the consumer subdir, we are about to set a new one. */ + memset(consumer->subdir, 0, sizeof(consumer->subdir)); + usess->tmp_consumer = consumer; + } + break; } - /* Set if the consumer is enabled or not */ - consumer->enabled = enable_consumer; + for (i = 0; i < nb_uri; i++) { + struct consumer_socket *socket; + struct lttng_ht_iter iter; - ret = session_create(name, path, LTTNG_SOCK_GET_UID_CRED(creds), - LTTNG_SOCK_GET_GID_CRED(creds)); - if (ret != LTTCOMM_OK) { - goto consumer_error; - } + ret = add_uri_to_consumer(consumer, &uris[i], domain, session->name); + if (ret < 0) { + goto error; + } - /* Get the newly created session pointer back */ - session = session_find_by_name(name); - assert(session); + /* + * Don't send relayd socket if URI is NOT remote or if the relayd + * sockets for the session are already sent. + */ + if (uris[i].dtype == LTTNG_DST_PATH || + consumer->dst.net.relayd_socks_sent) { + continue; + } - /* Assign consumer to session */ - session->consumer = consumer; + /* Try to send relayd URI to the consumer if exist. */ + cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, + socket, node.node) { - return LTTCOMM_OK; + /* A socket in the HT should never have a negative fd */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); + ret = send_socket_relayd_consumer(domain, session, &uris[i], + consumer, socket->fd); + pthread_mutex_unlock(socket->lock); + if (ret != LTTCOMM_OK) { + goto error; + } + } + } + + /* All good! */ + ret = LTTCOMM_OK; -consumer_error: - consumer_destroy_output(consumer); error: return ret; } + /* * Command LTTNG_CREATE_SESSION processed by the client thread. */ -static int cmd_create_session(char *name, char *path, lttng_sock_cred *creds) +static int cmd_create_session_uri(char *name, struct lttng_uri *uris, + size_t nb_uri, lttng_sock_cred *creds) { int ret; - struct lttng_uri uri; + char *path = NULL; + struct ltt_session *session; + //struct consumer_output *consumer = NULL; + //struct lttng_uri *ctrl_uri, *data_uri = NULL; + + assert(name); + + /* + * Verify if the session already exist + * + * XXX: There is no need for the session lock list here since the caller + * (process_client_msg) is holding it. We might want to change that so a + * single command does not lock the entire session list. 
+ */ + session = session_find_by_name(name); + if (session != NULL) { + ret = LTTCOMM_EXIST_SESS; + goto find_error; + } + + /* Create tracing session in the registry */ + ret = session_create(name, path, LTTNG_SOCK_GET_UID_CRED(creds), + LTTNG_SOCK_GET_GID_CRED(creds)); + if (ret != LTTCOMM_OK) { + goto session_error; + } + + /* + * Get the newly created session pointer back + * + * XXX: There is no need for the session lock list here since the caller + * (process_client_msg) is holding it. We might want to change that so a + * single command does not lock the entire session list. + */ + session = session_find_by_name(name); + assert(session); - /* Zeroed temporary URI */ - memset(&uri, 0, sizeof(uri)); + /* Create default consumer output for the session not yet created. */ + session->consumer = consumer_create_output(CONSUMER_DST_LOCAL); + if (session->consumer == NULL) { + ret = LTTCOMM_FATAL; + goto consumer_error; + } - uri.dtype = LTTNG_DST_PATH; - uri.utype = LTTNG_URI_DST; - strncpy(uri.dst.path, path, sizeof(uri.dst.path)); + /* + * This means that the lttng_create_session call was called with the _path_ + * argument set to NULL. + */ + if (uris == NULL) { + /* + * At this point, we'll skip the consumer URI setup and create a + * session with a NULL path which will flag the session to NOT spawn a + * consumer. + */ + DBG("Create session %s with NO uri, skipping consumer setup", name); + goto end; + } - /* TODO: Strip date-time from path and put it in uri's subdir */ + session->start_consumer = 1; - ret = cmd_create_session_uri(name, &uri, NULL, 1, creds); + ret = cmd_set_consumer_uri(0, session, nb_uri, uris); if (ret != LTTCOMM_OK) { - goto error; + goto consumer_error; } -error: + session->consumer->enabled = 1; + +end: + return LTTCOMM_OK; + +consumer_error: + session_destroy(session); +session_error: +find_error: return ret; } @@ -3323,6 +3769,9 @@ static int cmd_destroy_session(struct ltt_session *session, char *name) { int ret; + /* Safety net */ + assert(session); + /* Clean kernel session teardown */ teardown_kernel_session(session); /* UST session teardown */ @@ -3392,6 +3841,7 @@ static int cmd_register_consumer(struct ltt_session *session, int domain, char *sock_path) { int ret, sock; + struct consumer_socket *socket; switch (domain) { case LTTNG_DOMAIN_KERNEL: @@ -3407,7 +3857,29 @@ static int cmd_register_consumer(struct ltt_session *session, int domain, goto error; } - session->kernel_session->consumer_fd = sock; + socket = consumer_allocate_socket(sock); + if (socket == NULL) { + ret = LTTCOMM_FATAL; + close(sock); + goto error; + } + + socket->lock = zmalloc(sizeof(pthread_mutex_t)); + if (socket->lock == NULL) { + PERROR("zmalloc pthread mutex"); + ret = LTTCOMM_FATAL; + goto error; + } + pthread_mutex_init(socket->lock, NULL); + + rcu_read_lock(); + consumer_add_socket(socket, session->kernel_session->consumer); + rcu_read_unlock(); + + pthread_mutex_lock(&kconsumer_data.pid_mutex); + kconsumer_data.pid = -1; + pthread_mutex_unlock(&kconsumer_data.pid_mutex); + break; default: /* TODO: Userspace tracing */ @@ -3544,152 +4016,6 @@ error: return ret; } -/* - * Command LTTNG_SET_CONSUMER_URI processed by the client thread. - */ -static int cmd_set_consumer_uri(int domain, struct ltt_session *session, - struct lttng_uri *uri) -{ - int ret; - struct ltt_kernel_session *ksess = session->kernel_session; - struct ltt_ust_session *usess = session->ust_session; - struct consumer_output *consumer; - - /* Can't enable consumer after session started. 
*/ - if (session->enabled) { - ret = LTTCOMM_TRACE_ALREADY_STARTED; - goto error; - } - - switch (domain) { - case LTTNG_DOMAIN_KERNEL: - /* Code flow error if we don't have a kernel session here. */ - assert(ksess); - - /* Create consumer output if none exists */ - consumer = ksess->tmp_consumer; - if (consumer == NULL) { - consumer = consumer_copy_output(ksess->consumer); - if (consumer == NULL) { - ret = LTTCOMM_FATAL; - goto error; - } - /* Reassign new pointer */ - ksess->tmp_consumer = consumer; - } - - switch (uri->dtype) { - case LTTNG_DST_IPV4: - case LTTNG_DST_IPV6: - DBG2("Setting network URI for kernel session %s", session->name); - - /* Set URI into consumer output object */ - ret = consumer_set_network_uri(consumer, uri); - if (ret < 0) { - ret = LTTCOMM_FATAL; - goto error; - } - - /* On a new subdir, reappend the default trace dir. */ - if (strlen(uri->subdir) != 0) { - strncat(consumer->subdir, DEFAULT_KERNEL_TRACE_DIR, - sizeof(consumer->subdir)); - } - - ret = send_socket_relayd_consumer(domain, session, uri, consumer, - ksess->consumer_fd); - if (ret != LTTCOMM_OK) { - goto error; - } - break; - case LTTNG_DST_PATH: - DBG2("Setting trace directory path from URI to %s", uri->dst.path); - memset(consumer->dst.trace_path, 0, - sizeof(consumer->dst.trace_path)); - strncpy(consumer->dst.trace_path, uri->dst.path, - sizeof(consumer->dst.trace_path)); - /* Append default kernel trace dir */ - strncat(consumer->dst.trace_path, DEFAULT_KERNEL_TRACE_DIR, - sizeof(consumer->dst.trace_path)); - break; - } - - /* All good! */ - break; - case LTTNG_DOMAIN_UST: - /* Code flow error if we don't have a kernel session here. */ - assert(usess); - - /* Create consumer output if none exists */ - consumer = usess->tmp_consumer; - if (consumer == NULL) { - consumer = consumer_copy_output(usess->consumer); - if (consumer == NULL) { - ret = LTTCOMM_FATAL; - goto error; - } - /* Reassign new pointer */ - usess->tmp_consumer = consumer; - } - - switch (uri->dtype) { - case LTTNG_DST_IPV4: - case LTTNG_DST_IPV6: - { - DBG2("Setting network URI for UST session %s", session->name); - - /* Set URI into consumer object */ - ret = consumer_set_network_uri(consumer, uri); - if (ret < 0) { - ret = LTTCOMM_FATAL; - goto error; - } - - /* On a new subdir, reappend the default trace dir. */ - if (strlen(uri->subdir) != 0) { - strncat(consumer->subdir, DEFAULT_UST_TRACE_DIR, - sizeof(consumer->subdir)); - } - - if (ust_consumerd64_fd >= 0) { - ret = send_socket_relayd_consumer(domain, session, uri, - consumer, ust_consumerd64_fd); - if (ret != LTTCOMM_OK) { - goto error; - } - } - - if (ust_consumerd32_fd >= 0) { - ret = send_socket_relayd_consumer(domain, session, uri, - consumer, ust_consumerd32_fd); - if (ret != LTTCOMM_OK) { - goto error; - } - } - - break; - } - case LTTNG_DST_PATH: - DBG2("Setting trace directory path from URI to %s", uri->dst.path); - memset(consumer->dst.trace_path, 0, - sizeof(consumer->dst.trace_path)); - strncpy(consumer->dst.trace_path, uri->dst.path, - sizeof(consumer->dst.trace_path)); - /* Append default UST trace dir */ - strncat(consumer->dst.trace_path, DEFAULT_UST_TRACE_DIR, - sizeof(consumer->dst.trace_path)); - break; - } - break; - } - - /* All good! */ - ret = LTTCOMM_OK; - -error: - return ret; -} - /* * Command LTTNG_DISABLE_CONSUMER processed by the client thread. 
*/ @@ -3700,13 +4026,24 @@ static int cmd_disable_consumer(int domain, struct ltt_session *session) struct ltt_ust_session *usess = session->ust_session; struct consumer_output *consumer; + assert(session); + if (session->enabled) { /* Can't disable consumer on an already started session */ ret = LTTCOMM_TRACE_ALREADY_STARTED; goto error; } + if (!session->start_consumer) { + ret = LTTCOMM_NO_CONSUMER; + goto error; + } + switch (domain) { + case 0: + DBG("Disable tracing session %s consumer", session->name); + consumer = session->consumer; + break; case LTTNG_DOMAIN_KERNEL: /* Code flow error if we don't have a kernel session here. */ assert(ksess); @@ -3728,11 +4065,13 @@ static int cmd_disable_consumer(int domain, struct ltt_session *session) goto error; } - assert(consumer); - consumer->enabled = 0; - - /* Success at this point */ - ret = LTTCOMM_OK; + if (consumer) { + consumer->enabled = 0; + /* Success at this point */ + ret = LTTCOMM_OK; + } else { + ret = LTTCOMM_NO_CONSUMER; + } error: return ret; @@ -3746,7 +4085,9 @@ static int cmd_enable_consumer(int domain, struct ltt_session *session) int ret; struct ltt_kernel_session *ksess = session->kernel_session; struct ltt_ust_session *usess = session->ust_session; - struct consumer_output *tmp_out; + struct consumer_output *consumer = NULL; + + assert(session); /* Can't enable consumer after session started. */ if (session->enabled) { @@ -3754,7 +4095,16 @@ static int cmd_enable_consumer(int domain, struct ltt_session *session) goto error; } + if (!session->start_consumer) { + ret = LTTCOMM_NO_CONSUMER; + goto error; + } + switch (domain) { + case 0: + assert(session->consumer); + consumer = session->consumer; + break; case LTTNG_DOMAIN_KERNEL: /* Code flow error if we don't have a kernel session here. */ assert(ksess); @@ -3769,20 +4119,21 @@ static int cmd_enable_consumer(int domain, struct ltt_session *session) goto error; } - tmp_out = ksess->tmp_consumer; - if (tmp_out == NULL) { + consumer = ksess->tmp_consumer; + if (consumer == NULL) { + ret = LTTCOMM_OK; /* No temp. consumer output exists. Using the current one. */ DBG3("No temporary consumer. Using default"); - ret = LTTCOMM_OK; + consumer = ksess->consumer; goto error; } - switch (tmp_out->type) { + switch (consumer->type) { case CONSUMER_DST_LOCAL: DBG2("Consumer output is local. Creating directory(ies)"); /* Create directory(ies) */ - ret = run_as_mkdir_recursive(tmp_out->dst.trace_path, + ret = run_as_mkdir_recursive(consumer->dst.trace_path, S_IRWXU | S_IRWXG, session->uid, session->gid); if (ret < 0) { if (ret != -EEXIST) { @@ -3795,13 +4146,13 @@ static int cmd_enable_consumer(int domain, struct ltt_session *session) case CONSUMER_DST_NET: DBG2("Consumer output is network. Validating URIs"); /* Validate if we have both control and data path set. 
*/ - if (!tmp_out->dst.net.control_isset) { - ret = LTTCOMM_URI_CTRL_MISS; + if (!consumer->dst.net.control_isset) { + ret = LTTCOMM_URL_CTRL_MISS; goto error; } - if (!tmp_out->dst.net.data_isset) { - ret = LTTCOMM_URI_DATA_MISS; + if (!consumer->dst.net.data_isset) { + ret = LTTCOMM_URL_DATA_MISS; goto error; } @@ -3812,13 +4163,13 @@ static int cmd_enable_consumer(int domain, struct ltt_session *session) goto error; } - /* Append default kernel trace dir to subdir */ - strncat(ksess->consumer->subdir, DEFAULT_KERNEL_TRACE_DIR, - sizeof(ksess->consumer->subdir)); - break; } + /* Append default kernel trace dir to subdir */ + strncat(ksess->consumer->subdir, DEFAULT_KERNEL_TRACE_DIR, + sizeof(ksess->consumer->subdir)); + /* * @session-lock * This is race free for now since the session lock is acquired before @@ -3826,8 +4177,10 @@ static int cmd_enable_consumer(int domain, struct ltt_session *session) * session without this lock hence freeing the consumer output object * is valid. */ + rcu_read_lock(); consumer_destroy_output(ksess->consumer); - ksess->consumer = tmp_out; + rcu_read_unlock(); + ksess->consumer = consumer; ksess->tmp_consumer = NULL; break; @@ -3845,20 +4198,21 @@ static int cmd_enable_consumer(int domain, struct ltt_session *session) goto error; } - tmp_out = usess->tmp_consumer; - if (tmp_out == NULL) { + consumer = usess->tmp_consumer; + if (consumer == NULL) { + ret = LTTCOMM_OK; /* No temp. consumer output exists. Using the current one. */ DBG3("No temporary consumer. Using default"); - ret = LTTCOMM_OK; + consumer = usess->consumer; goto error; } - switch (tmp_out->type) { + switch (consumer->type) { case CONSUMER_DST_LOCAL: DBG2("Consumer output is local. Creating directory(ies)"); /* Create directory(ies) */ - ret = run_as_mkdir_recursive(tmp_out->dst.trace_path, + ret = run_as_mkdir_recursive(consumer->dst.trace_path, S_IRWXU | S_IRWXG, session->uid, session->gid); if (ret < 0) { if (ret != -EEXIST) { @@ -3871,13 +4225,13 @@ static int cmd_enable_consumer(int domain, struct ltt_session *session) case CONSUMER_DST_NET: DBG2("Consumer output is network. Validating URIs"); /* Validate if we have both control and data path set. */ - if (!tmp_out->dst.net.control_isset) { - ret = LTTCOMM_URI_CTRL_MISS; + if (!consumer->dst.net.control_isset) { + ret = LTTCOMM_URL_CTRL_MISS; goto error; } - if (!tmp_out->dst.net.data_isset) { - ret = LTTCOMM_URI_DATA_MISS; + if (!consumer->dst.net.data_isset) { + ret = LTTCOMM_URL_DATA_MISS; goto error; } @@ -3888,19 +4242,19 @@ static int cmd_enable_consumer(int domain, struct ltt_session *session) goto error; } - if (tmp_out->net_seq_index == -1) { + if (consumer->net_seq_index == -1) { ret = LTTCOMM_ENABLE_CONSUMER_FAIL; DBG2("Network index is not set on the consumer"); goto error; } - /* Append default kernel trace dir to subdir */ - strncat(usess->consumer->subdir, DEFAULT_UST_TRACE_DIR, - sizeof(usess->consumer->subdir)); - break; } + /* Append default kernel trace dir to subdir */ + strncat(usess->consumer->subdir, DEFAULT_UST_TRACE_DIR, + sizeof(usess->consumer->subdir)); + /* * @session-lock * This is race free for now since the session lock is acquired before @@ -3908,15 +4262,24 @@ static int cmd_enable_consumer(int domain, struct ltt_session *session) * session without this lock hence freeing the consumer output object * is valid. 
*/ + rcu_read_lock(); consumer_destroy_output(usess->consumer); - usess->consumer = tmp_out; + rcu_read_unlock(); + usess->consumer = consumer; usess->tmp_consumer = NULL; break; } - /* Success at this point */ - ret = LTTCOMM_OK; + /* Enable it */ + if (consumer) { + consumer->enabled = 1; + /* Success at this point */ + ret = LTTCOMM_OK; + } else { + /* Should not really happend... */ + ret = LTTCOMM_NO_CONSUMER; + } error: return ret; @@ -3928,8 +4291,11 @@ error: * is set and ready for transmission before returning. * * Return any error encountered or 0 for success. + * + * "sock" is only used for special-case var. len data. */ -static int process_client_msg(struct command_ctx *cmd_ctx) +static int process_client_msg(struct command_ctx *cmd_ctx, int sock, + int *sock_error) { int ret = LTTCOMM_OK; int need_tracing_session = 1; @@ -3937,9 +4303,10 @@ static int process_client_msg(struct command_ctx *cmd_ctx) DBG("Processing client command %d", cmd_ctx->lsm->cmd_type); + *sock_error = 0; + switch (cmd_ctx->lsm->cmd_type) { case LTTNG_CREATE_SESSION: - case LTTNG_CREATE_SESSION_URI: case LTTNG_DESTROY_SESSION: case LTTNG_LIST_SESSIONS: case LTTNG_LIST_DOMAINS: @@ -3961,6 +4328,16 @@ static int process_client_msg(struct command_ctx *cmd_ctx) goto error; } + /* Deny register consumer if we already have a spawned consumer. */ + if (cmd_ctx->lsm->cmd_type == LTTNG_REGISTER_CONSUMER) { + pthread_mutex_lock(&kconsumer_data.pid_mutex); + if (kconsumer_data.pid > 0) { + ret = LTTCOMM_KERN_CONSUMER_FAIL; + goto error; + } + pthread_mutex_unlock(&kconsumer_data.pid_mutex); + } + /* * Check for command that don't needs to allocate a returned payload. We do * this here so we don't have to make the call for no payload at each @@ -3986,7 +4363,6 @@ static int process_client_msg(struct command_ctx *cmd_ctx) /* Commands that DO NOT need a session. */ switch (cmd_ctx->lsm->cmd_type) { case LTTNG_CREATE_SESSION: - case LTTNG_CREATE_SESSION_URI: case LTTNG_CALIBRATE: case LTTNG_LIST_SESSIONS: case LTTNG_LIST_TRACEPOINTS: @@ -4020,6 +4396,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx) if (!need_domain) { goto skip_domain; } + /* * Check domain type for specific "pre-action". */ @@ -4058,7 +4435,8 @@ static int process_client_msg(struct command_ctx *cmd_ctx) /* Start the kernel consumer daemon */ pthread_mutex_lock(&kconsumer_data.pid_mutex); if (kconsumer_data.pid == 0 && - cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER) { + cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER && + cmd_ctx->session->start_consumer) { pthread_mutex_unlock(&kconsumer_data.pid_mutex); ret = start_consumerd(&kconsumer_data); if (ret < 0) { @@ -4066,13 +4444,19 @@ static int process_client_msg(struct command_ctx *cmd_ctx) goto error; } uatomic_set(&kernel_consumerd_state, CONSUMER_STARTED); - - /* Set consumer fd of the session */ - cmd_ctx->session->kernel_session->consumer_fd = - kconsumer_data.cmd_sock; } else { pthread_mutex_unlock(&kconsumer_data.pid_mutex); } + + /* + * The consumer was just spawned so we need to add the socket to + * the consumer output of the session if exist. + */ + ret = consumer_create_socket(&kconsumer_data, + cmd_ctx->session->kernel_session->consumer); + if (ret < 0) { + goto error; + } } break; @@ -4085,6 +4469,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx) } if (need_tracing_session) { + /* Create UST session if none exist. 
*/ if (cmd_ctx->session->ust_session == NULL) { ret = create_ust_session(cmd_ctx->session, &cmd_ctx->lsm->domain); @@ -4098,37 +4483,60 @@ static int process_client_msg(struct command_ctx *cmd_ctx) pthread_mutex_lock(&ustconsumer64_data.pid_mutex); if (consumerd64_bin[0] != '\0' && ustconsumer64_data.pid == 0 && - cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER) { + cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER && + cmd_ctx->session->start_consumer) { pthread_mutex_unlock(&ustconsumer64_data.pid_mutex); ret = start_consumerd(&ustconsumer64_data); if (ret < 0) { ret = LTTCOMM_UST_CONSUMER64_FAIL; - ust_consumerd64_fd = -EINVAL; + uatomic_set(&ust_consumerd64_fd, -EINVAL); goto error; } - ust_consumerd64_fd = ustconsumer64_data.cmd_sock; + uatomic_set(&ust_consumerd64_fd, ustconsumer64_data.cmd_sock); uatomic_set(&ust_consumerd_state, CONSUMER_STARTED); } else { pthread_mutex_unlock(&ustconsumer64_data.pid_mutex); } + + /* + * Setup socket for consumer 64 bit. No need for atomic access + * since it was set above and can ONLY be set in this thread. + */ + ret = consumer_create_socket(&ustconsumer64_data, + cmd_ctx->session->ust_session->consumer); + if (ret < 0) { + goto error; + } + /* 32-bit */ if (consumerd32_bin[0] != '\0' && ustconsumer32_data.pid == 0 && - cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER) { + cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER && + cmd_ctx->session->start_consumer) { pthread_mutex_unlock(&ustconsumer32_data.pid_mutex); ret = start_consumerd(&ustconsumer32_data); if (ret < 0) { ret = LTTCOMM_UST_CONSUMER32_FAIL; - ust_consumerd32_fd = -EINVAL; + uatomic_set(&ust_consumerd32_fd, -EINVAL); goto error; } - ust_consumerd32_fd = ustconsumer32_data.cmd_sock; + uatomic_set(&ust_consumerd32_fd, ustconsumer32_data.cmd_sock); uatomic_set(&ust_consumerd_state, CONSUMER_STARTED); } else { pthread_mutex_unlock(&ustconsumer32_data.pid_mutex); } + + /* + * Setup socket for consumer 64 bit. No need for atomic access + * since it was set above and can ONLY be set in this thread. + */ + ret = consumer_create_socket(&ustconsumer32_data, + cmd_ctx->session->ust_session->consumer); + if (ret < 0) { + goto error; + } } break; } @@ -4213,7 +4621,26 @@ skip_domain: } case LTTNG_ENABLE_CONSUMER: { + /* + * XXX: 0 means that this URI should be applied on the session. Should + * be a DOMAIN enuam. + */ ret = cmd_enable_consumer(cmd_ctx->lsm->domain.type, cmd_ctx->session); + if (ret != LTTCOMM_OK) { + goto error; + } + + if (cmd_ctx->lsm->domain.type == 0) { + /* Add the URI for the UST session if a consumer is present. */ + if (cmd_ctx->session->ust_session && + cmd_ctx->session->ust_session->consumer) { + ret = cmd_enable_consumer(LTTNG_DOMAIN_UST, cmd_ctx->session); + } else if (cmd_ctx->session->kernel_session && + cmd_ctx->session->kernel_session->consumer) { + ret = cmd_enable_consumer(LTTNG_DOMAIN_KERNEL, + cmd_ctx->session); + } + } break; } case LTTNG_ENABLE_EVENT: @@ -4267,7 +4694,8 @@ skip_domain: struct lttng_event_field *fields; ssize_t nb_fields; - nb_fields = cmd_list_tracepoint_fields(cmd_ctx->lsm->domain.type, &fields); + nb_fields = cmd_list_tracepoint_fields(cmd_ctx->lsm->domain.type, + &fields); if (nb_fields < 0) { ret = -nb_fields; goto error; @@ -4277,7 +4705,8 @@ skip_domain: * Setup lttng message with payload size set to the event list size in * bytes and then copy list into the llm payload. 
*/ - ret = setup_lttng_msg(cmd_ctx, sizeof(struct lttng_event_field) * nb_fields); + ret = setup_lttng_msg(cmd_ctx, + sizeof(struct lttng_event_field) * nb_fields); if (ret < 0) { free(fields); goto setup_error; @@ -4294,8 +4723,56 @@ skip_domain: } case LTTNG_SET_CONSUMER_URI: { + size_t nb_uri, len; + struct lttng_uri *uris; + + nb_uri = cmd_ctx->lsm->u.uri.size; + len = nb_uri * sizeof(struct lttng_uri); + + if (nb_uri == 0) { + ret = LTTCOMM_INVALID; + goto error; + } + + uris = zmalloc(len); + if (uris == NULL) { + ret = LTTCOMM_FATAL; + goto error; + } + + /* Receive variable len data */ + DBG("Receiving %zu URI(s) from client ...", nb_uri); + ret = lttcomm_recv_unix_sock(sock, uris, len); + if (ret <= 0) { + DBG("No URIs received from client... continuing"); + *sock_error = 1; + ret = LTTCOMM_SESSION_FAIL; + goto error; + } + ret = cmd_set_consumer_uri(cmd_ctx->lsm->domain.type, cmd_ctx->session, - &cmd_ctx->lsm->u.uri); + nb_uri, uris); + if (ret != LTTCOMM_OK) { + goto error; + } + + /* + * XXX: 0 means that this URI should be applied on the session. Should + * be a DOMAIN enuam. + */ + if (cmd_ctx->lsm->domain.type == 0) { + /* Add the URI for the UST session if a consumer is present. */ + if (cmd_ctx->session->ust_session && + cmd_ctx->session->ust_session->consumer) { + ret = cmd_set_consumer_uri(LTTNG_DOMAIN_UST, cmd_ctx->session, + nb_uri, uris); + } else if (cmd_ctx->session->kernel_session && + cmd_ctx->session->kernel_session->consumer) { + ret = cmd_set_consumer_uri(LTTNG_DOMAIN_KERNEL, + cmd_ctx->session, nb_uri, uris); + } + } + break; } case LTTNG_START_TRACE: @@ -4310,26 +4787,47 @@ skip_domain: } case LTTNG_CREATE_SESSION: { - ret = cmd_create_session(cmd_ctx->lsm->session.name, - cmd_ctx->lsm->session.path, &cmd_ctx->creds); - break; - } - case LTTNG_CREATE_SESSION_URI: - { - ret = cmd_create_session_uri(cmd_ctx->lsm->session.name, - &cmd_ctx->lsm->u.create_uri.ctrl_uri, - &cmd_ctx->lsm->u.create_uri.data_uri, - cmd_ctx->lsm->u.create_uri.enable_consumer, &cmd_ctx->creds); + size_t nb_uri, len; + struct lttng_uri *uris = NULL; + + nb_uri = cmd_ctx->lsm->u.uri.size; + len = nb_uri * sizeof(struct lttng_uri); + + if (nb_uri > 0) { + uris = zmalloc(len); + if (uris == NULL) { + ret = LTTCOMM_FATAL; + goto error; + } + + /* Receive variable len data */ + DBG("Waiting for %zu URIs from client ...", nb_uri); + ret = lttcomm_recv_unix_sock(sock, uris, len); + if (ret <= 0) { + DBG("No URIs received from client... continuing"); + *sock_error = 1; + ret = LTTCOMM_SESSION_FAIL; + goto error; + } + + if (nb_uri == 1 && uris[0].dtype != LTTNG_DST_PATH) { + DBG("Creating session with ONE network URI is a bad call"); + ret = LTTCOMM_SESSION_FAIL; + goto error; + } + } + + ret = cmd_create_session_uri(cmd_ctx->lsm->session.name, uris, nb_uri, + &cmd_ctx->creds); + break; } case LTTNG_DESTROY_SESSION: { ret = cmd_destroy_session(cmd_ctx->session, cmd_ctx->lsm->session.name); - /* - * Set session to NULL so we do not unlock it after - * free. - */ + + /* Set session to NULL so we do not unlock it after free. */ cmd_ctx->session = NULL; break; } @@ -4447,6 +4945,43 @@ skip_domain: cmd_ctx->lsm->u.reg.path); break; } + case LTTNG_SET_FILTER: + { + struct lttng_filter_bytecode *bytecode; + + if (cmd_ctx->lsm->u.filter.bytecode_len > 65336) { + ret = LTTCOMM_FILTER_INVAL; + goto error; + } + bytecode = zmalloc(cmd_ctx->lsm->u.filter.bytecode_len); + if (!bytecode) { + ret = LTTCOMM_FILTER_NOMEM; + goto error; + } + /* Receive var. len. 
data */ + DBG("Receiving var len data from client ..."); + ret = lttcomm_recv_unix_sock(sock, bytecode, + cmd_ctx->lsm->u.filter.bytecode_len); + if (ret <= 0) { + DBG("Nothing recv() from client var len data... continuing"); + *sock_error = 1; + ret = LTTCOMM_FILTER_INVAL; + goto error; + } + + if (bytecode->len + sizeof(*bytecode) + != cmd_ctx->lsm->u.filter.bytecode_len) { + free(bytecode); + ret = LTTCOMM_FILTER_INVAL; + goto error; + } + + ret = cmd_set_filter(cmd_ctx->session, cmd_ctx->lsm->domain.type, + cmd_ctx->lsm->u.filter.channel_name, + cmd_ctx->lsm->u.filter.event_name, + bytecode); + break; + } default: ret = LTTCOMM_UND; break; @@ -4472,13 +5007,195 @@ init_setup_error: return ret; } +/* + * Thread managing health check socket. + */ +static void *thread_manage_health(void *data) +{ + int sock = -1, new_sock = -1, ret, i, pollfd, err = -1; + uint32_t revents, nb_fd; + struct lttng_poll_event events; + struct lttcomm_health_msg msg; + struct lttcomm_health_data reply; + + DBG("[thread] Manage health check started"); + + rcu_register_thread(); + + /* Create unix socket */ + sock = lttcomm_create_unix_sock(health_unix_sock_path); + if (sock < 0) { + ERR("Unable to create health check Unix socket"); + ret = -1; + goto error; + } + + ret = lttcomm_listen_unix_sock(sock); + if (ret < 0) { + goto error; + } + + /* + * Pass 2 as size here for the thread quit pipe and client_sock. Nothing + * more will be added to this poll set. + */ + ret = create_thread_poll_set(&events, 2); + if (ret < 0) { + goto error; + } + + /* Add the application registration socket */ + ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLPRI); + if (ret < 0) { + goto error; + } + + while (1) { + DBG("Health check ready"); + + nb_fd = LTTNG_POLL_GETNB(&events); + + /* Inifinite blocking call, waiting for transmission */ +restart: + ret = lttng_poll_wait(&events, -1); + if (ret < 0) { + /* + * Restart interrupted system call. + */ + if (errno == EINTR) { + goto restart; + } + goto error; + } + + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); + + /* Thread quit pipe has been closed. Killing thread. */ + ret = check_thread_quit_pipe(pollfd, revents); + if (ret) { + err = 0; + goto exit; + } + + /* Event on the registration socket */ + if (pollfd == sock) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Health socket poll error"); + goto error; + } + } + } + + new_sock = lttcomm_accept_unix_sock(sock); + if (new_sock < 0) { + goto error; + } + + DBG("Receiving data from client for health..."); + ret = lttcomm_recv_unix_sock(new_sock, (void *)&msg, sizeof(msg)); + if (ret <= 0) { + DBG("Nothing recv() from client... 
continuing"); + ret = close(new_sock); + if (ret) { + PERROR("close"); + } + new_sock = -1; + continue; + } + + rcu_thread_online(); + + switch (msg.component) { + case LTTNG_HEALTH_CMD: + reply.ret_code = health_check_state(&health_thread_cmd); + break; + case LTTNG_HEALTH_APP_MANAGE: + reply.ret_code = health_check_state(&health_thread_app_manage); + break; + case LTTNG_HEALTH_APP_REG: + reply.ret_code = health_check_state(&health_thread_app_reg); + break; + case LTTNG_HEALTH_KERNEL: + reply.ret_code = health_check_state(&health_thread_kernel); + break; + case LTTNG_HEALTH_CONSUMER: + reply.ret_code = check_consumer_health(); + break; + case LTTNG_HEALTH_ALL: + reply.ret_code = + health_check_state(&health_thread_app_manage) && + health_check_state(&health_thread_app_reg) && + health_check_state(&health_thread_cmd) && + health_check_state(&health_thread_kernel) && + check_consumer_health(); + break; + default: + reply.ret_code = LTTCOMM_UND; + break; + } + + /* + * Flip ret value since 0 is a success and 1 indicates a bad health for + * the client where in the sessiond it is the opposite. Again, this is + * just to make things easier for us poor developer which enjoy a lot + * lazyness. + */ + if (reply.ret_code == 0 || reply.ret_code == 1) { + reply.ret_code = !reply.ret_code; + } + + DBG2("Health check return value %d", reply.ret_code); + + ret = send_unix_sock(new_sock, (void *) &reply, sizeof(reply)); + if (ret < 0) { + ERR("Failed to send health data back to client"); + } + + /* End of transmission */ + ret = close(new_sock); + if (ret) { + PERROR("close"); + } + new_sock = -1; + } + +exit: +error: + if (err) { + ERR("Health error occurred in %s", __func__); + } + DBG("Health check thread dying"); + unlink(health_unix_sock_path); + if (sock >= 0) { + ret = close(sock); + if (ret) { + PERROR("close"); + } + } + if (new_sock >= 0) { + ret = close(new_sock); + if (ret) { + PERROR("close"); + } + } + + lttng_poll_clean(&events); + + rcu_unregister_thread(); + return NULL; +} + /* * This thread manage all clients request using the unix client socket for * communication. */ static void *thread_manage_clients(void *data) { - int sock = -1, ret, i, pollfd; + int sock = -1, ret, i, pollfd, err = -1; + int sock_error; uint32_t revents, nb_fd; struct command_ctx *cmd_ctx = NULL; struct lttng_poll_event events; @@ -4487,6 +5204,8 @@ static void *thread_manage_clients(void *data) rcu_register_thread(); + health_code_update(&health_thread_cmd); + ret = lttcomm_listen_unix_sock(client_sock); if (ret < 0) { goto error; @@ -4514,6 +5233,8 @@ static void *thread_manage_clients(void *data) kill(ppid, SIGUSR1); } + health_code_update(&health_thread_cmd); + while (1) { DBG("Accepting client command ..."); @@ -4521,7 +5242,9 @@ static void *thread_manage_clients(void *data) /* Inifinite blocking call, waiting for transmission */ restart: + health_poll_update(&health_thread_cmd); ret = lttng_poll_wait(&events, -1); + health_poll_update(&health_thread_cmd); if (ret < 0) { /* * Restart interrupted system call. @@ -4537,10 +5260,13 @@ static void *thread_manage_clients(void *data) revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); + health_code_update(&health_thread_cmd); + /* Thread quit pipe has been closed. Killing thread. 
*/ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { - goto error; + err = 0; + goto exit; } /* Event on the registration socket */ @@ -4554,6 +5280,8 @@ static void *thread_manage_clients(void *data) DBG("Wait for client response"); + health_code_update(&health_thread_cmd); + sock = lttcomm_accept_unix_sock(client_sock); if (sock < 0) { goto error; @@ -4582,6 +5310,8 @@ static void *thread_manage_clients(void *data) cmd_ctx->llm = NULL; cmd_ctx->session = NULL; + health_code_update(&health_thread_cmd); + /* * Data is received from the lttng client. The struct * lttcomm_session_msg (lsm) contains the command and data request of @@ -4601,6 +5331,8 @@ static void *thread_manage_clients(void *data) continue; } + health_code_update(&health_thread_cmd); + // TODO: Validate cmd_ctx including sanity check for // security purpose. @@ -4611,18 +5343,29 @@ static void *thread_manage_clients(void *data) * informations for the client. The command context struct contains * everything this function may needs. */ - ret = process_client_msg(cmd_ctx); + ret = process_client_msg(cmd_ctx, sock, &sock_error); rcu_thread_offline(); if (ret < 0) { + if (sock_error) { + ret = close(sock); + if (ret) { + PERROR("close"); + } + sock = -1; + } /* * TODO: Inform client somehow of the fatal error. At * this point, ret < 0 means that a zmalloc failed - * (ENOMEM). Error detected but still accept command. + * (ENOMEM). Error detected but still accept + * command, unless a socket error has been + * detected. */ clean_command_ctx(&cmd_ctx); continue; } + health_code_update(&health_thread_cmd); + DBG("Sending response (size: %d, retcode: %s)", cmd_ctx->lttng_msg_size, lttng_strerror(-cmd_ctx->llm->ret_code)); @@ -4639,9 +5382,18 @@ static void *thread_manage_clients(void *data) sock = -1; clean_command_ctx(&cmd_ctx); + + health_code_update(&health_thread_cmd); } +exit: error: + if (err) { + health_error(&health_thread_cmd); + ERR("Health error occurred in %s", __func__); + } + health_exit(&health_thread_cmd); + DBG("Client thread dying"); unlink(client_unix_sock_path); if (client_sock >= 0) { @@ -5190,6 +5942,11 @@ int main(int argc, char **argv) DEFAULT_GLOBAL_APPS_WAIT_SHM_PATH); } + if (strlen(health_unix_sock_path) == 0) { + snprintf(health_unix_sock_path, sizeof(health_unix_sock_path), + DEFAULT_GLOBAL_HEALTH_UNIX_SOCK); + } + /* Setup kernel consumerd path */ snprintf(kconsumer_data.err_unix_sock_path, PATH_MAX, DEFAULT_KCONSUMERD_ERR_SOCK_PATH, rundir); @@ -5240,6 +5997,12 @@ int main(int argc, char **argv) snprintf(wait_shm_path, PATH_MAX, DEFAULT_HOME_APPS_WAIT_SHM_PATH, geteuid()); } + + /* Set health check Unix path */ + if (strlen(health_unix_sock_path) == 0) { + snprintf(health_unix_sock_path, sizeof(health_unix_sock_path), + DEFAULT_HOME_HEALTH_UNIX_SOCK, home_path); + } } /* Set consumer initial state */ @@ -5372,6 +6135,32 @@ int main(int argc, char **argv) */ uatomic_set(&relayd_net_seq_idx, 1); + /* Init all health thread counters. */ + health_init(&health_thread_cmd); + health_init(&health_thread_kernel); + health_init(&health_thread_app_manage); + health_init(&health_thread_app_reg); + + /* + * Init health counters of the consumer thread. We do a quick hack here to + * the state of the consumer health is fine even if the thread is not + * started. This is simply to ease our life and has no cost what so ever. 
+	 */
+	health_init(&kconsumer_data.health);
+	health_poll_update(&kconsumer_data.health);
+	health_init(&ustconsumer32_data.health);
+	health_poll_update(&ustconsumer32_data.health);
+	health_init(&ustconsumer64_data.health);
+	health_poll_update(&ustconsumer64_data.health);
+
+	/* Create thread to manage the health check socket */
+	ret = pthread_create(&health_thread, NULL,
+			thread_manage_health, (void *) NULL);
+	if (ret != 0) {
+		PERROR("pthread_create health");
+		goto exit_health;
+	}
+
 	/* Create thread to manage the client socket */
 	ret = pthread_create(&client_thread, NULL,
 			thread_manage_clients, (void *) NULL);
@@ -5453,6 +6242,7 @@ exit_dispatch:
 	}
 
 exit_client:
+exit_health:
 exit:
 	/*
 	 * cleanup() is called when no other thread is running.