X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fnotification-thread.c;h=c15e356ada4187f7e044897a3d0a23ae0d48791c;hp=e0937d8e58eb577d2f088968d7a16299fe5db4c8;hb=3a5f70173aa04d11ccb22694d5d31a702cad33ab;hpb=8ada111f6d3ab40d1c33cf1a7b2546de9a47d1d5 diff --git a/src/bin/lttng-sessiond/notification-thread.c b/src/bin/lttng-sessiond/notification-thread.c index e0937d8e5..c15e356ad 100644 --- a/src/bin/lttng-sessiond/notification-thread.c +++ b/src/bin/lttng-sessiond/notification-thread.c @@ -1,18 +1,8 @@ /* - * Copyright (C) 2017 - Jérémie Galarneau + * Copyright (C) 2017 Jérémie Galarneau * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License, version 2 only, as - * published by the Free Software Foundation. + * SPDX-License-Identifier: GPL-2.0-only * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along with - * this program; if not, write to the Free Software Foundation, Inc., 51 - * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #define _LGPL_SOURCE @@ -27,7 +17,6 @@ #include #include #include -#include #include #include #include @@ -37,119 +26,18 @@ #include "notification-thread-commands.h" #include "lttng-sessiond.h" #include "health-sessiond.h" +#include "thread.h" +#include "testpoint.h" + +#include "kernel.h" +#include #include #include #include -/** - * This thread maintains an internal state associating clients and triggers. - * - * In order to speed-up and simplify queries, hash tables providing the - * following associations are maintained: - * - * - client_socket_ht: associate a client's socket (fd) to its "struct client" - * This hash table owns the "struct client" which must thus be - * disposed-of on removal from the hash table. - * - * - channel_triggers_ht: - * associates a channel key to a list of - * struct lttng_trigger_list_nodes. The triggers in this list are - * those that have conditions that apply to this channel. - * This hash table owns the list, but not the triggers themselves. - * - * - channel_state_ht: - * associates a pair (channel key, channel domain) to its last - * sampled state received from the consumer daemon - * (struct channel_state). - * This previous sample is kept to implement edge-triggered - * conditions as we need to detect the state transitions. - * This hash table owns the channel state. - * - * - notification_trigger_clients_ht: - * associates notification-emitting triggers to clients - * (struct notification_client_ht_node) subscribed to those - * conditions. - * The condition's hash and match functions are used directly since - * all triggers in this hash table have the "notify" action. - * This hash table holds no ownership. - * - * - channels_ht: - * associates a channel_key to a struct channel_info. The hash table - * holds the ownership of the struct channel_info. - * - * - triggers_ht: - * associated a condition to a struct lttng_trigger_ht_element. - * The hash table holds the ownership of the - * lttng_trigger_ht_elements along with the triggers themselves. - * - * The thread reacts to the following internal events: - * 1) creation of a tracing channel, - * 2) destruction of a tracing channel, - * 3) registration of a trigger, - * 4) unregistration of a trigger, - * 5) reception of a channel monitor sample from the consumer daemon. - * - * Events specific to notification-emitting triggers: - * 6) connection of a notification client, - * 7) disconnection of a notification client, - * 8) subscription of a client to a conditions' notifications, - * 9) unsubscription of a client from a conditions' notifications, - * - * - * 1) Creation of a tracing channel - * - notification_trigger_clients_ht is traversed to identify - * triggers which apply to this new channel, - * - triggers identified are added to the channel_triggers_ht. - * - add channel to channels_ht - * - * 2) Destruction of a tracing channel - * - remove entry from channel_triggers_ht, releasing the list wrapper and - * elements, - * - remove entry from the channel_state_ht. - * - remove channel from channels_ht - * - * 3) Registration of a trigger - * - if the trigger's action is of type "notify", - * - traverse the list of conditions of every client to build a list of - * clients which have to be notified when this trigger's condition is met, - * - add list of clients (even if it is empty) to the - * notification_trigger_clients_ht, - * - add trigger to channel_triggers_ht (if applicable), - * - add trigger to triggers_ht - * - * 4) Unregistration of a trigger - * - if the trigger's action is of type "notify", - * - remove the trigger from the notification_trigger_clients_ht, - * - remove trigger from channel_triggers_ht (if applicable), - * - remove trigger from triggers_ht - * - * 5) Reception of a channel monitor sample from the consumer daemon - * - evaluate the conditions associated with the triggers found in - * the channel_triggers_ht, - * - if a condition evaluates to "true" and the condition is of type - * "notify", query the notification_trigger_clients_ht and send - * a notification to the clients. - * - * 6) Connection of a client - * - add client socket to the client_socket_ht. - * - * 7) Disconnection of a client - * - remove client socket from the client_socket_ht, - * - traverse all conditions to which the client is subscribed and remove - * the client from the notification_trigger_clients_ht. - * - * 8) Subscription of a client to a condition's notifications - * - Add the condition to the client's list of subscribed conditions, - * - Look-up notification_trigger_clients_ht and add the client to - * list of clients. - * - * 9) Unsubscription of a client to a condition's notifications - * - Remove the condition from the client's list of subscribed conditions, - * - Look-up notification_trigger_clients_ht and remove the client - * from the list of clients. - */ +int notifier_consumption_paused; /* * Destroy the thread data previously created by the init function. */ @@ -162,17 +50,13 @@ void notification_thread_handle_destroy( goto end; } - if (handle->cmd_queue.event_fd < 0) { - goto end; - } - ret = close(handle->cmd_queue.event_fd); - if (ret < 0) { - PERROR("close notification command queue event_fd"); - } - - assert(cds_list_empty(&handle->cmd_queue.list)); + LTTNG_ASSERT(cds_list_empty(&handle->cmd_queue.list)); pthread_mutex_destroy(&handle->cmd_queue.lock); + sem_destroy(&handle->ready); + if (handle->cmd_queue.event_pipe) { + lttng_pipe_destroy(handle->cmd_queue.event_pipe); + } if (handle->channel_monitoring_pipes.ust32_consumer >= 0) { ret = close(handle->channel_monitoring_pipes.ust32_consumer); if (ret) { @@ -191,6 +75,7 @@ void notification_thread_handle_destroy( PERROR("close kernel consumer channel monitoring pipe"); } } + end: free(handle); } @@ -202,18 +87,24 @@ struct notification_thread_handle *notification_thread_handle_create( { int ret; struct notification_thread_handle *handle; + struct lttng_pipe *event_pipe = NULL; handle = zmalloc(sizeof(*handle)); if (!handle) { goto end; } - /* FIXME Replace eventfd by a pipe to support older kernels. */ - handle->cmd_queue.event_fd = eventfd(0, EFD_CLOEXEC); - if (handle->cmd_queue.event_fd < 0) { - PERROR("eventfd notification command queue"); + sem_init(&handle->ready, 0, 0); + + event_pipe = lttng_pipe_open(FD_CLOEXEC); + if (!event_pipe) { + ERR("event_pipe creation"); goto error; } + + handle->cmd_queue.event_pipe = event_pipe; + event_pipe = NULL; + CDS_INIT_LIST_HEAD(&handle->cmd_queue.list); ret = pthread_mutex_init(&handle->cmd_queue.lock, NULL); if (ret) { @@ -250,9 +141,11 @@ struct notification_thread_handle *notification_thread_handle_create( } else { handle->channel_monitoring_pipes.kernel_consumer = -1; } + end: return handle; error: + lttng_pipe_destroy(event_pipe); notification_thread_handle_destroy(handle); return NULL; } @@ -276,7 +169,7 @@ char *get_notification_channel_sock_path(void) goto error; } } else { - char *home_path = utils_get_home_dir(); + const char *home_path = utils_get_home_dir(); if (!home_path) { ERR("Can't get HOME directory for socket creation"); @@ -303,7 +196,7 @@ void notification_channel_socket_destroy(int fd) int ret; char *sock_path = get_notification_channel_sock_path(); - DBG("[notification-thread] Destroying notification channel socket"); + DBG("Destroying notification channel socket"); if (sock_path) { ret = unlink(sock_path); @@ -325,12 +218,12 @@ int notification_channel_socket_create(void) int fd = -1, ret; char *sock_path = get_notification_channel_sock_path(); - DBG("[notification-thread] Creating notification channel UNIX socket at %s", + DBG("Creating notification channel UNIX socket at %s", sock_path); ret = lttcomm_create_unix_sock(sock_path); if (ret < 0) { - ERR("[notification-thread] Failed to create notification socket"); + ERR("Failed to create notification socket"); goto error; } fd = ret; @@ -343,8 +236,16 @@ int notification_channel_socket_create(void) } if (getuid() == 0) { - ret = chown(sock_path, 0, - utils_get_group_id(tracing_group_name)); + gid_t gid; + + ret = utils_get_group_id(the_config.tracing_group_name.value, + true, &gid); + if (ret) { + /* Default to root group. */ + gid = 0; + } + + ret = chown(sock_path, 0, gid); if (ret) { ERR("Failed to set the notification channel socket's group"); ret = -1; @@ -352,7 +253,7 @@ int notification_channel_socket_create(void) } } - DBG("[notification-thread] Notification channel UNIX socket created (fd = %i)", + DBG("Notification channel UNIX socket created (fd = %i)", fd); free(sock_path); return fd; @@ -387,27 +288,27 @@ int init_poll_set(struct lttng_poll_event *poll_set, ret = lttng_poll_add(poll_set, notification_channel_socket, LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP); if (ret < 0) { - ERR("[notification-thread] Failed to add notification channel socket to pollset"); + ERR("Failed to add notification channel socket to pollset"); goto error; } - ret = lttng_poll_add(poll_set, handle->cmd_queue.event_fd, + ret = lttng_poll_add(poll_set, lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), LPOLLIN | LPOLLERR); if (ret < 0) { - ERR("[notification-thread] Failed to add notification command queue event fd to pollset"); + ERR("Failed to add notification command queue event fd to pollset"); goto error; } ret = lttng_poll_add(poll_set, handle->channel_monitoring_pipes.ust32_consumer, LPOLLIN | LPOLLERR); if (ret < 0) { - ERR("[notification-thread] Failed to add ust-32 channel monitoring pipe fd to pollset"); + ERR("Failed to add ust-32 channel monitoring pipe fd to pollset"); goto error; } ret = lttng_poll_add(poll_set, handle->channel_monitoring_pipes.ust64_consumer, LPOLLIN | LPOLLERR); if (ret < 0) { - ERR("[notification-thread] Failed to add ust-64 channel monitoring pipe fd to pollset"); + ERR("Failed to add ust-64 channel monitoring pipe fd to pollset"); goto error; } if (handle->channel_monitoring_pipes.kernel_consumer < 0) { @@ -417,7 +318,7 @@ int init_poll_set(struct lttng_poll_event *poll_set, handle->channel_monitoring_pipes.kernel_consumer, LPOLLIN | LPOLLERR); if (ret < 0) { - ERR("[notification-thread] Failed to add kernel channel monitoring pipe fd to pollset"); + ERR("Failed to add kernel channel monitoring pipe fd to pollset"); goto error; } end: @@ -434,42 +335,85 @@ void fini_thread_state(struct notification_thread_state *state) if (state->client_socket_ht) { ret = handle_notification_thread_client_disconnect_all(state); - assert(!ret); + LTTNG_ASSERT(!ret); ret = cds_lfht_destroy(state->client_socket_ht, NULL); - assert(!ret); + LTTNG_ASSERT(!ret); + } + if (state->client_id_ht) { + ret = cds_lfht_destroy(state->client_id_ht, NULL); + LTTNG_ASSERT(!ret); } if (state->triggers_ht) { ret = handle_notification_thread_trigger_unregister_all(state); - assert(!ret); + LTTNG_ASSERT(!ret); ret = cds_lfht_destroy(state->triggers_ht, NULL); - assert(!ret); + LTTNG_ASSERT(!ret); } if (state->channel_triggers_ht) { ret = cds_lfht_destroy(state->channel_triggers_ht, NULL); - assert(!ret); + LTTNG_ASSERT(!ret); } if (state->channel_state_ht) { ret = cds_lfht_destroy(state->channel_state_ht, NULL); - assert(!ret); + LTTNG_ASSERT(!ret); } if (state->notification_trigger_clients_ht) { ret = cds_lfht_destroy(state->notification_trigger_clients_ht, NULL); - assert(!ret); + LTTNG_ASSERT(!ret); } if (state->channels_ht) { - ret = cds_lfht_destroy(state->channels_ht, - NULL); - assert(!ret); + ret = cds_lfht_destroy(state->channels_ht, NULL); + LTTNG_ASSERT(!ret); + } + if (state->sessions_ht) { + ret = cds_lfht_destroy(state->sessions_ht, NULL); + LTTNG_ASSERT(!ret); + } + if (state->triggers_by_name_uid_ht) { + ret = cds_lfht_destroy(state->triggers_by_name_uid_ht, NULL); + LTTNG_ASSERT(!ret); + } + if (state->trigger_tokens_ht) { + ret = cds_lfht_destroy(state->trigger_tokens_ht, NULL); + LTTNG_ASSERT(!ret); + } + /* + * Must be destroyed after all channels have been destroyed. + * See comment in struct lttng_session_trigger_list. + */ + if (state->session_triggers_ht) { + ret = cds_lfht_destroy(state->session_triggers_ht, NULL); + LTTNG_ASSERT(!ret); } - if (state->notification_channel_socket >= 0) { notification_channel_socket_destroy( state->notification_channel_socket); } + + LTTNG_ASSERT(cds_list_empty(&state->tracer_event_sources_list)); + + if (state->executor) { + action_executor_destroy(state->executor); + } lttng_poll_clean(&state->events); } +static +void mark_thread_as_ready(struct notification_thread_handle *handle) +{ + DBG("Marking notification thread as ready"); + sem_post(&handle->ready); +} + +static +void wait_until_thread_is_ready(struct notification_thread_handle *handle) +{ + DBG("Waiting for notification thread to be ready"); + sem_wait(&handle->ready); + DBG("Notification thread is ready"); +} + static int init_thread_state(struct notification_thread_handle *handle, struct notification_thread_state *state) @@ -478,6 +422,7 @@ int init_thread_state(struct notification_thread_handle *handle, memset(state, 0, sizeof(*state)); state->notification_channel_socket = -1; + state->trigger_id.next_tracer_token = 1; lttng_poll_init(&state->events); ret = notification_channel_socket_create(); @@ -492,10 +437,10 @@ int init_thread_state(struct notification_thread_handle *handle, goto end; } - DBG("[notification-thread] Listening on notification channel socket"); + DBG("Listening on notification channel socket"); ret = lttcomm_listen_unix_sock(state->notification_channel_socket); if (ret < 0) { - ERR("[notification-thread] Listen failed on notification channel socket"); + ERR("Listen failed on notification channel socket"); goto error; } @@ -505,12 +450,24 @@ int init_thread_state(struct notification_thread_handle *handle, goto error; } + state->client_id_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0, + CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); + if (!state->client_id_ht) { + goto error; + } + state->channel_triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); if (!state->channel_triggers_ht) { goto error; } + state->session_triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0, + CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); + if (!state->session_triggers_ht) { + goto error; + } + state->channel_state_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); if (!state->channel_state_ht) { @@ -528,12 +485,38 @@ int init_thread_state(struct notification_thread_handle *handle, if (!state->channels_ht) { goto error; } - + state->sessions_ht = cds_lfht_new(DEFAULT_HT_SIZE, + 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); + if (!state->sessions_ht) { + goto error; + } state->triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); if (!state->triggers_ht) { goto error; } + state->triggers_by_name_uid_ht = cds_lfht_new(DEFAULT_HT_SIZE, + 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); + if (!state->triggers_by_name_uid_ht) { + goto error; + } + + state->trigger_tokens_ht = cds_lfht_new(DEFAULT_HT_SIZE, + 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); + if (!state->trigger_tokens_ht) { + goto error; + } + + CDS_INIT_LIST_HEAD(&state->tracer_event_sources_list); + + state->executor = action_executor_create(handle); + if (!state->executor) { + goto error; + } + + state->restart_poll = false; + + mark_thread_as_ready(handle); end: return 0; error: @@ -561,7 +544,7 @@ int handle_channel_monitoring_pipe(int fd, uint32_t revents, if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { ret = lttng_poll_del(&state->events, fd); if (ret) { - ERR("[notification-thread] Failed to remove consumer monitoring pipe from poll set"); + ERR("Failed to remove consumer monitoring pipe from poll set"); } goto end; } @@ -569,7 +552,7 @@ int handle_channel_monitoring_pipe(int fd, uint32_t revents, ret = handle_notification_thread_channel_sample( state, fd, domain); if (ret) { - ERR("[notification-thread] Consumer sample handling error occured"); + ERR("Consumer sample handling error occurred"); ret = -1; goto end; } @@ -577,27 +560,94 @@ end: return ret; } +static int handle_event_notification_pipe(int event_source_fd, + enum lttng_domain_type domain, + uint32_t revents, + struct notification_thread_state *state) +{ + int ret = 0; + + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ret = handle_notification_thread_tracer_event_source_died( + state, event_source_fd); + if (ret) { + ERR("Failed to remove event notification pipe from poll set: fd = %d", + event_source_fd); + } + goto end; + } + + if (testpoint(sessiond_handle_notifier_event_pipe)) { + ret = 0; + goto end; + } + + if (caa_unlikely(notifier_consumption_paused)) { + DBG("Event notifier notification consumption paused, sleeping..."); + sleep(1); + goto end; + } + + ret = handle_notification_thread_event_notification( + state, event_source_fd, domain); + if (ret) { + ERR("Event notification handling error occurred for fd: %d", + event_source_fd); + ret = -1; + goto end; + } + +end: + return ret; +} + +/* + * Return the event source domain type via parameter. + */ +static bool fd_is_event_notification_source(const struct notification_thread_state *state, + int fd, + enum lttng_domain_type *domain) +{ + struct notification_event_tracer_event_source_element *source_element; + + LTTNG_ASSERT(domain); + + cds_list_for_each_entry(source_element, + &state->tracer_event_sources_list, node) { + if (source_element->fd != fd) { + continue; + } + + *domain = source_element->domain; + return true; + } + + return false; +} + /* * This thread services notification channel clients and commands received * from various lttng-sessiond components over a command queue. */ +static void *thread_notification(void *data) { int ret; struct notification_thread_handle *handle = data; struct notification_thread_state state; + enum lttng_domain_type domain; - DBG("[notification-thread] Started notification thread"); + DBG("Started notification thread"); + + health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_NOTIFICATION); + rcu_register_thread(); + rcu_thread_online(); if (!handle) { - ERR("[notification-thread] Invalid thread context provided"); + ERR("Invalid thread context provided"); goto end; } - rcu_register_thread(); - rcu_thread_online(); - - health_register(health_sessiond, HEALTH_SESSIOND_TYPE_NOTIFICATION); health_code_update(); ret = init_thread_state(handle, &state); @@ -605,16 +655,17 @@ void *thread_notification(void *data) goto end; } - /* Ready to handle client connections. */ - sessiond_notify_ready(); + if (testpoint(sessiond_thread_notification)) { + goto end; + } while (true) { int fd_count, i; health_poll_entry(); - DBG("[notification-thread] Entering poll wait"); + DBG("Entering poll wait"); ret = lttng_poll_wait(&state.events, -1); - DBG("[notification-thread] Poll wait returned (%i)", ret); + DBG("Poll wait returned (%i)", ret); health_poll_exit(); if (ret < 0) { /* @@ -623,19 +674,22 @@ void *thread_notification(void *data) if (errno == EINTR) { continue; } - ERR("[notification-thread] Error encountered during lttng_poll_wait (%i)", ret); + ERR("Error encountered during lttng_poll_wait (%i)", ret); goto error; } + /* + * Reset restart_poll flag so that calls below might turn it + * on. + */ + state.restart_poll = false; + fd_count = ret; for (i = 0; i < fd_count; i++) { int fd = LTTNG_POLL_GETFD(&state.events, i); uint32_t revents = LTTNG_POLL_GETEV(&state.events, i); - if (!revents) { - continue; - } - DBG("[notification-thread] Handling fd (%i) activity (%u)", fd, revents); + DBG("Handling fd (%i) activity (%u)", fd, revents); if (fd == state.notification_channel_socket) { if (revents & LPOLLIN) { @@ -646,17 +700,17 @@ void *thread_notification(void *data) } } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("[notification-thread] Notification socket poll error"); + ERR("Notification socket poll error"); goto error; } else { - ERR("[notification-thread] Unexpected poll events %u for notification socket %i", revents, fd); + ERR("Unexpected poll events %u for notification socket %i", revents, fd); goto error; } - } else if (fd == handle->cmd_queue.event_fd) { + } else if (fd == lttng_pipe_get_readfd(handle->cmd_queue.event_pipe)) { ret = handle_notification_thread_command(handle, &state); if (ret < 0) { - DBG("[notification-thread] Error encountered while servicing command queue"); + DBG("Error encountered while servicing command queue"); goto error; } else if (ret > 0) { goto exit; @@ -669,6 +723,11 @@ void *thread_notification(void *data) if (ret) { goto error; } + } else if (fd_is_event_notification_source(&state, fd, &domain)) { + ret = handle_event_notification_pipe(fd, domain, revents, &state); + if (ret) { + goto error; + } } else { /* Activity on a client's socket. */ if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { @@ -702,14 +761,57 @@ void *thread_notification(void *data) } } } + + /* + * Calls above might have changed the state of the + * FDs in `state.events`. Call _poll_wait() again to + * ensure we have a consistent state. + */ + if (state.restart_poll) { + break; + } } } exit: error: fini_thread_state(&state); - health_unregister(health_sessiond); +end: rcu_thread_offline(); rcu_unregister_thread(); -end: + health_unregister(the_health_sessiond); + return NULL; +} + +static +bool shutdown_notification_thread(void *thread_data) +{ + struct notification_thread_handle *handle = thread_data; + + notification_thread_command_quit(handle); + return true; +} + +struct lttng_thread *launch_notification_thread( + struct notification_thread_handle *handle) +{ + struct lttng_thread *thread; + + thread = lttng_thread_create("Notification", + thread_notification, + shutdown_notification_thread, + NULL, + handle); + if (!thread) { + goto error; + } + + /* + * Wait for the thread to be marked as "ready" before returning + * as other subsystems depend on the notification subsystem + * (e.g. rotation thread). + */ + wait_until_thread_is_ready(handle); + return thread; +error: return NULL; }