- consumer_data->cmd_unix_sock_path);
- if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) {
- PERROR("consumer connect cmd socket");
- /* On error, signal condition and quit. */
- signal_consumer_condition(consumer_data, -1);
- goto error;
- }
-
- consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd;
-
- /* Create metadata socket lock. */
- consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t));
- if (consumer_data->metadata_sock.lock == NULL) {
- PERROR("zmalloc pthread mutex");
- goto error;
- }
- pthread_mutex_init(consumer_data->metadata_sock.lock, NULL);
-
- DBG("Consumer command socket ready (fd: %d", consumer_data->cmd_sock);
- DBG("Consumer metadata socket ready (fd: %d)",
- consumer_data->metadata_fd);
-
- /*
- * Remove the consumerd error sock since we've established a connection.
- */
- ret = lttng_poll_del(&events, consumer_data->err_sock);
- if (ret < 0) {
- goto error;
- }
-
- /* Add new accepted error socket. */
- ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP);
- if (ret < 0) {
- goto error;
- }
-
- /* Add metadata socket that is successfully connected. */
- ret = lttng_poll_add(&events, consumer_data->metadata_fd,
- LPOLLIN | LPOLLRDHUP);
- if (ret < 0) {
- goto error;
- }
-
- health_code_update();
-
- /*
- * Transfer the write-end of the channel monitoring and rotate pipe
- * to the consumer by issuing a SET_CHANNEL_MONITOR_PIPE command.
- */
- cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock);
- if (!cmd_socket_wrapper) {
- goto error;
- }
- cmd_socket_wrapper->lock = &consumer_data->lock;
-
- ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper,
- consumer_data->channel_monitor_pipe);
- if (ret) {
- goto error;
- }
-
- /* Discard the socket wrapper as it is no longer needed. */
- consumer_destroy_socket(cmd_socket_wrapper);
- cmd_socket_wrapper = NULL;
-
- /* The thread is completely initialized, signal that it is ready. */
- signal_consumer_condition(consumer_data, 1);
-
- /* Infinite blocking call, waiting for transmission */
-restart_poll:
- while (1) {
- health_code_update();
-
- /* Exit the thread because the thread quit pipe has been triggered. */
- if (should_quit) {
- /* Not a health error. */
- err = 0;
- goto exit;
- }
-
- health_poll_entry();
- ret = lttng_poll_wait(&events, -1);
- health_poll_exit();
- if (ret < 0) {
- /*
- * Restart interrupted system call.
- */
- if (errno == EINTR) {
- goto restart_poll;
- }
- goto error;
- }
-
- nb_fd = ret;
-
- for (i = 0; i < nb_fd; i++) {
- /* Fetch once the poll data */
- revents = LTTNG_POLL_GETEV(&events, i);
- pollfd = LTTNG_POLL_GETFD(&events, i);
-
- health_code_update();
-
- if (!revents) {
- /* No activity for this FD (poll implementation). */
- continue;
- }
-
- /*
- * Thread quit pipe has been triggered, flag that we should stop
- * but continue the current loop to handle potential data from
- * consumer.
- */
- should_quit = sessiond_check_thread_quit_pipe(pollfd, revents);
-
- if (pollfd == sock) {
- /* Event on the consumerd socket */
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
- && !(revents & LPOLLIN)) {
- ERR("consumer err socket second poll error");
- goto error;
- }
- health_code_update();
- /* Wait for any kconsumerd error */
- ret = lttcomm_recv_unix_sock(sock, &code,
- sizeof(enum lttcomm_return_code));
- if (ret <= 0) {
- ERR("consumer closed the command socket");
- goto error;
- }
-
- ERR("consumer return code : %s",
- lttcomm_get_readable_code(-code));
-
- goto exit;
- } else if (pollfd == consumer_data->metadata_fd) {
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
- && !(revents & LPOLLIN)) {
- ERR("consumer err metadata socket second poll error");
- goto error;
- }
- /* UST metadata requests */
- ret = ust_consumer_metadata_request(
- &consumer_data->metadata_sock);
- if (ret < 0) {
- ERR("Handling metadata request");
- goto error;
- }
- }
- /* No need for an else branch all FDs are tested prior. */
- }
- health_code_update();
- }
-
-exit:
-error:
- /*
- * We lock here because we are about to close the sockets and some other
- * thread might be using them so get exclusive access which will abort all
- * other consumer command by other threads.
- */
- pthread_mutex_lock(&consumer_data->lock);
-
- /* Immediately set the consumerd state to stopped */
- if (consumer_data->type == LTTNG_CONSUMER_KERNEL) {
- uatomic_set(&kernel_consumerd_state, CONSUMER_ERROR);
- } else if (consumer_data->type == LTTNG_CONSUMER64_UST ||
- consumer_data->type == LTTNG_CONSUMER32_UST) {
- uatomic_set(&ust_consumerd_state, CONSUMER_ERROR);
- } else {
- /* Code flow error... */
- assert(0);
- }
-
- if (consumer_data->err_sock >= 0) {
- ret = close(consumer_data->err_sock);
- if (ret) {
- PERROR("close");
- }
- consumer_data->err_sock = -1;
- }
- if (consumer_data->cmd_sock >= 0) {
- ret = close(consumer_data->cmd_sock);
- if (ret) {
- PERROR("close");
- }
- consumer_data->cmd_sock = -1;
- }
- if (consumer_data->metadata_sock.fd_ptr &&
- *consumer_data->metadata_sock.fd_ptr >= 0) {
- ret = close(*consumer_data->metadata_sock.fd_ptr);
- if (ret) {
- PERROR("close");
- }
- }
- if (sock >= 0) {
- ret = close(sock);
- if (ret) {
- PERROR("close");
- }
- }
-
- unlink(consumer_data->err_unix_sock_path);
- unlink(consumer_data->cmd_unix_sock_path);
- pthread_mutex_unlock(&consumer_data->lock);
-
- /* Cleanup metadata socket mutex. */
- if (consumer_data->metadata_sock.lock) {
- pthread_mutex_destroy(consumer_data->metadata_sock.lock);
- free(consumer_data->metadata_sock.lock);
- }
- lttng_poll_clean(&events);
-
- if (cmd_socket_wrapper) {
- consumer_destroy_socket(cmd_socket_wrapper);
- }
-error_poll:
- if (err) {
- health_error();
- ERR("Health error occurred in %s", __func__);
- }
- health_unregister(health_sessiond);
- DBG("consumer thread cleanup completed");
-
- rcu_thread_offline();
- rcu_unregister_thread();
-
- return NULL;
-}
-
-/*
- * This thread receives application command sockets (FDs) on the
- * apps_cmd_pipe and waits (polls) on them until they are closed
- * or an error occurs.
- *
- * At that point, it flushes the data (tracing and metadata) associated
- * with this application and tears down ust app sessions and other
- * associated data structures through ust_app_unregister().
- *
- * Note that this thread never sends commands to the applications
- * through the command sockets; it merely listens for hang-ups
- * and errors on those sockets and cleans-up as they occur.
- *
- * The loop is left with a clean status when the thread quit pipe is
- * triggered. Any failure while polling, reading the pipe or updating the
- * poll set leaves the loop with a health error. On every exit path the
- * apps_cmd_pipe is closed and both of its ends are reset to -1.
- *
- * The data argument is not used. Always returns NULL.
- */
-static void *thread_manage_apps(void *data)
-{
-	int i, ret, pollfd, err = -1;
-	ssize_t size_ret;
-	uint32_t revents, nb_fd;
-	struct lttng_poll_event events;
-
-	DBG("[thread] Manage application started");
-
-	rcu_register_thread();
-	rcu_thread_online();
-
-	health_register(health_sessiond, HEALTH_SESSIOND_TYPE_APP_MANAGE);
-
-	if (testpoint(sessiond_thread_manage_apps)) {
-		goto error_testpoint;
-	}
-
-	health_code_update();
-
-	ret = sessiond_set_thread_pollset(&events, 2);
-	if (ret < 0) {
-		goto error_poll_create;
-	}
-
-	ret = lttng_poll_add(&events, apps_cmd_pipe[0], LPOLLIN | LPOLLRDHUP);
-	if (ret < 0) {
-		goto error;
-	}
-
-	if (testpoint(sessiond_thread_manage_apps_before_loop)) {
-		goto error;
-	}
-
-	health_code_update();
-
-	while (1) {
-		DBG("Apps thread polling");
-
-		/* Infinite blocking call, waiting for transmission */
-	restart:
-		health_poll_entry();
-		ret = lttng_poll_wait(&events, -1);
-		DBG("Apps thread return from poll on %d fds",
-				LTTNG_POLL_GETNB(&events));
-		health_poll_exit();
-		if (ret < 0) {
-			/*
-			 * Restart interrupted system call.
-			 */
-			if (errno == EINTR) {
-				goto restart;
-			}
-			goto error;
-		}
-
-		nb_fd = ret;
-
-		for (i = 0; i < nb_fd; i++) {
-			/* Fetch once the poll data */
-			revents = LTTNG_POLL_GETEV(&events, i);
-			pollfd = LTTNG_POLL_GETFD(&events, i);
-
-			health_code_update();
-
-			if (!revents) {
-				/* No activity for this FD (poll implementation). */
-				continue;
-			}
-
-			/* Thread quit pipe has been closed. Killing thread. */
-			ret = sessiond_check_thread_quit_pipe(pollfd, revents);
-			if (ret) {
-				/* Requested shutdown: not a health error. */
-				err = 0;
-				goto exit;
-			}
-
-			/* Inspect the apps cmd pipe */
-			if (pollfd == apps_cmd_pipe[0]) {
-				if (revents & LPOLLIN) {
-					int sock;
-
-					/* Empty pipe */
-					size_ret = lttng_read(apps_cmd_pipe[0], &sock, sizeof(sock));
-					/*
-					 * Test the sign first: comparing the signed
-					 * size_ret directly against the unsigned
-					 * sizeof() converts a negative value to a huge
-					 * one, which would let a failed read go
-					 * unnoticed and leave sock uninitialized.
-					 */
-					if (size_ret < 0 || (size_t) size_ret < sizeof(sock)) {
-						PERROR("read apps cmd pipe");
-						goto error;
-					}
-
-					health_code_update();
-
-					/*
-					 * Since this is a command socket (write then read),
-					 * we only monitor the error events of the socket.
-					 */
-					ret = lttng_poll_add(&events, sock,
-							LPOLLERR | LPOLLHUP | LPOLLRDHUP);
-					if (ret < 0) {
-						goto error;
-					}
-
-					DBG("Apps with sock %d added to poll set", sock);
-				} else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-					ERR("Apps command pipe error");
-					goto error;
-				} else {
-					ERR("Unknown poll events %u for sock %d", revents, pollfd);
-					goto error;
-				}
-			} else {
-				/*
-				 * At this point, we know that a registered application made
-				 * the event at poll_wait.
-				 */
-				if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-					/* Removing from the poll set */
-					ret = lttng_poll_del(&events, pollfd);
-					if (ret < 0) {
-						goto error;
-					}
-
-					/* Socket closed on remote end. */
-					ust_app_unregister(pollfd);
-				} else {
-					ERR("Unexpected poll events %u for sock %d", revents, pollfd);
-					goto error;
-				}
-			}
-
-			health_code_update();
-		}
-	}
-
-exit:
-error:
-	lttng_poll_clean(&events);
-error_poll_create:
-error_testpoint:
-	utils_close_pipe(apps_cmd_pipe);
-	apps_cmd_pipe[0] = apps_cmd_pipe[1] = -1;
-
-	/*
-	 * We don't clean the UST app hash table here since already registered
-	 * applications can still be controlled so let them be until the session
-	 * daemon dies or the applications stop.
-	 */
-
-	if (err) {
-		health_error();
-		ERR("Health error occurred in %s", __func__);
-	}
-	health_unregister(health_sessiond);
-	DBG("Application communication apps thread cleanup complete");
-	rcu_thread_offline();
-	rcu_unregister_thread();
-	return NULL;
-}
-
-/*
- * Send a socket to a thread. This is called from the dispatch UST
- * registration thread once all sockets are set for the application.
- *
- * The sock value is written as-is to the pipe write end fd. The sock value
- * can be invalid, we don't really care, the thread will handle it and make
- * the necessary cleanup if so.
- *
- * On success, return 0. On error, return a negative value: -EBADF if fd is
- * negative, the negated errno of the failed write, or -EIO if the value
- * could only be partially written.
- */
-static int send_socket_to_thread(int fd, int sock)
-{
-	ssize_t ret;
-
-	/*
-	 * It's possible that the FD is set as invalid with -1 concurrently just
-	 * before calling this function being a shutdown state of the thread.
-	 */
-	if (fd < 0) {
-		ret = -EBADF;
-		goto error;
-	}
-
-	ret = lttng_write(fd, &sock, sizeof(sock));
-	/*
-	 * Test the sign first: comparing the signed ret directly against the
-	 * unsigned sizeof() converts a negative value to a huge one, which
-	 * would report a failed write as a success.
-	 */
-	if (ret < 0 || (size_t) ret < sizeof(sock)) {
-		/* Keep errno from the write; logging may overwrite it. */
-		const int saved_errno = errno;
-
-		PERROR("write apps pipe %d", fd);
-		if (ret < 0 && saved_errno != 0) {
-			ret = -saved_errno;
-		} else {
-			/*
-			 * Short write (or no usable errno): callers only test
-			 * for a negative value, so never hand back the
-			 * positive byte count.
-			 */
-			ret = -EIO;
-		}
-		goto error;
-	}
-
-	/* All good. Don't send back the write positive ret value. */
-	ret = 0;
-error:
-	return (int) ret;
-}
-
-/*
- * Sanitize the wait queue of the dispatch registration thread, meaning
- * removing invalid nodes from it. This is to avoid memory leaks for the case
- * the UST notify socket is never received.
- *
- * Every application socket of the queue is added to a temporary poll set
- * which is polled without blocking. For each socket reporting a hang-up or an
- * error, the matching node is unlinked from the queue, its application is
- * destroyed and the node is freed.
- *
- * Nothing is returned; failures are only logged.
- */
-static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue)
-{
-	int ret, nb_fd = 0, nb_cleaned = 0, i;
-	unsigned int fd_added = 0;
-	struct lttng_poll_event events;
-	struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node;
-
-	assert(wait_queue);
-
-	lttng_poll_init(&events);
-
-	/* Just skip everything for an empty queue. */
-	if (!wait_queue->count) {
-		goto end;
-	}
-
-	ret = lttng_poll_create(&events, wait_queue->count, LTTNG_CLOEXEC);
-	if (ret < 0) {
-		goto error_create;
-	}
-
-	cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
-			&wait_queue->head, head) {
-		assert(wait_node->app);
-		ret = lttng_poll_add(&events, wait_node->app->sock,
-				LPOLLHUP | LPOLLERR);
-		if (ret < 0) {
-			goto error;
-		}
-
-		fd_added = 1;
-	}
-
-	if (!fd_added) {
-		goto end;
-	}
-
-	/*
-	 * Poll but don't block so we can quickly identify the faulty events and
-	 * clean them afterwards from the wait queue.
-	 */
-	ret = lttng_poll_wait(&events, 0);
-	if (ret < 0) {
-		goto error;
-	}
-	nb_fd = ret;
-
-	for (i = 0; i < nb_fd; i++) {
-		/* Get faulty FD. */
-		uint32_t revents = LTTNG_POLL_GETEV(&events, i);
-		int pollfd = LTTNG_POLL_GETFD(&events, i);
-
-		if (!revents) {
-			/* No activity for this FD (poll implementation). */
-			continue;
-		}
-
-		cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
-				&wait_queue->head, head) {
-			/*
-			 * Keep walking until the node owning this FD is found;
-			 * a node belonging to another socket is not an error.
-			 */
-			if (pollfd != wait_node->app->sock) {
-				continue;
-			}
-
-			if (!(revents & (LPOLLHUP | LPOLLERR))) {
-				ERR("Unexpected poll events %u for sock %d", revents, pollfd);
-				goto error;
-			}
-
-			cds_list_del(&wait_node->head);
-			wait_queue->count--;
-			ust_app_destroy(wait_node->app);
-			free(wait_node);
-			/*
-			 * Silence warning of use-after-free in
-			 * cds_list_for_each_entry_safe which uses
-			 * __typeof__(*wait_node).
-			 */
-			wait_node = NULL;
-			nb_cleaned++;
-			break;
-		}
-	}
-
-	/* Report the nodes actually removed, not the poll entries returned. */
-	if (nb_cleaned > 0) {
-		DBG("Wait queue sanitized, %d node were cleaned up", nb_cleaned);
-	}
-
-end:
-	lttng_poll_clean(&events);
-	return;
-
-error:
-	lttng_poll_clean(&events);
-error_create:
-	ERR("Unable to sanitize wait queue");
-	return;
-}
-
-/*
- * Dispatch request from the registration threads to the application
- * communication thread.
- */
-static void *thread_dispatch_ust_registration(void *data)
-{
- int ret, err = -1;
- struct cds_wfcq_node *node;
- struct ust_command *ust_cmd = NULL;
- struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node;
- struct ust_reg_wait_queue wait_queue = {
- .count = 0,
- };
-
- rcu_register_thread();
-
- health_register(health_sessiond, HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH);
-
- if (testpoint(sessiond_thread_app_reg_dispatch)) {
- goto error_testpoint;
- }
-
- health_code_update();
-
- CDS_INIT_LIST_HEAD(&wait_queue.head);
-
- DBG("[thread] Dispatch UST command started");
-
- for (;;) {
- health_code_update();
-
- /* Atomically prepare the queue futex */
- futex_nto1_prepare(&ust_cmd_queue.futex);
-
- if (CMM_LOAD_SHARED(dispatch_thread_exit)) {
- break;
- }
-
- do {
- struct ust_app *app = NULL;
- ust_cmd = NULL;
-
- /*
- * Make sure we don't have node(s) that have hung up before receiving
- * the notify socket. This is to clean the list in order to avoid
- * memory leaks from notify socket that are never seen.
- */
- sanitize_wait_queue(&wait_queue);
-
- health_code_update();
- /* Dequeue command for registration */
- node = cds_wfcq_dequeue_blocking(&ust_cmd_queue.head, &ust_cmd_queue.tail);
- if (node == NULL) {
- DBG("Woken up but nothing in the UST command queue");
- /* Continue thread execution */
- break;
- }
-
- ust_cmd = caa_container_of(node, struct ust_command, node);
-
- DBG("Dispatching UST registration pid:%d ppid:%d uid:%d"
- " gid:%d sock:%d name:%s (version %d.%d)",
- ust_cmd->reg_msg.pid, ust_cmd->reg_msg.ppid,
- ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid,
- ust_cmd->sock, ust_cmd->reg_msg.name,
- ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor);
-
- if (ust_cmd->reg_msg.type == USTCTL_SOCKET_CMD) {
- wait_node = zmalloc(sizeof(*wait_node));
- if (!wait_node) {
- PERROR("zmalloc wait_node dispatch");
- ret = close(ust_cmd->sock);
- if (ret < 0) {
- PERROR("close ust sock dispatch %d", ust_cmd->sock);
- }
- lttng_fd_put(LTTNG_FD_APPS, 1);
- free(ust_cmd);
- goto error;
- }
- CDS_INIT_LIST_HEAD(&wait_node->head);
-
- /* Create application object if socket is CMD. */
- wait_node->app = ust_app_create(&ust_cmd->reg_msg,
- ust_cmd->sock);
- if (!wait_node->app) {
- ret = close(ust_cmd->sock);
- if (ret < 0) {
- PERROR("close ust sock dispatch %d", ust_cmd->sock);
- }
- lttng_fd_put(LTTNG_FD_APPS, 1);
- free(wait_node);
- free(ust_cmd);
- continue;
- }
- /*
- * Add application to the wait queue so we can set the notify
- * socket before putting this object in the global ht.
- */
- cds_list_add(&wait_node->head, &wait_queue.head);
- wait_queue.count++;
-
- free(ust_cmd);
- /*
- * We have to continue here since we don't have the notify
- * socket and the application MUST be added to the hash table
- * only at that moment.
- */
- continue;
- } else {
- /*
- * Look for the application in the local wait queue and set the
- * notify socket if found.
- */
- cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
- &wait_queue.head, head) {
- health_code_update();
- if (wait_node->app->pid == ust_cmd->reg_msg.pid) {
- wait_node->app->notify_sock = ust_cmd->sock;
- cds_list_del(&wait_node->head);
- wait_queue.count--;
- app = wait_node->app;
- free(wait_node);
- DBG3("UST app notify socket %d is set", ust_cmd->sock);
- break;
- }
- }
-
- /*
- * With no application at this stage the received socket is
- * basically useless so close it before we free the cmd data
- * structure for good.
- */
- if (!app) {
- ret = close(ust_cmd->sock);
- if (ret < 0) {
- PERROR("close ust sock dispatch %d", ust_cmd->sock);
- }
- lttng_fd_put(LTTNG_FD_APPS, 1);
- }
- free(ust_cmd);
- }
-
- if (app) {
- /*
- * @session_lock_list
- *
- * Lock the global session list so from the register up to the
- * registration done message, no thread can see the application
- * and change its state.
- */
- session_lock_list();
- rcu_read_lock();
-
- /*
- * Add application to the global hash table. This needs to be
- * done before the update to the UST registry can locate the
- * application.
- */
- ust_app_add(app);
-
- /* Set app version. This call will print an error if needed. */
- (void) ust_app_version(app);
-
- /* Send notify socket through the notify pipe. */
- ret = send_socket_to_thread(apps_cmd_notify_pipe[1],
- app->notify_sock);
- if (ret < 0) {
- rcu_read_unlock();
- session_unlock_list();
- /*
- * No notify thread, stop the UST tracing. However, this is
- * not an internal error of the this thread thus setting
- * the health error code to a normal exit.
- */
- err = 0;
- goto error;
- }
-
- /*
- * Update newly registered application with the tracing
- * registry info already enabled information.
- */
- update_ust_app(app->sock);
-
- /*
- * Don't care about return value. Let the manage apps threads
- * handle app unregistration upon socket close.
- */
- (void) ust_app_register_done(app);
-
- /*
- * Even if the application socket has been closed, send the app
- * to the thread and unregistration will take place at that
- * place.
- */
- ret = send_socket_to_thread(apps_cmd_pipe[1], app->sock);
- if (ret < 0) {
- rcu_read_unlock();
- session_unlock_list();
- /*
- * No apps. thread, stop the UST tracing. However, this is
- * not an internal error of the this thread thus setting
- * the health error code to a normal exit.
- */
- err = 0;
- goto error;
- }
-
- rcu_read_unlock();
- session_unlock_list();
- }
- } while (node != NULL);
-
- health_poll_entry();
- /* Futex wait on queue. Blocking call on futex() */
- futex_nto1_wait(&ust_cmd_queue.futex);
- health_poll_exit();