/*
- * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License, version 2 only, as
- * published by the Free Software Foundation.
+ * SPDX-License-Identifier: GPL-2.0-only
*
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
- * more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * this program; if not, write to the Free Software Foundation, Inc., 51
- * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#define _LGPL_SOURCE
#include <common/utils.h>
#include <common/align.h>
#include <common/time.h>
-#include <sys/eventfd.h>
#include <sys/stat.h>
#include <time.h>
#include <signal.h>
#include "notification-thread-commands.h"
#include "lttng-sessiond.h"
#include "health-sessiond.h"
+#include "thread.h"
+#include "testpoint.h"
+
+#include "kernel.h"
+#include <common/kernel-ctl/kernel-ctl.h>
#include <urcu.h>
#include <urcu/list.h>
#include <urcu/rculfhash.h>
+
+int notifier_consumption_paused;
/*
* Destroy the thread data previously created by the init function.
*/
assert(cds_list_empty(&handle->cmd_queue.list));
pthread_mutex_destroy(&handle->cmd_queue.lock);
+ sem_destroy(&handle->ready);
if (handle->cmd_queue.event_pipe) {
lttng_pipe_destroy(handle->cmd_queue.event_pipe);
PERROR("close kernel consumer channel monitoring pipe");
}
}
+
end:
free(handle);
}
goto end;
}
+ sem_init(&handle->ready, 0, 0);
+
event_pipe = lttng_pipe_open(FD_CLOEXEC);
if (!event_pipe) {
ERR("event_pipe creation");
} else {
handle->channel_monitoring_pipes.kernel_consumer = -1;
}
+
end:
return handle;
error:
goto error;
}
} else {
- char *home_path = utils_get_home_dir();
+ const char *home_path = utils_get_home_dir();
if (!home_path) {
ERR("Can't get HOME directory for socket creation");
int ret;
char *sock_path = get_notification_channel_sock_path();
- DBG("[notification-thread] Destroying notification channel socket");
+ DBG("Destroying notification channel socket");
if (sock_path) {
ret = unlink(sock_path);
int fd = -1, ret;
char *sock_path = get_notification_channel_sock_path();
- DBG("[notification-thread] Creating notification channel UNIX socket at %s",
+ DBG("Creating notification channel UNIX socket at %s",
sock_path);
ret = lttcomm_create_unix_sock(sock_path);
if (ret < 0) {
- ERR("[notification-thread] Failed to create notification socket");
+ ERR("Failed to create notification socket");
goto error;
}
fd = ret;
}
if (getuid() == 0) {
- ret = chown(sock_path, 0,
- utils_get_group_id(config.tracing_group_name.value));
+ gid_t gid;
+
+ ret = utils_get_group_id(the_config.tracing_group_name.value,
+ true, &gid);
+ if (ret) {
+ /* Default to root group. */
+ gid = 0;
+ }
+
+ ret = chown(sock_path, 0, gid);
if (ret) {
ERR("Failed to set the notification channel socket's group");
ret = -1;
}
}
- DBG("[notification-thread] Notification channel UNIX socket created (fd = %i)",
+ DBG("Notification channel UNIX socket created (fd = %i)",
fd);
free(sock_path);
return fd;
ret = lttng_poll_add(poll_set, notification_channel_socket,
LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP);
if (ret < 0) {
- ERR("[notification-thread] Failed to add notification channel socket to pollset");
+ ERR("Failed to add notification channel socket to pollset");
goto error;
}
ret = lttng_poll_add(poll_set, lttng_pipe_get_readfd(handle->cmd_queue.event_pipe),
LPOLLIN | LPOLLERR);
if (ret < 0) {
- ERR("[notification-thread] Failed to add notification command queue event fd to pollset");
+ ERR("Failed to add notification command queue event fd to pollset");
goto error;
}
ret = lttng_poll_add(poll_set,
handle->channel_monitoring_pipes.ust32_consumer,
LPOLLIN | LPOLLERR);
if (ret < 0) {
- ERR("[notification-thread] Failed to add ust-32 channel monitoring pipe fd to pollset");
+ ERR("Failed to add ust-32 channel monitoring pipe fd to pollset");
goto error;
}
ret = lttng_poll_add(poll_set,
handle->channel_monitoring_pipes.ust64_consumer,
LPOLLIN | LPOLLERR);
if (ret < 0) {
- ERR("[notification-thread] Failed to add ust-64 channel monitoring pipe fd to pollset");
+ ERR("Failed to add ust-64 channel monitoring pipe fd to pollset");
goto error;
}
if (handle->channel_monitoring_pipes.kernel_consumer < 0) {
handle->channel_monitoring_pipes.kernel_consumer,
LPOLLIN | LPOLLERR);
if (ret < 0) {
- ERR("[notification-thread] Failed to add kernel channel monitoring pipe fd to pollset");
+ ERR("Failed to add kernel channel monitoring pipe fd to pollset");
goto error;
}
end:
ret = cds_lfht_destroy(state->client_socket_ht, NULL);
assert(!ret);
}
+ if (state->client_id_ht) {
+ ret = cds_lfht_destroy(state->client_id_ht, NULL);
+ assert(!ret);
+ }
if (state->triggers_ht) {
ret = handle_notification_thread_trigger_unregister_all(state);
assert(!ret);
assert(!ret);
}
if (state->channels_ht) {
- ret = cds_lfht_destroy(state->channels_ht,
- NULL);
+ ret = cds_lfht_destroy(state->channels_ht, NULL);
+ assert(!ret);
+ }
+ if (state->sessions_ht) {
+ ret = cds_lfht_destroy(state->sessions_ht, NULL);
+ assert(!ret);
+ }
+ if (state->triggers_by_name_uid_ht) {
+ ret = cds_lfht_destroy(state->triggers_by_name_uid_ht, NULL);
+ assert(!ret);
+ }
+ if (state->trigger_tokens_ht) {
+ ret = cds_lfht_destroy(state->trigger_tokens_ht, NULL);
+ assert(!ret);
+ }
+ /*
+ * Must be destroyed after all channels have been destroyed.
+ * See comment in struct lttng_session_trigger_list.
+ */
+ if (state->session_triggers_ht) {
+ ret = cds_lfht_destroy(state->session_triggers_ht, NULL);
assert(!ret);
}
-
if (state->notification_channel_socket >= 0) {
notification_channel_socket_destroy(
state->notification_channel_socket);
}
+
+ assert(cds_list_empty(&state->tracer_event_sources_list));
+
+ if (state->executor) {
+ action_executor_destroy(state->executor);
+ }
lttng_poll_clean(&state->events);
}
+static
+void mark_thread_as_ready(struct notification_thread_handle *handle)
+{
+ DBG("Marking notification thread as ready");
+ sem_post(&handle->ready);
+}
+
+static
+void wait_until_thread_is_ready(struct notification_thread_handle *handle)
+{
+ DBG("Waiting for notification thread to be ready");
+ sem_wait(&handle->ready);
+ DBG("Notification thread is ready");
+}
+
static
int init_thread_state(struct notification_thread_handle *handle,
struct notification_thread_state *state)
memset(state, 0, sizeof(*state));
state->notification_channel_socket = -1;
+ state->trigger_id.next_tracer_token = 1;
lttng_poll_init(&state->events);
ret = notification_channel_socket_create();
goto end;
}
- DBG("[notification-thread] Listening on notification channel socket");
+ DBG("Listening on notification channel socket");
ret = lttcomm_listen_unix_sock(state->notification_channel_socket);
if (ret < 0) {
- ERR("[notification-thread] Listen failed on notification channel socket");
+ ERR("Listen failed on notification channel socket");
goto error;
}
goto error;
}
+ state->client_id_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
+ CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
+ if (!state->client_id_ht) {
+ goto error;
+ }
+
state->channel_triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
if (!state->channel_triggers_ht) {
goto error;
}
+ state->session_triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
+ CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
+ if (!state->session_triggers_ht) {
+ goto error;
+ }
+
state->channel_state_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
if (!state->channel_state_ht) {
if (!state->channels_ht) {
goto error;
}
-
+ state->sessions_ht = cds_lfht_new(DEFAULT_HT_SIZE,
+ 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
+ if (!state->sessions_ht) {
+ goto error;
+ }
state->triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE,
1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
if (!state->triggers_ht) {
goto error;
}
+ state->triggers_by_name_uid_ht = cds_lfht_new(DEFAULT_HT_SIZE,
+ 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
+ if (!state->triggers_by_name_uid_ht) {
+ goto error;
+ }
+
+ state->trigger_tokens_ht = cds_lfht_new(DEFAULT_HT_SIZE,
+ 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
+ if (!state->trigger_tokens_ht) {
+ goto error;
+ }
+
+ CDS_INIT_LIST_HEAD(&state->tracer_event_sources_list);
+
+ state->executor = action_executor_create(handle);
+ if (!state->executor) {
+ goto error;
+ }
+
+ state->restart_poll = false;
+
+ mark_thread_as_ready(handle);
end:
return 0;
error:
if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
ret = lttng_poll_del(&state->events, fd);
if (ret) {
- ERR("[notification-thread] Failed to remove consumer monitoring pipe from poll set");
+ ERR("Failed to remove consumer monitoring pipe from poll set");
}
goto end;
}
ret = handle_notification_thread_channel_sample(
state, fd, domain);
if (ret) {
- ERR("[notification-thread] Consumer sample handling error occurred");
+ ERR("Consumer sample handling error occurred");
+ ret = -1;
+ goto end;
+ }
+end:
+ return ret;
+}
+
+static int handle_event_notification_pipe(int event_source_fd,
+ enum lttng_domain_type domain,
+ uint32_t revents,
+ struct notification_thread_state *state)
+{
+ int ret = 0;
+
+ if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ ret = handle_notification_thread_tracer_event_source_died(
+ state, event_source_fd);
+ if (ret) {
+ ERR("Failed to remove event notification pipe from poll set: fd = %d",
+ event_source_fd);
+ }
+ goto end;
+ }
+
+ if (testpoint(sessiond_handle_notifier_event_pipe)) {
+ ret = 0;
+ goto end;
+ }
+
+ if (caa_unlikely(notifier_consumption_paused)) {
+ DBG("Event notifier notification consumption paused, sleeping...");
+ sleep(1);
+ goto end;
+ }
+
+ ret = handle_notification_thread_event_notification(
+ state, event_source_fd, domain);
+ if (ret) {
+ ERR("Event notification handling error occurred for fd: %d",
+ event_source_fd);
ret = -1;
goto end;
}
+
end:
return ret;
}
+/*
+ * Return the event source domain type via parameter.
+ */
+static bool fd_is_event_notification_source(const struct notification_thread_state *state,
+ int fd,
+ enum lttng_domain_type *domain)
+{
+ struct notification_event_tracer_event_source_element *source_element;
+
+ assert(domain);
+
+ cds_list_for_each_entry(source_element,
+ &state->tracer_event_sources_list, node) {
+ if (source_element->fd != fd) {
+ continue;
+ }
+
+ *domain = source_element->domain;
+ return true;
+ }
+
+ return false;
+}
+
/*
* This thread services notification channel clients and commands received
* from various lttng-sessiond components over a command queue.
*/
+static
void *thread_notification(void *data)
{
int ret;
struct notification_thread_handle *handle = data;
struct notification_thread_state state;
+ enum lttng_domain_type domain;
+
+ DBG("Started notification thread");
- DBG("[notification-thread] Started notification thread");
+ health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_NOTIFICATION);
+ rcu_register_thread();
+ rcu_thread_online();
if (!handle) {
- ERR("[notification-thread] Invalid thread context provided");
+ ERR("Invalid thread context provided");
goto end;
}
- rcu_register_thread();
- rcu_thread_online();
-
- health_register(health_sessiond, HEALTH_SESSIOND_TYPE_NOTIFICATION);
health_code_update();
ret = init_thread_state(handle, &state);
goto end;
}
- /* Ready to handle client connections. */
- sessiond_notify_ready();
+ if (testpoint(sessiond_thread_notification)) {
+ goto end;
+ }
while (true) {
int fd_count, i;
health_poll_entry();
- DBG("[notification-thread] Entering poll wait");
+ DBG("Entering poll wait");
ret = lttng_poll_wait(&state.events, -1);
- DBG("[notification-thread] Poll wait returned (%i)", ret);
+ DBG("Poll wait returned (%i)", ret);
health_poll_exit();
if (ret < 0) {
/*
if (errno == EINTR) {
continue;
}
- ERR("[notification-thread] Error encountered during lttng_poll_wait (%i)", ret);
+ ERR("Error encountered during lttng_poll_wait (%i)", ret);
goto error;
}
+ /*
+ * Reset restart_poll flag so that calls below might turn it
+ * on.
+ */
+ state.restart_poll = false;
+
fd_count = ret;
for (i = 0; i < fd_count; i++) {
int fd = LTTNG_POLL_GETFD(&state.events, i);
uint32_t revents = LTTNG_POLL_GETEV(&state.events, i);
- if (!revents) {
- continue;
- }
- DBG("[notification-thread] Handling fd (%i) activity (%u)", fd, revents);
+ DBG("Handling fd (%i) activity (%u)", fd, revents);
if (fd == state.notification_channel_socket) {
if (revents & LPOLLIN) {
}
} else if (revents &
(LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
- ERR("[notification-thread] Notification socket poll error");
+ ERR("Notification socket poll error");
goto error;
} else {
- ERR("[notification-thread] Unexpected poll events %u for notification socket %i", revents, fd);
+ ERR("Unexpected poll events %u for notification socket %i", revents, fd);
goto error;
}
} else if (fd == lttng_pipe_get_readfd(handle->cmd_queue.event_pipe)) {
ret = handle_notification_thread_command(handle,
&state);
if (ret < 0) {
- DBG("[notification-thread] Error encountered while servicing command queue");
+ DBG("Error encountered while servicing command queue");
goto error;
} else if (ret > 0) {
goto exit;
if (ret) {
goto error;
}
+ } else if (fd_is_event_notification_source(&state, fd, &domain)) {
+ ret = handle_event_notification_pipe(fd, domain, revents, &state);
+ if (ret) {
+ goto error;
+ }
} else {
/* Activity on a client's socket. */
if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
}
}
}
+
+ /*
+ * Calls above might have changed the state of the
+ * FDs in `state.events`. Call _poll_wait() again to
+ * ensure we have a consistent state.
+ */
+ if (state.restart_poll) {
+ break;
+ }
}
}
exit:
error:
fini_thread_state(&state);
- health_unregister(health_sessiond);
+end:
rcu_thread_offline();
rcu_unregister_thread();
-end:
+ health_unregister(the_health_sessiond);
+ return NULL;
+}
+
+static
+bool shutdown_notification_thread(void *thread_data)
+{
+ struct notification_thread_handle *handle = thread_data;
+
+ notification_thread_command_quit(handle);
+ return true;
+}
+
+struct lttng_thread *launch_notification_thread(
+ struct notification_thread_handle *handle)
+{
+ struct lttng_thread *thread;
+
+ thread = lttng_thread_create("Notification",
+ thread_notification,
+ shutdown_notification_thread,
+ NULL,
+ handle);
+ if (!thread) {
+ goto error;
+ }
+
+ /*
+ * Wait for the thread to be marked as "ready" before returning
+ * as other subsystems depend on the notification subsystem
+ * (e.g. rotation thread).
+ */
+ wait_until_thread_is_ready(handle);
+ return thread;
+error:
return NULL;
}