#include "ht-cleanup.h"
#include "sessiond-config.h"
#include "timer.h"
+#include "thread.h"
static const char *help_msg =
#ifdef LTTNG_EMBED_HELP
/* Sockets and FDs */
static int client_sock = -1;
static int apps_sock = -1;
-static int kernel_poll_pipe[2] = { -1, -1 };
/*
* This pipe is used to inform the thread managing application communication
static pthread_t client_thread;
static pthread_t kernel_thread;
static pthread_t dispatch_thread;
-static pthread_t health_thread;
-static pthread_t ht_cleanup_thread;
static pthread_t agent_reg_thread;
static pthread_t load_session_thread;
-static pthread_t notification_thread;
-static pthread_t rotation_thread;
-static pthread_t timer_thread;
/*
* UST registration command queue. This queue is tied with a futex and uses a N
*/
static struct ust_cmd_queue ust_cmd_queue;
-/*
- * Pointer initialized before thread creation.
- *
- * This points to the tracing session list containing the session count and a
- * mutex lock. The lock MUST be taken if you iterate over the list. The lock
- * MUST NOT be taken if you call a public function in session.c.
- *
- * The lock is nested inside the structure: session_list_ptr->lock. Please use
- * session_lock_list and session_unlock_list for lock acquisition.
- */
-static struct ltt_session_list *session_list_ptr;
-
static const char *module_proc_lttng = "/proc/lttng";
/*
/* Am I root or not. Set to 1 if the daemon is running as root */
static int is_root;
-/* Rotation thread handle. */
-static struct rotation_thread_handle *rotation_thread_handle;
-
/*
* Stop all threads by closing the thread quit pipe.
*/
static void sessiond_cleanup(void)
{
int ret;
- struct ltt_session *sess, *stmp;
+ struct ltt_session_list *session_list = session_get_list();
DBG("Cleanup sessiond");
DBG("Removing directory %s", config.consumerd64_path.value);
(void) rmdir(config.consumerd64_path.value);
- DBG("Cleaning up all sessions");
-
- /* Destroy session list mutex */
- if (session_list_ptr != NULL) {
- pthread_mutex_destroy(&session_list_ptr->lock);
-
- /* Cleanup ALL session */
- cds_list_for_each_entry_safe(sess, stmp,
- &session_list_ptr->head, list) {
- cmd_destroy_session(sess, kernel_poll_pipe[1],
- notification_thread_handle);
- }
- }
+ pthread_mutex_destroy(&session_list->lock);
wait_consumer(&kconsumer_data);
wait_consumer(&ustconsumer64_data);
static int update_kernel_poll(struct lttng_poll_event *events)
{
int ret;
- struct ltt_session *session;
struct ltt_kernel_channel *channel;
+ struct ltt_session *session;
+ const struct ltt_session_list *session_list = session_get_list();
DBG("Updating kernel poll set");
session_lock_list();
- cds_list_for_each_entry(session, &session_list_ptr->head, list) {
+ cds_list_for_each_entry(session, &session_list->head, list) {
+ if (!session_get(session)) {
+ continue;
+ }
session_lock(session);
if (session->kernel_session == NULL) {
session_unlock(session);
+ session_put(session);
continue;
}
ret = lttng_poll_add(events, channel->fd, LPOLLIN | LPOLLRDNORM);
if (ret < 0) {
session_unlock(session);
+ session_put(session);
goto error;
}
DBG("Channel fd %d added to kernel set", channel->fd);
struct ltt_session *session;
struct ltt_kernel_session *ksess;
struct ltt_kernel_channel *channel;
+ const struct ltt_session_list *session_list = session_get_list();
DBG("Updating kernel streams for channel fd %d", fd);
session_lock_list();
- cds_list_for_each_entry(session, &session_list_ptr->head, list) {
+ cds_list_for_each_entry(session, &session_list->head, list) {
+ if (!session_get(session)) {
+ continue;
+ }
session_lock(session);
if (session->kernel_session == NULL) {
session_unlock(session);
+ session_put(session);
continue;
}
ksess = session->kernel_session;
rcu_read_unlock();
}
session_unlock(session);
+ session_put(session);
}
session_unlock_list();
return ret;
error:
session_unlock(session);
+ session_put(session);
session_unlock_list();
return ret;
}
static void update_ust_app(int app_sock)
{
struct ltt_session *sess, *stmp;
+ const struct ltt_session_list *session_list = session_get_list();
/* Consumer is in an ERROR state. Stop any application update. */
if (uatomic_read(&ust_consumerd_state) == CONSUMER_ERROR) {
}
/* For all tracing session(s) */
- cds_list_for_each_entry_safe(sess, stmp, &session_list_ptr->head, list) {
+ cds_list_for_each_entry_safe(sess, stmp, &session_list->head, list) {
struct ust_app *app;
+ if (!session_get(sess)) {
+ continue;
+ }
session_lock(sess);
if (!sess->ust_session) {
goto unlock_session;
rcu_read_unlock();
unlock_session:
session_unlock(sess);
+ session_put(sess);
}
}
{
unsigned int i = 0;
struct ltt_session *session;
+ const struct ltt_session_list *session_list = session_get_list();
DBG("Counting number of available session for UID %d GID %d",
uid, gid);
- cds_list_for_each_entry(session, &session_list_ptr->head, list) {
- /*
- * Only list the sessions the user can control.
- */
- if (!session_access_ok(session, uid, gid)) {
+ cds_list_for_each_entry(session, &session_list->head, list) {
+ if (!session_get(session)) {
continue;
}
- i++;
+ session_lock(session);
+ /* Only count the sessions the user can control. */
+ if (session_access_ok(session, uid, gid) &&
+ !session->destroyed) {
+ i++;
+ }
+ session_unlock(session);
+ session_put(session);
}
return i;
}
if (need_tracing_session) {
if (!session_access_ok(cmd_ctx->session,
LTTNG_SOCK_GET_UID_CRED(&cmd_ctx->creds),
- LTTNG_SOCK_GET_GID_CRED(&cmd_ctx->creds))) {
+ LTTNG_SOCK_GET_GID_CRED(&cmd_ctx->creds)) ||
+ cmd_ctx->session->destroyed) {
ret = LTTNG_ERR_EPERM;
goto error;
}
}
case LTTNG_DESTROY_SESSION:
{
- ret = cmd_destroy_session(cmd_ctx->session, kernel_poll_pipe[1],
+ ret = cmd_destroy_session(cmd_ctx->session,
notification_thread_handle);
-
- /* Set session to NULL so we do not unlock it after free. */
- cmd_ctx->session = NULL;
break;
}
case LTTNG_LIST_DOMAINS:
setup_error:
if (cmd_ctx->session) {
session_unlock(cmd_ctx->session);
+ session_put(cmd_ctx->session);
}
if (need_tracing_session) {
session_unlock_list();
return ret;
}
-/*
- * Thread managing health check socket.
- */
-static void *thread_manage_health(void *data)
-{
- int sock = -1, new_sock = -1, ret, i, pollfd, err = -1;
- uint32_t revents, nb_fd;
- struct lttng_poll_event events;
- struct health_comm_msg msg;
- struct health_comm_reply reply;
-
- DBG("[thread] Manage health check started");
-
- rcu_register_thread();
-
- /* We might hit an error path before this is created. */
- lttng_poll_init(&events);
-
- /* Create unix socket */
- sock = lttcomm_create_unix_sock(config.health_unix_sock_path.value);
- if (sock < 0) {
- ERR("Unable to create health check Unix socket");
- goto error;
- }
-
- if (is_root) {
- /* lttng health client socket path permissions */
- ret = chown(config.health_unix_sock_path.value, 0,
- utils_get_group_id(config.tracing_group_name.value));
- if (ret < 0) {
- ERR("Unable to set group on %s", config.health_unix_sock_path.value);
- PERROR("chown");
- goto error;
- }
-
- ret = chmod(config.health_unix_sock_path.value,
- S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
- if (ret < 0) {
- ERR("Unable to set permissions on %s", config.health_unix_sock_path.value);
- PERROR("chmod");
- goto error;
- }
- }
-
- /*
- * Set the CLOEXEC flag. Return code is useless because either way, the
- * show must go on.
- */
- (void) utils_set_fd_cloexec(sock);
-
- ret = lttcomm_listen_unix_sock(sock);
- if (ret < 0) {
- goto error;
- }
-
- /*
- * Pass 2 as size here for the thread quit pipe and client_sock. Nothing
- * more will be added to this poll set.
- */
- ret = sessiond_set_thread_pollset(&events, 2);
- if (ret < 0) {
- goto error;
- }
-
- /* Add the application registration socket */
- ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLPRI);
- if (ret < 0) {
- goto error;
- }
-
- sessiond_notify_ready();
-
- while (1) {
- DBG("Health check ready");
-
- /* Inifinite blocking call, waiting for transmission */
-restart:
- ret = lttng_poll_wait(&events, -1);
- if (ret < 0) {
- /*
- * Restart interrupted system call.
- */
- if (errno == EINTR) {
- goto restart;
- }
- goto error;
- }
-
- nb_fd = ret;
-
- for (i = 0; i < nb_fd; i++) {
- /* Fetch once the poll data */
- revents = LTTNG_POLL_GETEV(&events, i);
- pollfd = LTTNG_POLL_GETFD(&events, i);
-
- if (!revents) {
- /* No activity for this FD (poll implementation). */
- continue;
- }
-
- /* Thread quit pipe has been closed. Killing thread. */
- ret = sessiond_check_thread_quit_pipe(pollfd, revents);
- if (ret) {
- err = 0;
- goto exit;
- }
-
- /* Event on the registration socket */
- if (pollfd == sock) {
- if (revents & LPOLLIN) {
- continue;
- } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
- ERR("Health socket poll error");
- goto error;
- } else {
- ERR("Unexpected poll events %u for sock %d", revents, pollfd);
- goto error;
- }
- }
- }
-
- new_sock = lttcomm_accept_unix_sock(sock);
- if (new_sock < 0) {
- goto error;
- }
-
- /*
- * Set the CLOEXEC flag. Return code is useless because either way, the
- * show must go on.
- */
- (void) utils_set_fd_cloexec(new_sock);
-
- DBG("Receiving data from client for health...");
- ret = lttcomm_recv_unix_sock(new_sock, (void *)&msg, sizeof(msg));
- if (ret <= 0) {
- DBG("Nothing recv() from client... continuing");
- ret = close(new_sock);
- if (ret) {
- PERROR("close");
- }
- continue;
- }
-
- rcu_thread_online();
-
- memset(&reply, 0, sizeof(reply));
- for (i = 0; i < NR_HEALTH_SESSIOND_TYPES; i++) {
- /*
- * health_check_state returns 0 if health is
- * bad.
- */
- if (!health_check_state(health_sessiond, i)) {
- reply.ret_code |= 1ULL << i;
- }
- }
-
- DBG2("Health check return value %" PRIx64, reply.ret_code);
-
- ret = send_unix_sock(new_sock, (void *) &reply, sizeof(reply));
- if (ret < 0) {
- ERR("Failed to send health data back to client");
- }
-
- /* End of transmission */
- ret = close(new_sock);
- if (ret) {
- PERROR("close");
- }
- }
-
-exit:
-error:
- if (err) {
- ERR("Health error occurred in %s", __func__);
- }
- DBG("Health check thread dying");
- unlink(config.health_unix_sock_path.value);
- if (sock >= 0) {
- ret = close(sock);
- if (ret) {
- PERROR("close");
- }
- }
-
- lttng_poll_clean(&events);
- stop_threads();
- rcu_unregister_thread();
- return NULL;
-}
-
/*
* This thread manage all clients request using the unix client socket for
* communication.
health_code_update();
+ /* Set state as running. */
+ sessiond_set_client_thread_state(true);
+
while (1) {
const struct cmd_completion_handler *cmd_completion_handler;
errno = ret;
PERROR("join_consumer ust64");
}
+
+ /* Set state as non-running. */
+ sessiond_set_client_thread_state(false);
return NULL;
}
return ret;
}
+static void destroy_all_sessions_and_wait(void)
+{
+ struct ltt_session *session, *tmp;
+ struct ltt_session_list *session_list;
+
+ session_list = session_get_list();
+ DBG("Initiating destruction of all sessions");
+
+ if (!session_list) {
+ return;
+ }
+
+ /*
+ * Ensure that the client thread is no longer accepting new commands,
+ * which could cause new sessions to be created.
+ */
+ sessiond_wait_client_thread_stopped();
+
+ session_lock_list();
+ /* Initiate the destruction of all sessions. */
+ cds_list_for_each_entry_safe(session, tmp,
+ &session_list->head, list) {
+ if (!session_get(session)) {
+ continue;
+ }
+
+ session_lock(session);
+ if (session->destroyed) {
+ goto unlock_session;
+ }
+ (void) cmd_destroy_session(session,
+ notification_thread_handle);
+ unlock_session:
+ session_unlock(session);
+ session_put(session);
+ }
+ session_unlock_list();
+
+ /* Wait for the destruction of all sessions to complete. */
+ DBG("Waiting for the destruction of all sessions to complete");
+ session_list_wait_empty();
+ DBG("Destruction of all sessions completed");
+}
+
/*
* main
*/
struct lttng_pipe *ust32_channel_monitor_pipe = NULL,
*ust64_channel_monitor_pipe = NULL,
*kernel_channel_monitor_pipe = NULL;
- bool notification_thread_launched = false;
- bool rotation_thread_launched = false;
- bool timer_thread_launched = false;
- struct timer_thread_parameters timer_thread_ctx;
+ struct lttng_thread *ht_cleanup_thread = NULL;
+ struct timer_thread_parameters timer_thread_parameters;
+ /* Rotation thread handle. */
+ struct rotation_thread_handle *rotation_thread_handle = NULL;
/* Queue of rotation jobs populated by the sessiond-timer. */
struct rotation_thread_timer_queue *rotation_timer_queue = NULL;
- sem_t notification_thread_ready;
init_kernel_workarounds();
}
/* Create thread to clean up RCU hash tables */
- if (init_ht_cleanup_thread(&ht_cleanup_thread)) {
+ ht_cleanup_thread = launch_ht_cleanup_thread();
+ if (!ht_cleanup_thread) {
retval = -1;
goto exit_ht_cleanup;
}
retval = -1;
goto exit_init_data;
}
- timer_thread_ctx.rotation_thread_job_queue = rotation_timer_queue;
+ timer_thread_parameters.rotation_thread_job_queue =
+ rotation_timer_queue;
ust64_channel_monitor_pipe = lttng_pipe_open(0);
if (!ust64_channel_monitor_pipe) {
/* Init UST command queue. */
cds_wfcq_init(&ust_cmd_queue.head, &ust_cmd_queue.tail);
- /*
- * Get session list pointer. This pointer MUST NOT be free'd. This list
- * is statically declared in session.c
- */
- session_list_ptr = session_get_list();
-
cmd_init();
/* Check for the application socket timeout env variable. */
load_info->path = config.load_session_path.value;
/* Create health-check thread. */
- ret = pthread_create(&health_thread, default_pthread_attr(),
- thread_manage_health, (void *) NULL);
- if (ret) {
- errno = ret;
- PERROR("pthread_create health");
+ if (!launch_health_management_thread()) {
retval = -1;
goto exit_health;
}
- /*
- * The rotation thread needs the notification thread to be ready before
- * creating the rotate_notification_channel, so we use this semaphore as
- * a rendez-vous point.
- */
- sem_init(¬ification_thread_ready, 0, 0);
-
/* notification_thread_data acquires the pipes' read side. */
notification_thread_handle = notification_thread_handle_create(
ust32_channel_monitor_pipe,
ust64_channel_monitor_pipe,
- kernel_channel_monitor_pipe,
- ¬ification_thread_ready);
+ kernel_channel_monitor_pipe);
if (!notification_thread_handle) {
retval = -1;
ERR("Failed to create notification thread shared data");
- stop_threads();
goto exit_notification;
}
/* Create notification thread. */
- ret = pthread_create(¬ification_thread, default_pthread_attr(),
- thread_notification, notification_thread_handle);
- if (ret) {
- errno = ret;
- PERROR("pthread_create notification");
+ if (!launch_notification_thread(notification_thread_handle)) {
retval = -1;
- stop_threads();
goto exit_notification;
}
- notification_thread_launched = true;
/* Create timer thread. */
- ret = pthread_create(&timer_thread, default_pthread_attr(),
- timer_thread_func, &timer_thread_ctx);
- if (ret) {
- errno = ret;
- PERROR("pthread_create timer");
+ if (!launch_timer_thread(&timer_thread_parameters)) {
retval = -1;
- stop_threads();
goto exit_notification;
}
- timer_thread_launched = true;
/* rotation_thread_data acquires the pipes' read side. */
rotation_thread_handle = rotation_thread_handle_create(
rotation_timer_queue,
- notification_thread_handle,
- ¬ification_thread_ready);
+ notification_thread_handle);
if (!rotation_thread_handle) {
retval = -1;
ERR("Failed to create rotation thread shared data");
}
/* Create rotation thread. */
- ret = pthread_create(&rotation_thread, default_pthread_attr(),
- thread_rotation, rotation_thread_handle);
- if (ret) {
- errno = ret;
- PERROR("pthread_create rotation");
+ if (!launch_rotation_thread(rotation_thread_handle)) {
retval = -1;
- stop_threads();
goto exit_rotation;
}
- rotation_thread_launched = true;
/* Create thread to manage the client socket */
ret = pthread_create(&client_thread, default_pthread_attr(),
PERROR("pthread_join load_session_thread");
retval = -1;
}
+
+ /* Initiate teardown once activity occurs on the quit pipe. */
+ sessiond_wait_for_quit_pipe(-1U);
+ destroy_all_sessions_and_wait();
exit_load_session:
if (is_root && !config.no_kernel) {
exit_client:
exit_rotation:
exit_notification:
- sem_destroy(¬ification_thread_ready);
- ret = pthread_join(health_thread, &status);
- if (ret) {
- errno = ret;
- PERROR("pthread_join health thread");
- retval = -1;
- }
-
+ lttng_thread_list_shutdown_orphans();
exit_health:
exit_init_data:
/*
*/
rcu_barrier();
- /*
- * The teardown of the notification system is performed after the
- * session daemon's teardown in order to allow it to be notified
- * of the active session and channels at the moment of the teardown.
- */
- if (notification_thread_handle) {
- if (notification_thread_launched) {
- notification_thread_command_quit(
- notification_thread_handle);
- ret = pthread_join(notification_thread, &status);
- if (ret) {
- errno = ret;
- PERROR("pthread_join notification thread");
- retval = -1;
- }
- }
- notification_thread_handle_destroy(notification_thread_handle);
+ if (ht_cleanup_thread) {
+ lttng_thread_shutdown(ht_cleanup_thread);
+ lttng_thread_put(ht_cleanup_thread);
}
+ rcu_thread_offline();
+ rcu_unregister_thread();
+
if (rotation_thread_handle) {
- if (rotation_thread_launched) {
- ret = pthread_join(rotation_thread, &status);
- if (ret) {
- errno = ret;
- PERROR("pthread_join rotation thread");
- retval = -1;
- }
- }
rotation_thread_handle_destroy(rotation_thread_handle);
}
- if (timer_thread_launched) {
- timer_exit();
- ret = pthread_join(timer_thread, &status);
- if (ret) {
- errno = ret;
- PERROR("pthread_join timer thread");
- retval = -1;
- }
- }
-
/*
* After the rotation and timer thread have quit, we can safely destroy
* the rotation_timer_queue.
*/
rotation_thread_timer_queue_destroy(rotation_timer_queue);
-
- rcu_thread_offline();
- rcu_unregister_thread();
-
- ret = fini_ht_cleanup_thread(&ht_cleanup_thread);
- if (ret) {
- retval = -1;
+ /*
+ * The teardown of the notification system is performed after the
+ * session daemon's teardown in order to allow it to be notified
+ * of the active session and channels at the moment of the teardown.
+ */
+ if (notification_thread_handle) {
+ notification_thread_handle_destroy(notification_thread_handle);
}
lttng_pipe_destroy(ust32_channel_monitor_pipe);
lttng_pipe_destroy(ust64_channel_monitor_pipe);