X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fmain.c;h=4b6a39dd5ecea5ef176139e7da2c6058f5884094;hp=f6d560c3325fe376a74c084b29b9d195396b1b62;hb=e8209f6b352b3aa279d8d452e396adef6f7159c7;hpb=139ac87245fd1ca18d60a0efca32b50e4c1d8730 diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index f6d560c33..4b6a39dd5 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -87,6 +87,8 @@ static struct consumer_data kconsumer_data = { .cmd_unix_sock_path = DEFAULT_KCONSUMERD_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .pid_mutex = PTHREAD_MUTEX_INITIALIZER, + .lock = PTHREAD_MUTEX_INITIALIZER, }; static struct consumer_data ustconsumer64_data = { .type = LTTNG_CONSUMER64_UST, @@ -94,6 +96,8 @@ static struct consumer_data ustconsumer64_data = { .cmd_unix_sock_path = DEFAULT_USTCONSUMERD64_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .pid_mutex = PTHREAD_MUTEX_INITIALIZER, + .lock = PTHREAD_MUTEX_INITIALIZER, }; static struct consumer_data ustconsumer32_data = { .type = LTTNG_CONSUMER32_UST, @@ -101,6 +105,8 @@ static struct consumer_data ustconsumer32_data = { .cmd_unix_sock_path = DEFAULT_USTCONSUMERD32_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .pid_mutex = PTHREAD_MUTEX_INITIALIZER, + .lock = PTHREAD_MUTEX_INITIALIZER, }; /* Shared between threads */ @@ -365,23 +371,56 @@ error: */ static void teardown_kernel_session(struct ltt_session *session) { + int ret; + struct lttng_ht_iter iter; + struct ltt_kernel_session *ksess; + struct consumer_socket *socket; + if (!session->kernel_session) { DBG3("No kernel session when tearing down session"); return; } + ksess = session->kernel_session; + DBG("Tearing down kernel session"); + /* + * Destroy relayd associated with the session consumer. This action is + * valid since in order to destroy a session we must acquire the session + * lock. This means that there CAN NOT be stream(s) being sent to a + * consumer since this action also requires the session lock at any time. + * + * At this point, we are sure that not streams data will be lost after this + * command is issued. + */ + if (ksess->consumer && ksess->consumer->type == CONSUMER_DST_NET) { + cds_lfht_for_each_entry(ksess->consumer->socks->ht, &iter.iter, socket, + node.node) { + ret = consumer_send_destroy_relayd(socket, ksess->consumer); + if (ret < 0) { + ERR("Unable to send destroy relayd command to consumer"); + /* Continue since we MUST delete everything at this point. */ + } + } + } + /* * If a custom kernel consumer was registered, close the socket before * tearing down the complete kernel session structure */ - if (kconsumer_data.cmd_sock >= 0 && - session->kernel_session->consumer_fd != kconsumer_data.cmd_sock) { - lttcomm_close_unix_sock(session->kernel_session->consumer_fd); + cds_lfht_for_each_entry(ksess->consumer->socks->ht, &iter.iter, socket, + node.node) { + if (socket->fd != kconsumer_data.cmd_sock) { + rcu_read_lock(); + consumer_del_socket(socket, ksess->consumer); + lttcomm_close_unix_sock(socket->fd); + consumer_destroy_socket(socket); + rcu_read_unlock(); + } } - trace_kernel_destroy_session(session->kernel_session); + trace_kernel_destroy_session(ksess); } /* @@ -391,20 +430,44 @@ static void teardown_kernel_session(struct ltt_session *session) static void teardown_ust_session(struct ltt_session *session) { int ret; + struct lttng_ht_iter iter; + struct ltt_ust_session *usess; + struct consumer_socket *socket; if (!session->ust_session) { DBG3("No UST session when tearing down session"); return; } + usess = session->ust_session; DBG("Tearing down UST session(s)"); - ret = ust_app_destroy_trace_all(session->ust_session); + /* + * Destroy relayd associated with the session consumer. This action is + * valid since in order to destroy a session we must acquire the session + * lock. This means that there CAN NOT be stream(s) being sent to a + * consumer since this action also requires the session lock at any time. + * + * At this point, we are sure that not streams data will be lost after this + * command is issued. + */ + if (usess->consumer && usess->consumer->type == CONSUMER_DST_NET) { + cds_lfht_for_each_entry(usess->consumer->socks->ht, &iter.iter, socket, + node.node) { + ret = consumer_send_destroy_relayd(socket, usess->consumer); + if (ret < 0) { + ERR("Unable to send destroy relayd command to consumer"); + /* Continue since we MUST delete everything at this point. */ + } + } + } + + ret = ust_app_destroy_trace_all(usess); if (ret) { ERR("Error in ust_app_destroy_trace_all"); } - trace_ust_destroy_session(session->ust_session); + trace_ust_destroy_session(usess); } /* @@ -468,8 +531,6 @@ static void cleanup(void) DBG("Closing all UST sockets"); ust_app_clean_list(); - pthread_mutex_destroy(&kconsumer_data.pid_mutex); - if (is_root && !opt_no_kernel) { DBG2("Closing kernel fd"); if (kernel_tracer_fd >= 0) { @@ -634,6 +695,7 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd) { int ret = 0; struct ltt_session *session; + struct ltt_kernel_session *ksess; struct ltt_kernel_channel *channel; DBG("Updating kernel streams for channel fd %d", fd); @@ -645,14 +707,9 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd) session_unlock(session); continue; } + ksess = session->kernel_session; - /* This is not suppose to be -1 but this is an extra security check */ - if (session->kernel_session->consumer_fd < 0) { - session->kernel_session->consumer_fd = consumer_data->cmd_sock; - } - - cds_list_for_each_entry(channel, - &session->kernel_session->channel_list.head, list) { + cds_list_for_each_entry(channel, &ksess->channel_list.head, list) { if (channel->fd == fd) { DBG("Channel found, updating kernel streams"); ret = kernel_open_channel_stream(channel); @@ -665,13 +722,23 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd) * that tracing is started so it is safe to send our updated * stream fds. */ - if (session->kernel_session->consumer_fds_sent == 1 && - session->kernel_session->consumer != NULL) { - ret = kernel_consumer_send_channel_stream( - session->kernel_session->consumer_fd, channel, - session->kernel_session); - if (ret < 0) { - goto error; + if (ksess->consumer_fds_sent == 1 && ksess->consumer != NULL) { + struct lttng_ht_iter iter; + struct consumer_socket *socket; + + + cds_lfht_for_each_entry(ksess->consumer->socks->ht, + &iter.iter, socket, node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); + ret = kernel_consumer_send_channel_stream(socket->fd, + channel, ksess); + pthread_mutex_unlock(socket->lock); + if (ret < 0) { + goto error; + } } } goto error; @@ -1562,7 +1629,8 @@ static int join_consumer_thread(struct consumer_data *consumer_data) void *status; int ret; - if (consumer_data->pid != 0) { + /* Consumer pid must be a real one. */ + if (consumer_data->pid > 0) { ret = kill(consumer_data->pid, SIGTERM); if (ret) { ERR("Error killing consumer daemon"); @@ -1869,21 +1937,24 @@ error: static int init_kernel_tracing(struct ltt_kernel_session *session) { int ret = 0; + struct lttng_ht_iter iter; + struct consumer_socket *socket; - if (session->consumer_fds_sent == 0 && session->consumer != NULL) { - /* - * Assign default kernel consumer socket if no consumer assigned to the - * kernel session. At this point, it's NOT supposed to be -1 but this is - * an extra security check. - */ - if (session->consumer_fd < 0) { - session->consumer_fd = kconsumer_data.cmd_sock; - } + assert(session); - ret = kernel_consumer_send_session(session->consumer_fd, session); - if (ret < 0) { - ret = LTTCOMM_KERN_CONSUMER_FAIL; - goto error; + if (session->consumer_fds_sent == 0 && session->consumer != NULL) { + cds_lfht_for_each_entry(session->consumer->socks->ht, &iter.iter, + socket, node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); + ret = kernel_consumer_send_session(socket->fd, session); + pthread_mutex_unlock(socket->lock); + if (ret < 0) { + ret = LTTCOMM_KERN_CONSUMER_FAIL; + goto error; + } } } @@ -2054,6 +2125,8 @@ static int setup_relayd(struct ltt_session *session) int ret = LTTCOMM_OK; struct ltt_ust_session *usess; struct ltt_kernel_session *ksess; + struct consumer_socket *socket; + struct lttng_ht_iter iter; assert(session); @@ -2062,34 +2135,37 @@ static int setup_relayd(struct ltt_session *session) DBG2("Setting relayd for session %s", session->name); - if (usess && usess->consumer->sock == -1 && - usess->consumer->type == CONSUMER_DST_NET && + if (usess && usess->consumer->type == CONSUMER_DST_NET && usess->consumer->enabled) { - /* Setup relayd for 64 bits consumer */ - if (ust_consumerd64_fd >= 0) { + /* For each consumer socket, send relayd sockets */ + cds_lfht_for_each_entry(usess->consumer->socks->ht, &iter.iter, + socket, node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); send_sockets_relayd_consumer(LTTNG_DOMAIN_UST, session, - usess->consumer, ust_consumerd64_fd); + usess->consumer, socket->fd); + pthread_mutex_unlock(socket->lock); if (ret != LTTCOMM_OK) { goto error; } } - - /* Setup relayd for 32 bits consumer */ - if (ust_consumerd32_fd >= 0) { - send_sockets_relayd_consumer(LTTNG_DOMAIN_UST, session, - usess->consumer, ust_consumerd32_fd); + } else if (ksess && ksess->consumer->type == CONSUMER_DST_NET && + ksess->consumer->enabled) { + cds_lfht_for_each_entry(ksess->consumer->socks->ht, &iter.iter, + socket, node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); + send_sockets_relayd_consumer(LTTNG_DOMAIN_KERNEL, session, + ksess->consumer, socket->fd); + pthread_mutex_unlock(socket->lock); if (ret != LTTCOMM_OK) { goto error; } } - } else if (ksess && ksess->consumer->sock == -1 && - ksess->consumer->type == CONSUMER_DST_NET && - ksess->consumer->enabled) { - send_sockets_relayd_consumer(LTTNG_DOMAIN_KERNEL, session, - ksess->consumer, ksess->consumer_fd); - if (ret != LTTCOMM_OK) { - goto error; - } } error: @@ -2219,11 +2295,6 @@ static int create_kernel_session(struct ltt_session *session) goto error; } - /* Set kernel consumer socket fd */ - if (kconsumer_data.cmd_sock >= 0) { - session->kernel_session->consumer_fd = kconsumer_data.cmd_sock; - } - /* Copy session output to the newly created Kernel session */ ret = copy_session_consumer(LTTNG_DOMAIN_KERNEL, session); if (ret != LTTCOMM_OK) { @@ -3465,6 +3536,9 @@ static int cmd_destroy_session(struct ltt_session *session, char *name) { int ret; + /* Safety net */ + assert(session); + /* Clean kernel session teardown */ teardown_kernel_session(session); /* UST session teardown */ @@ -3534,6 +3608,7 @@ static int cmd_register_consumer(struct ltt_session *session, int domain, char *sock_path) { int ret, sock; + struct consumer_socket *socket; switch (domain) { case LTTNG_DOMAIN_KERNEL: @@ -3549,7 +3624,29 @@ static int cmd_register_consumer(struct ltt_session *session, int domain, goto error; } - session->kernel_session->consumer_fd = sock; + socket = consumer_allocate_socket(sock); + if (socket == NULL) { + ret = LTTCOMM_FATAL; + close(sock); + goto error; + } + + socket->lock = zmalloc(sizeof(pthread_mutex_t)); + if (socket->lock == NULL) { + PERROR("zmalloc pthread mutex"); + ret = LTTCOMM_FATAL; + goto error; + } + pthread_mutex_init(socket->lock, NULL); + + rcu_read_lock(); + consumer_add_socket(socket, session->kernel_session->consumer); + rcu_read_unlock(); + + pthread_mutex_lock(&kconsumer_data.pid_mutex); + kconsumer_data.pid = -1; + pthread_mutex_unlock(&kconsumer_data.pid_mutex); + break; default: /* TODO: Userspace tracing */ @@ -3705,6 +3802,10 @@ static int cmd_set_consumer_uri(int domain, struct ltt_session *session, switch (domain) { case LTTNG_DOMAIN_KERNEL: + { + struct lttng_ht_iter iter; + struct consumer_socket *socket; + /* Code flow error if we don't have a kernel session here. */ assert(ksess); @@ -3738,11 +3839,20 @@ static int cmd_set_consumer_uri(int domain, struct ltt_session *session, sizeof(consumer->subdir)); } - ret = send_socket_relayd_consumer(domain, session, uri, consumer, - ksess->consumer_fd); - if (ret != LTTCOMM_OK) { - goto error; + cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, + socket, node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); + ret = send_socket_relayd_consumer(domain, session, uri, consumer, + socket->fd); + pthread_mutex_unlock(socket->lock); + if (ret != LTTCOMM_OK) { + goto error; + } } + break; case LTTNG_DST_PATH: DBG2("Setting trace directory path from URI to %s", uri->dst.path); @@ -3758,6 +3868,7 @@ static int cmd_set_consumer_uri(int domain, struct ltt_session *session, /* All good! */ break; + } case LTTNG_DOMAIN_UST: /* Code flow error if we don't have a kernel session here. */ assert(usess); @@ -3778,6 +3889,8 @@ static int cmd_set_consumer_uri(int domain, struct ltt_session *session, case LTTNG_DST_IPV4: case LTTNG_DST_IPV6: { + struct consumer_socket *socket; + DBG2("Setting network URI for UST session %s", session->name); /* Set URI into consumer object */ @@ -3793,22 +3906,31 @@ static int cmd_set_consumer_uri(int domain, struct ltt_session *session, sizeof(consumer->subdir)); } - if (ust_consumerd64_fd >= 0) { + rcu_read_lock(); + socket = consumer_find_socket(uatomic_read(&ust_consumerd64_fd), + consumer); + if (socket != NULL) { + pthread_mutex_lock(socket->lock); ret = send_socket_relayd_consumer(domain, session, uri, - consumer, ust_consumerd64_fd); + consumer, socket->fd); + pthread_mutex_unlock(socket->lock); if (ret != LTTCOMM_OK) { goto error; } } - if (ust_consumerd32_fd >= 0) { + socket = consumer_find_socket(uatomic_read(&ust_consumerd32_fd), + consumer); + if (socket != NULL) { + pthread_mutex_lock(socket->lock); ret = send_socket_relayd_consumer(domain, session, uri, - consumer, ust_consumerd32_fd); + consumer, socket->fd); + pthread_mutex_unlock(socket->lock); if (ret != LTTCOMM_OK) { goto error; } } - + rcu_read_unlock(); break; } case LTTNG_DST_PATH: @@ -4194,6 +4316,8 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, /* Need a session for kernel command */ if (need_tracing_session) { + struct consumer_socket *socket; + if (cmd_ctx->session->kernel_session == NULL) { ret = create_kernel_session(cmd_ctx->session); if (ret < 0) { @@ -4213,13 +4337,29 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, goto error; } uatomic_set(&kernel_consumerd_state, CONSUMER_STARTED); - - /* Set consumer fd of the session */ - cmd_ctx->session->kernel_session->consumer_fd = - kconsumer_data.cmd_sock; } else { pthread_mutex_unlock(&kconsumer_data.pid_mutex); } + + /* Set kernel consumer socket fd */ + if (kconsumer_data.cmd_sock >= 0) { + rcu_read_lock(); + socket = consumer_find_socket(kconsumer_data.cmd_sock, + cmd_ctx->session->kernel_session->consumer); + rcu_read_unlock(); + if (socket == NULL) { + socket = consumer_allocate_socket(kconsumer_data.cmd_sock); + if (socket == NULL) { + goto error; + } + + socket->lock = &kconsumer_data.lock; + rcu_read_lock(); + consumer_add_socket(socket, + cmd_ctx->session->kernel_session->consumer); + rcu_read_unlock(); + } + } } break; @@ -4232,6 +4372,8 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, } if (need_tracing_session) { + struct consumer_socket *socket; + if (cmd_ctx->session->ust_session == NULL) { ret = create_ust_session(cmd_ctx->session, &cmd_ctx->lsm->domain); @@ -4250,15 +4392,40 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, ret = start_consumerd(&ustconsumer64_data); if (ret < 0) { ret = LTTCOMM_UST_CONSUMER64_FAIL; - ust_consumerd64_fd = -EINVAL; + uatomic_set(&ust_consumerd64_fd, -EINVAL); goto error; } - ust_consumerd64_fd = ustconsumer64_data.cmd_sock; + uatomic_set(&ust_consumerd64_fd, ustconsumer64_data.cmd_sock); uatomic_set(&ust_consumerd_state, CONSUMER_STARTED); } else { pthread_mutex_unlock(&ustconsumer64_data.pid_mutex); } + + /* + * Setup socket for consumer 64 bit. No need for atomic access + * since it was set above and can ONLY be set in this thread. + */ + if (ust_consumerd64_fd >= 0) { + rcu_read_lock(); + socket = consumer_find_socket(uatomic_read(&ust_consumerd64_fd), + cmd_ctx->session->ust_session->consumer); + rcu_read_unlock(); + if (socket == NULL) { + socket = consumer_allocate_socket(ust_consumerd64_fd); + if (socket == NULL) { + goto error; + } + socket->lock = &ustconsumer32_data.lock; + + rcu_read_lock(); + consumer_add_socket(socket, + cmd_ctx->session->ust_session->consumer); + rcu_read_unlock(); + } + DBG3("UST consumer 64 bit socket set to %d", socket->fd); + } + /* 32-bit */ if (consumerd32_bin[0] != '\0' && ustconsumer32_data.pid == 0 && @@ -4267,15 +4434,39 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, ret = start_consumerd(&ustconsumer32_data); if (ret < 0) { ret = LTTCOMM_UST_CONSUMER32_FAIL; - ust_consumerd32_fd = -EINVAL; + uatomic_set(&ust_consumerd32_fd, -EINVAL); goto error; } - ust_consumerd32_fd = ustconsumer32_data.cmd_sock; + uatomic_set(&ust_consumerd32_fd, ustconsumer32_data.cmd_sock); uatomic_set(&ust_consumerd_state, CONSUMER_STARTED); } else { pthread_mutex_unlock(&ustconsumer32_data.pid_mutex); } + + /* + * Setup socket for consumer 64 bit. No need for atomic access + * since it was set above and can ONLY be set in this thread. + */ + if (ust_consumerd32_fd >= 0) { + rcu_read_lock(); + socket = consumer_find_socket(uatomic_read(&ust_consumerd64_fd), + cmd_ctx->session->ust_session->consumer); + rcu_read_unlock(); + if (socket == NULL) { + socket = consumer_allocate_socket(ust_consumerd32_fd); + if (socket == NULL) { + goto error; + } + socket->lock = &ustconsumer32_data.lock; + + rcu_read_lock(); + consumer_add_socket(socket, + cmd_ctx->session->ust_session->consumer); + rcu_read_unlock(); + } + DBG3("UST consumer 32 bit socket set to %d", socket->fd); + } } break; }